mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-13 09:04:06 -08:00
Merge pull request #3297 from prometheus/grobie/update-tsdb
Update prometheus/tsdb dependency
This commit is contained in:
commit
b5851a49e5
|
@ -140,7 +140,11 @@ func Open(path string, l log.Logger, r prometheus.Registerer, opts *Options) (*t
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a adapter) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) {
|
func (a adapter) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||||
return querier{q: a.db.Querier(mint, maxt)}, nil
|
q, err := a.db.Querier(mint, maxt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return querier{q: q}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Appender returns a new appender against the storage.
|
// Appender returns a new appender against the storage.
|
||||||
|
|
143
vendor/github.com/prometheus/tsdb/block.go
generated
vendored
143
vendor/github.com/prometheus/tsdb/block.go
generated
vendored
|
@ -19,6 +19,7 @@ import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/oklog/ulid"
|
"github.com/oklog/ulid"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
@ -26,33 +27,16 @@ import (
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DiskBlock represents a data block backed by on-disk data.
|
|
||||||
type DiskBlock interface {
|
|
||||||
BlockReader
|
|
||||||
|
|
||||||
// Directory where block data is stored.
|
|
||||||
Dir() string
|
|
||||||
|
|
||||||
// Stats returns statistics about the block.
|
|
||||||
Meta() BlockMeta
|
|
||||||
|
|
||||||
Delete(mint, maxt int64, m ...labels.Matcher) error
|
|
||||||
|
|
||||||
Snapshot(dir string) error
|
|
||||||
|
|
||||||
Close() error
|
|
||||||
}
|
|
||||||
|
|
||||||
// BlockReader provides reading access to a data block.
|
// BlockReader provides reading access to a data block.
|
||||||
type BlockReader interface {
|
type BlockReader interface {
|
||||||
// Index returns an IndexReader over the block's data.
|
// Index returns an IndexReader over the block's data.
|
||||||
Index() IndexReader
|
Index() (IndexReader, error)
|
||||||
|
|
||||||
// Chunks returns a ChunkReader over the block's data.
|
// Chunks returns a ChunkReader over the block's data.
|
||||||
Chunks() ChunkReader
|
Chunks() (ChunkReader, error)
|
||||||
|
|
||||||
// Tombstones returns a TombstoneReader over the block's deleted data.
|
// Tombstones returns a TombstoneReader over the block's deleted data.
|
||||||
Tombstones() TombstoneReader
|
Tombstones() (TombstoneReader, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Appendable defines an entity to which data can be appended.
|
// Appendable defines an entity to which data can be appended.
|
||||||
|
@ -149,7 +133,12 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
|
||||||
return renameFile(tmp, path)
|
return renameFile(tmp, path)
|
||||||
}
|
}
|
||||||
|
|
||||||
type persistedBlock struct {
|
// Block represents a directory of time series data covering a continous time range.
|
||||||
|
type Block struct {
|
||||||
|
mtx sync.RWMutex
|
||||||
|
closing bool
|
||||||
|
pendingReaders sync.WaitGroup
|
||||||
|
|
||||||
dir string
|
dir string
|
||||||
meta BlockMeta
|
meta BlockMeta
|
||||||
|
|
||||||
|
@ -159,7 +148,9 @@ type persistedBlock struct {
|
||||||
tombstones tombstoneReader
|
tombstones tombstoneReader
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) {
|
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
|
||||||
|
// to instantiate chunk structs.
|
||||||
|
func OpenBlock(dir string, pool chunks.Pool) (*Block, error) {
|
||||||
meta, err := readMetaFile(dir)
|
meta, err := readMetaFile(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -179,7 +170,7 @@ func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
pb := &persistedBlock{
|
pb := &Block{
|
||||||
dir: dir,
|
dir: dir,
|
||||||
meta: *meta,
|
meta: *meta,
|
||||||
chunkr: cr,
|
chunkr: cr,
|
||||||
|
@ -189,28 +180,110 @@ func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) {
|
||||||
return pb, nil
|
return pb, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pb *persistedBlock) Close() error {
|
// Close closes the on-disk block. It blocks as long as there are readers reading from the block.
|
||||||
|
func (pb *Block) Close() error {
|
||||||
|
pb.mtx.Lock()
|
||||||
|
pb.closing = true
|
||||||
|
pb.mtx.Unlock()
|
||||||
|
|
||||||
|
pb.pendingReaders.Wait()
|
||||||
|
|
||||||
var merr MultiError
|
var merr MultiError
|
||||||
|
|
||||||
merr.Add(pb.chunkr.Close())
|
merr.Add(pb.chunkr.Close())
|
||||||
merr.Add(pb.indexr.Close())
|
merr.Add(pb.indexr.Close())
|
||||||
|
merr.Add(pb.tombstones.Close())
|
||||||
|
|
||||||
return merr.Err()
|
return merr.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pb *persistedBlock) String() string {
|
func (pb *Block) String() string {
|
||||||
return pb.meta.ULID.String()
|
return pb.meta.ULID.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pb *persistedBlock) Dir() string { return pb.dir }
|
// Dir returns the directory of the block.
|
||||||
func (pb *persistedBlock) Index() IndexReader { return pb.indexr }
|
func (pb *Block) Dir() string { return pb.dir }
|
||||||
func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr }
|
|
||||||
func (pb *persistedBlock) Tombstones() TombstoneReader {
|
// Meta returns meta information about the block.
|
||||||
return pb.tombstones
|
func (pb *Block) Meta() BlockMeta { return pb.meta }
|
||||||
}
|
|
||||||
func (pb *persistedBlock) Meta() BlockMeta { return pb.meta }
|
// ErrClosing is returned when a block is in the process of being closed.
|
||||||
|
var ErrClosing = errors.New("block is closing")
|
||||||
|
|
||||||
|
func (pb *Block) startRead() error {
|
||||||
|
pb.mtx.RLock()
|
||||||
|
defer pb.mtx.RUnlock()
|
||||||
|
|
||||||
|
if pb.closing {
|
||||||
|
return ErrClosing
|
||||||
|
}
|
||||||
|
pb.pendingReaders.Add(1)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Index returns a new IndexReader against the block data.
|
||||||
|
func (pb *Block) Index() (IndexReader, error) {
|
||||||
|
if err := pb.startRead(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return blockIndexReader{IndexReader: pb.indexr, b: pb}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Chunks returns a new ChunkReader against the block data.
|
||||||
|
func (pb *Block) Chunks() (ChunkReader, error) {
|
||||||
|
if err := pb.startRead(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return blockChunkReader{ChunkReader: pb.chunkr, b: pb}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tombstones returns a new TombstoneReader against the block data.
|
||||||
|
func (pb *Block) Tombstones() (TombstoneReader, error) {
|
||||||
|
if err := pb.startRead(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type blockIndexReader struct {
|
||||||
|
IndexReader
|
||||||
|
b *Block
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r blockIndexReader) Close() error {
|
||||||
|
r.b.pendingReaders.Done()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type blockTombstoneReader struct {
|
||||||
|
TombstoneReader
|
||||||
|
b *Block
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r blockTombstoneReader) Close() error {
|
||||||
|
r.b.pendingReaders.Done()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type blockChunkReader struct {
|
||||||
|
ChunkReader
|
||||||
|
b *Block
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r blockChunkReader) Close() error {
|
||||||
|
r.b.pendingReaders.Done()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete matching series between mint and maxt in the block.
|
||||||
|
func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
||||||
|
pb.mtx.Lock()
|
||||||
|
defer pb.mtx.Unlock()
|
||||||
|
|
||||||
|
if pb.closing {
|
||||||
|
return ErrClosing
|
||||||
|
}
|
||||||
|
|
||||||
func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
|
||||||
pr := newPostingsReader(pb.indexr)
|
pr := newPostingsReader(pb.indexr)
|
||||||
p, absent := pr.Select(ms...)
|
p, absent := pr.Select(ms...)
|
||||||
|
|
||||||
|
@ -262,7 +335,8 @@ Outer:
|
||||||
return writeMetaFile(pb.dir, &pb.meta)
|
return writeMetaFile(pb.dir, &pb.meta)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pb *persistedBlock) Snapshot(dir string) error {
|
// Snapshot creates snapshot of the block into dir.
|
||||||
|
func (pb *Block) Snapshot(dir string) error {
|
||||||
blockDir := filepath.Join(dir, pb.meta.ULID.String())
|
blockDir := filepath.Join(dir, pb.meta.ULID.String())
|
||||||
if err := os.MkdirAll(blockDir, 0777); err != nil {
|
if err := os.MkdirAll(blockDir, 0777); err != nil {
|
||||||
return errors.Wrap(err, "create snapshot block dir")
|
return errors.Wrap(err, "create snapshot block dir")
|
||||||
|
@ -311,7 +385,6 @@ func clampInterval(a, b, mint, maxt int64) (int64, int64) {
|
||||||
if b > maxt {
|
if b > maxt {
|
||||||
b = maxt
|
b = maxt
|
||||||
}
|
}
|
||||||
|
|
||||||
return a, b
|
return a, b
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
2
vendor/github.com/prometheus/tsdb/chunks.go
generated
vendored
2
vendor/github.com/prometheus/tsdb/chunks.go
generated
vendored
|
@ -21,9 +21,9 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/prometheus/tsdb/fileutil"
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/tsdb/chunks"
|
"github.com/prometheus/tsdb/chunks"
|
||||||
|
"github.com/prometheus/tsdb/fileutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
32
vendor/github.com/prometheus/tsdb/compact.go
generated
vendored
32
vendor/github.com/prometheus/tsdb/compact.go
generated
vendored
|
@ -14,6 +14,7 @@
|
||||||
package tsdb
|
package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -299,7 +300,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
|
||||||
var metas []*BlockMeta
|
var metas []*BlockMeta
|
||||||
|
|
||||||
for _, d := range dirs {
|
for _, d := range dirs {
|
||||||
b, err := newPersistedBlock(d, c.chunkPool)
|
b, err := OpenBlock(d, c.chunkPool)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -444,10 +445,30 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
||||||
var (
|
var (
|
||||||
set compactionSet
|
set compactionSet
|
||||||
allSymbols = make(map[string]struct{}, 1<<16)
|
allSymbols = make(map[string]struct{}, 1<<16)
|
||||||
|
closers = []io.Closer{}
|
||||||
)
|
)
|
||||||
for i, b := range blocks {
|
defer func() { closeAll(closers...) }()
|
||||||
|
|
||||||
symbols, err := b.Index().Symbols()
|
for i, b := range blocks {
|
||||||
|
indexr, err := b.Index()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "open index reader for block %s", b)
|
||||||
|
}
|
||||||
|
closers = append(closers, indexr)
|
||||||
|
|
||||||
|
chunkr, err := b.Chunks()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "open chunk reader for block %s", b)
|
||||||
|
}
|
||||||
|
closers = append(closers, chunkr)
|
||||||
|
|
||||||
|
tombsr, err := b.Tombstones()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "open tombstone reader for block %s", b)
|
||||||
|
}
|
||||||
|
closers = append(closers, tombsr)
|
||||||
|
|
||||||
|
symbols, err := indexr.Symbols()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "read symbols")
|
return errors.Wrap(err, "read symbols")
|
||||||
}
|
}
|
||||||
|
@ -455,15 +476,13 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
||||||
allSymbols[s] = struct{}{}
|
allSymbols[s] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
indexr := b.Index()
|
|
||||||
|
|
||||||
all, err := indexr.Postings(allPostingsKey.Name, allPostingsKey.Value)
|
all, err := indexr.Postings(allPostingsKey.Name, allPostingsKey.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
all = indexr.SortedPostings(all)
|
all = indexr.SortedPostings(all)
|
||||||
|
|
||||||
s := newCompactionSeriesSet(indexr, b.Chunks(), b.Tombstones(), all)
|
s := newCompactionSeriesSet(indexr, chunkr, tombsr, all)
|
||||||
|
|
||||||
if i == 0 {
|
if i == 0 {
|
||||||
set = s
|
set = s
|
||||||
|
@ -565,7 +584,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
||||||
return errors.Wrap(err, "write postings")
|
return errors.Wrap(err, "write postings")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
83
vendor/github.com/prometheus/tsdb/db.go
generated
vendored
83
vendor/github.com/prometheus/tsdb/db.go
generated
vendored
|
@ -30,7 +30,6 @@ import (
|
||||||
|
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
"github.com/prometheus/tsdb/fileutil"
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/go-kit/kit/log/level"
|
"github.com/go-kit/kit/log/level"
|
||||||
"github.com/nightlyone/lockfile"
|
"github.com/nightlyone/lockfile"
|
||||||
|
@ -38,6 +37,7 @@ import (
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/tsdb/chunks"
|
"github.com/prometheus/tsdb/chunks"
|
||||||
|
"github.com/prometheus/tsdb/fileutil"
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -105,7 +105,7 @@ type DB struct {
|
||||||
|
|
||||||
// Mutex for that must be held when modifying the general block layout.
|
// Mutex for that must be held when modifying the general block layout.
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
blocks []DiskBlock
|
blocks []*Block
|
||||||
|
|
||||||
head *Head
|
head *Head
|
||||||
|
|
||||||
|
@ -431,7 +431,7 @@ func retentionCutoff(dir string, mint int64) (bool, error) {
|
||||||
return changes, fileutil.Fsync(df)
|
return changes, fileutil.Fsync(df)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) getBlock(id ulid.ULID) (DiskBlock, bool) {
|
func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
|
||||||
for _, b := range db.blocks {
|
for _, b := range db.blocks {
|
||||||
if b.Meta().ULID == id {
|
if b.Meta().ULID == id {
|
||||||
return b, true
|
return b, true
|
||||||
|
@ -456,7 +456,7 @@ func (db *DB) reload() (err error) {
|
||||||
return errors.Wrap(err, "find blocks")
|
return errors.Wrap(err, "find blocks")
|
||||||
}
|
}
|
||||||
var (
|
var (
|
||||||
blocks []DiskBlock
|
blocks []*Block
|
||||||
exist = map[ulid.ULID]struct{}{}
|
exist = map[ulid.ULID]struct{}{}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -468,7 +468,7 @@ func (db *DB) reload() (err error) {
|
||||||
|
|
||||||
b, ok := db.getBlock(meta.ULID)
|
b, ok := db.getBlock(meta.ULID)
|
||||||
if !ok {
|
if !ok {
|
||||||
b, err = newPersistedBlock(dir, db.chunkPool)
|
b, err = OpenBlock(dir, db.chunkPool)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "open block %s", dir)
|
return errors.Wrapf(err, "open block %s", dir)
|
||||||
}
|
}
|
||||||
|
@ -505,7 +505,7 @@ func (db *DB) reload() (err error) {
|
||||||
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
|
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
|
||||||
}
|
}
|
||||||
|
|
||||||
func validateBlockSequence(bs []DiskBlock) error {
|
func validateBlockSequence(bs []*Block) error {
|
||||||
if len(bs) == 0 {
|
if len(bs) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -521,13 +521,19 @@ func validateBlockSequence(bs []DiskBlock) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) Blocks() []DiskBlock {
|
func (db *DB) String() string {
|
||||||
|
return "HEAD"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Blocks returns the databases persisted blocks.
|
||||||
|
func (db *DB) Blocks() []*Block {
|
||||||
db.mtx.RLock()
|
db.mtx.RLock()
|
||||||
defer db.mtx.RUnlock()
|
defer db.mtx.RUnlock()
|
||||||
|
|
||||||
return db.blocks
|
return db.blocks
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Head returns the databases's head.
|
||||||
func (db *DB) Head() *Head {
|
func (db *DB) Head() *Head {
|
||||||
return db.head
|
return db.head
|
||||||
}
|
}
|
||||||
|
@ -587,41 +593,42 @@ func (db *DB) Snapshot(dir string) error {
|
||||||
db.cmtx.Lock()
|
db.cmtx.Lock()
|
||||||
defer db.cmtx.Unlock()
|
defer db.cmtx.Unlock()
|
||||||
|
|
||||||
db.mtx.RLock()
|
for _, b := range db.Blocks() {
|
||||||
defer db.mtx.RUnlock()
|
|
||||||
|
|
||||||
for _, b := range db.blocks {
|
|
||||||
level.Info(db.logger).Log("msg", "snapshotting block", "block", b)
|
level.Info(db.logger).Log("msg", "snapshotting block", "block", b)
|
||||||
|
|
||||||
if err := b.Snapshot(dir); err != nil {
|
if err := b.Snapshot(dir); err != nil {
|
||||||
return errors.Wrap(err, "error snapshotting headblock")
|
return errors.Wrap(err, "error snapshotting headblock")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
|
return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Querier returns a new querier over the data partition for the given time range.
|
// Querier returns a new querier over the data partition for the given time range.
|
||||||
// A goroutine must not handle more than one open Querier.
|
// A goroutine must not handle more than one open Querier.
|
||||||
func (db *DB) Querier(mint, maxt int64) Querier {
|
func (db *DB) Querier(mint, maxt int64) (Querier, error) {
|
||||||
db.mtx.RLock()
|
var blocks []BlockReader
|
||||||
|
|
||||||
blocks := db.blocksForInterval(mint, maxt)
|
for _, b := range db.Blocks() {
|
||||||
|
m := b.Meta()
|
||||||
|
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
|
||||||
|
blocks = append(blocks, b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if maxt >= db.head.MinTime() {
|
||||||
|
blocks = append(blocks, db.head)
|
||||||
|
}
|
||||||
|
|
||||||
sq := &querier{
|
sq := &querier{
|
||||||
blocks: make([]Querier, 0, len(blocks)),
|
blocks: make([]Querier, 0, len(blocks)),
|
||||||
db: db,
|
|
||||||
}
|
}
|
||||||
for _, b := range blocks {
|
for _, b := range blocks {
|
||||||
sq.blocks = append(sq.blocks, &blockQuerier{
|
q, err := NewBlockQuerier(b, mint, maxt)
|
||||||
mint: mint,
|
if err != nil {
|
||||||
maxt: maxt,
|
return nil, errors.Wrapf(err, "open querier for block %s", b)
|
||||||
index: b.Index(),
|
}
|
||||||
chunks: b.Chunks(),
|
sq.blocks = append(sq.blocks, q)
|
||||||
tombstones: b.Tombstones(),
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
return sq
|
return sq, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
|
func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
|
||||||
|
@ -634,28 +641,22 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
||||||
db.cmtx.Lock()
|
db.cmtx.Lock()
|
||||||
defer db.cmtx.Unlock()
|
defer db.cmtx.Unlock()
|
||||||
|
|
||||||
db.mtx.Lock()
|
|
||||||
defer db.mtx.Unlock()
|
|
||||||
|
|
||||||
var g errgroup.Group
|
var g errgroup.Group
|
||||||
|
|
||||||
for _, b := range db.blocks {
|
for _, b := range db.Blocks() {
|
||||||
m := b.Meta()
|
m := b.Meta()
|
||||||
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
|
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
|
||||||
g.Go(func(b DiskBlock) func() error {
|
g.Go(func(b *Block) func() error {
|
||||||
return func() error { return b.Delete(mint, maxt, ms...) }
|
return func() error { return b.Delete(mint, maxt, ms...) }
|
||||||
}(b))
|
}(b))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
return db.head.Delete(mint, maxt, ms...)
|
return db.head.Delete(mint, maxt, ms...)
|
||||||
})
|
})
|
||||||
|
|
||||||
if err := g.Wait(); err != nil {
|
if err := g.Wait(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -668,24 +669,6 @@ func intervalContains(min, max, t int64) bool {
|
||||||
return t >= min && t <= max
|
return t >= min && t <= max
|
||||||
}
|
}
|
||||||
|
|
||||||
// blocksForInterval returns all blocks within the partition that may contain
|
|
||||||
// data for the given time range.
|
|
||||||
func (db *DB) blocksForInterval(mint, maxt int64) []BlockReader {
|
|
||||||
var bs []BlockReader
|
|
||||||
|
|
||||||
for _, b := range db.blocks {
|
|
||||||
m := b.Meta()
|
|
||||||
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
|
|
||||||
bs = append(bs, b)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if maxt >= db.head.MinTime() {
|
|
||||||
bs = append(bs, db.head)
|
|
||||||
}
|
|
||||||
|
|
||||||
return bs
|
|
||||||
}
|
|
||||||
|
|
||||||
func isBlockDir(fi os.FileInfo) bool {
|
func isBlockDir(fi os.FileInfo) bool {
|
||||||
if !fi.IsDir() {
|
if !fi.IsDir() {
|
||||||
return false
|
return false
|
||||||
|
|
190
vendor/github.com/prometheus/tsdb/head.go
generated
vendored
190
vendor/github.com/prometheus/tsdb/head.go
generated
vendored
|
@ -15,6 +15,7 @@ package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"math"
|
"math"
|
||||||
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
@ -73,6 +74,7 @@ type headMetrics struct {
|
||||||
series prometheus.Gauge
|
series prometheus.Gauge
|
||||||
seriesCreated prometheus.Counter
|
seriesCreated prometheus.Counter
|
||||||
seriesRemoved prometheus.Counter
|
seriesRemoved prometheus.Counter
|
||||||
|
seriesNotFound prometheus.Counter
|
||||||
chunks prometheus.Gauge
|
chunks prometheus.Gauge
|
||||||
chunksCreated prometheus.Gauge
|
chunksCreated prometheus.Gauge
|
||||||
chunksRemoved prometheus.Gauge
|
chunksRemoved prometheus.Gauge
|
||||||
|
@ -102,6 +104,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
|
||||||
Name: "tsdb_head_series_removed_total",
|
Name: "tsdb_head_series_removed_total",
|
||||||
Help: "Total number of series removed in the head",
|
Help: "Total number of series removed in the head",
|
||||||
})
|
})
|
||||||
|
m.seriesNotFound = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Name: "tsdb_head_series_not_found",
|
||||||
|
Help: "Total number of requests for series that were not found.",
|
||||||
|
})
|
||||||
m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{
|
m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
Name: "tsdb_head_chunks",
|
Name: "tsdb_head_chunks",
|
||||||
Help: "Total number of chunks in the head block.",
|
Help: "Total number of chunks in the head block.",
|
||||||
|
@ -118,13 +124,13 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
|
||||||
Name: "tsdb_head_gc_duration_seconds",
|
Name: "tsdb_head_gc_duration_seconds",
|
||||||
Help: "Runtime of garbage collection in the head block.",
|
Help: "Runtime of garbage collection in the head block.",
|
||||||
})
|
})
|
||||||
m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||||
Name: "tsdb_head_max_time",
|
Name: "tsdb_head_max_time",
|
||||||
Help: "Maximum timestamp of the head block.",
|
Help: "Maximum timestamp of the head block.",
|
||||||
}, func() float64 {
|
}, func() float64 {
|
||||||
return float64(h.MaxTime())
|
return float64(h.MaxTime())
|
||||||
})
|
})
|
||||||
m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||||
Name: "tsdb_head_min_time",
|
Name: "tsdb_head_min_time",
|
||||||
Help: "Minimum time bound of the head block.",
|
Help: "Minimum time bound of the head block.",
|
||||||
}, func() float64 {
|
}, func() float64 {
|
||||||
|
@ -148,6 +154,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
|
||||||
m.series,
|
m.series,
|
||||||
m.seriesCreated,
|
m.seriesCreated,
|
||||||
m.seriesRemoved,
|
m.seriesRemoved,
|
||||||
|
m.seriesNotFound,
|
||||||
m.minTime,
|
m.minTime,
|
||||||
m.maxTime,
|
m.maxTime,
|
||||||
m.gcDuration,
|
m.gcDuration,
|
||||||
|
@ -178,7 +185,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
|
||||||
series: newStripeSeries(),
|
series: newStripeSeries(),
|
||||||
values: map[string]stringset{},
|
values: map[string]stringset{},
|
||||||
symbols: map[string]struct{}{},
|
symbols: map[string]struct{}{},
|
||||||
postings: newMemPostings(),
|
postings: newUnorderedMemPostings(),
|
||||||
tombstones: newEmptyTombstoneReader(),
|
tombstones: newEmptyTombstoneReader(),
|
||||||
}
|
}
|
||||||
h.metrics = newHeadMetrics(h, r)
|
h.metrics = newHeadMetrics(h, r)
|
||||||
|
@ -186,28 +193,19 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
|
||||||
return h, nil
|
return h, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadWAL initializes the head by consuming the write ahead log.
|
// processWALSamples adds a partition of samples it receives to the head and passes
|
||||||
func (h *Head) ReadWAL() error {
|
// them on to other workers.
|
||||||
r := h.wal.Reader()
|
// Samples before the mint timestamp are discarded.
|
||||||
mint := h.MinTime()
|
func (h *Head) processWALSamples(
|
||||||
|
mint int64,
|
||||||
|
partition, total uint64,
|
||||||
|
input <-chan []RefSample, output chan<- []RefSample,
|
||||||
|
) (unknownRefs uint64) {
|
||||||
|
defer close(output)
|
||||||
|
|
||||||
// Track number of samples that referenced a series we don't know about
|
for samples := range input {
|
||||||
// for error reporting.
|
|
||||||
var unknownRefs int
|
|
||||||
|
|
||||||
seriesFunc := func(series []RefSeries) error {
|
|
||||||
for _, s := range series {
|
|
||||||
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
|
|
||||||
|
|
||||||
if h.lastSeriesID < s.Ref {
|
|
||||||
h.lastSeriesID = s.Ref
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
samplesFunc := func(samples []RefSample) error {
|
|
||||||
for _, s := range samples {
|
for _, s := range samples {
|
||||||
if s.T < mint {
|
if s.T < mint || s.Ref%total != partition {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ms := h.series.getByID(s.Ref)
|
ms := h.series.getByID(s.Ref)
|
||||||
|
@ -221,9 +219,69 @@ func (h *Head) ReadWAL() error {
|
||||||
h.metrics.chunks.Inc()
|
h.metrics.chunks.Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
output <- samples
|
||||||
}
|
}
|
||||||
deletesFunc := func(stones []Stone) error {
|
return unknownRefs
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadWAL initializes the head by consuming the write ahead log.
|
||||||
|
func (h *Head) ReadWAL() error {
|
||||||
|
defer h.postings.ensureOrder()
|
||||||
|
|
||||||
|
r := h.wal.Reader()
|
||||||
|
mint := h.MinTime()
|
||||||
|
|
||||||
|
// Track number of samples that referenced a series we don't know about
|
||||||
|
// for error reporting.
|
||||||
|
var unknownRefs uint64
|
||||||
|
|
||||||
|
// Start workers that each process samples for a partition of the series ID space.
|
||||||
|
// They are connected through a ring of channels which ensures that all sample batches
|
||||||
|
// read from the WAL are processed in order.
|
||||||
|
var (
|
||||||
|
wg sync.WaitGroup
|
||||||
|
n = runtime.GOMAXPROCS(0)
|
||||||
|
firstInput = make(chan []RefSample, 300)
|
||||||
|
input = firstInput
|
||||||
|
)
|
||||||
|
wg.Add(n)
|
||||||
|
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
output := make(chan []RefSample, 300)
|
||||||
|
|
||||||
|
go func(i int, input <-chan []RefSample, output chan<- []RefSample) {
|
||||||
|
unknown := h.processWALSamples(mint, uint64(i), uint64(n), input, output)
|
||||||
|
atomic.AddUint64(&unknownRefs, unknown)
|
||||||
|
wg.Done()
|
||||||
|
}(i, input, output)
|
||||||
|
|
||||||
|
// The output feeds the next worker goroutine. For the last worker,
|
||||||
|
// it feeds the initial input again to reuse the RefSample slices.
|
||||||
|
input = output
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(fabxc): series entries spread between samples can starve the sample workers.
|
||||||
|
// Even with bufferd channels, this can impact startup time with lots of series churn.
|
||||||
|
// We must not pralellize series creation itself but could make the indexing asynchronous.
|
||||||
|
seriesFunc := func(series []RefSeries) {
|
||||||
|
for _, s := range series {
|
||||||
|
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
|
||||||
|
|
||||||
|
if h.lastSeriesID < s.Ref {
|
||||||
|
h.lastSeriesID = s.Ref
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
samplesFunc := func(samples []RefSample) {
|
||||||
|
var buf []RefSample
|
||||||
|
select {
|
||||||
|
case buf = <-input:
|
||||||
|
default:
|
||||||
|
buf = make([]RefSample, 0, len(samples)*11/10)
|
||||||
|
}
|
||||||
|
firstInput <- append(buf[:0], samples...)
|
||||||
|
}
|
||||||
|
deletesFunc := func(stones []Stone) {
|
||||||
for _, s := range stones {
|
for _, s := range stones {
|
||||||
for _, itv := range s.intervals {
|
for _, itv := range s.intervals {
|
||||||
if itv.Maxt < mint {
|
if itv.Maxt < mint {
|
||||||
|
@ -232,16 +290,22 @@ func (h *Head) ReadWAL() error {
|
||||||
h.tombstones.add(s.ref, itv)
|
h.tombstones.add(s.ref, itv)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err := r.Read(seriesFunc, samplesFunc, deletesFunc)
|
||||||
|
|
||||||
|
// Signal termination to first worker and wait for last one to close its output channel.
|
||||||
|
close(firstInput)
|
||||||
|
for range input {
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "consume WAL")
|
||||||
|
}
|
||||||
if unknownRefs > 0 {
|
if unknownRefs > 0 {
|
||||||
level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs)
|
level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
|
|
||||||
return errors.Wrap(err, "consume WAL")
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -303,6 +367,23 @@ func (h *Head) initTime(t int64) (initialized bool) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type rangeHead struct {
|
||||||
|
head *Head
|
||||||
|
mint, maxt int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *rangeHead) Index() (IndexReader, error) {
|
||||||
|
return h.head.indexRange(h.mint, h.maxt), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *rangeHead) Chunks() (ChunkReader, error) {
|
||||||
|
return h.head.chunksRange(h.mint, h.maxt), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *rangeHead) Tombstones() (TombstoneReader, error) {
|
||||||
|
return h.head.tombstones, nil
|
||||||
|
}
|
||||||
|
|
||||||
// initAppender is a helper to initialize the time bounds of a the head
|
// initAppender is a helper to initialize the time bounds of a the head
|
||||||
// upon the first sample it receives.
|
// upon the first sample it receives.
|
||||||
type initAppender struct {
|
type initAppender struct {
|
||||||
|
@ -609,13 +690,14 @@ func (h *Head) gc() {
|
||||||
h.symMtx.Unlock()
|
h.symMtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Head) Tombstones() TombstoneReader {
|
// Tombstones returns a new reader over the head's tombstones
|
||||||
return h.tombstones
|
func (h *Head) Tombstones() (TombstoneReader, error) {
|
||||||
|
return h.tombstones, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Index returns an IndexReader against the block.
|
// Index returns an IndexReader against the block.
|
||||||
func (h *Head) Index() IndexReader {
|
func (h *Head) Index() (IndexReader, error) {
|
||||||
return h.indexRange(math.MinInt64, math.MaxInt64)
|
return h.indexRange(math.MinInt64, math.MaxInt64), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
|
func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
|
||||||
|
@ -626,8 +708,8 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Chunks returns a ChunkReader against the block.
|
// Chunks returns a ChunkReader against the block.
|
||||||
func (h *Head) Chunks() ChunkReader {
|
func (h *Head) Chunks() (ChunkReader, error) {
|
||||||
return h.chunksRange(math.MinInt64, math.MaxInt64)
|
return h.chunksRange(math.MinInt64, math.MaxInt64), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Head) chunksRange(mint, maxt int64) *headChunkReader {
|
func (h *Head) chunksRange(mint, maxt int64) *headChunkReader {
|
||||||
|
@ -680,10 +762,11 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
|
||||||
|
|
||||||
s.Lock()
|
s.Lock()
|
||||||
c := s.chunk(int(cid))
|
c := s.chunk(int(cid))
|
||||||
|
mint, maxt := c.minTime, c.maxTime
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
|
|
||||||
// Do not expose chunks that are outside of the specified range.
|
// Do not expose chunks that are outside of the specified range.
|
||||||
if c == nil || !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) {
|
if c == nil || !intervalOverlap(mint, maxt, h.mint, h.maxt) {
|
||||||
return nil, ErrNotFound
|
return nil, ErrNotFound
|
||||||
}
|
}
|
||||||
return &safeChunk{
|
return &safeChunk{
|
||||||
|
@ -710,23 +793,6 @@ func (c *safeChunk) Iterator() chunks.Iterator {
|
||||||
// func (c *safeChunk) Bytes() []byte { panic("illegal") }
|
// func (c *safeChunk) Bytes() []byte { panic("illegal") }
|
||||||
// func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") }
|
// func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") }
|
||||||
|
|
||||||
type rangeHead struct {
|
|
||||||
head *Head
|
|
||||||
mint, maxt int64
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *rangeHead) Index() IndexReader {
|
|
||||||
return h.head.indexRange(h.mint, h.maxt)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *rangeHead) Chunks() ChunkReader {
|
|
||||||
return h.head.chunksRange(h.mint, h.maxt)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *rangeHead) Tombstones() TombstoneReader {
|
|
||||||
return newEmptyTombstoneReader()
|
|
||||||
}
|
|
||||||
|
|
||||||
type headIndexReader struct {
|
type headIndexReader struct {
|
||||||
head *Head
|
head *Head
|
||||||
mint, maxt int64
|
mint, maxt int64
|
||||||
|
@ -780,24 +846,17 @@ func (h *headIndexReader) SortedPostings(p Postings) Postings {
|
||||||
if err := p.Err(); err != nil {
|
if err := p.Err(); err != nil {
|
||||||
return errPostings{err: errors.Wrap(err, "expand postings")}
|
return errPostings{err: errors.Wrap(err, "expand postings")}
|
||||||
}
|
}
|
||||||
var err error
|
|
||||||
|
|
||||||
sort.Slice(ep, func(i, j int) bool {
|
sort.Slice(ep, func(i, j int) bool {
|
||||||
if err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
a := h.head.series.getByID(ep[i])
|
a := h.head.series.getByID(ep[i])
|
||||||
b := h.head.series.getByID(ep[j])
|
b := h.head.series.getByID(ep[j])
|
||||||
|
|
||||||
if a == nil || b == nil {
|
if a == nil || b == nil {
|
||||||
err = errors.Errorf("series not found")
|
level.Debug(h.head.logger).Log("msg", "looked up series not found")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return labels.Compare(a.lset, b.lset) < 0
|
return labels.Compare(a.lset, b.lset) < 0
|
||||||
})
|
})
|
||||||
if err != nil {
|
|
||||||
return errPostings{err: err}
|
|
||||||
}
|
|
||||||
return newListPostings(ep)
|
return newListPostings(ep)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -806,6 +865,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkM
|
||||||
s := h.head.series.getByID(ref)
|
s := h.head.series.getByID(ref)
|
||||||
|
|
||||||
if s == nil {
|
if s == nil {
|
||||||
|
h.head.metrics.seriesNotFound.Inc()
|
||||||
return ErrNotFound
|
return ErrNotFound
|
||||||
}
|
}
|
||||||
*lbls = append((*lbls)[:0], s.lset...)
|
*lbls = append((*lbls)[:0], s.lset...)
|
||||||
|
@ -1169,10 +1229,12 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
|
||||||
c = s.cut(t)
|
c = s.cut(t)
|
||||||
chunkCreated = true
|
chunkCreated = true
|
||||||
}
|
}
|
||||||
|
numSamples := c.chunk.NumSamples()
|
||||||
|
|
||||||
if c.maxTime >= t {
|
if c.maxTime >= t {
|
||||||
return false, chunkCreated
|
return false, chunkCreated
|
||||||
}
|
}
|
||||||
if c.chunk.NumSamples() > samplesPerChunk/4 && t >= s.nextAt {
|
if numSamples > samplesPerChunk/4 && t >= s.nextAt {
|
||||||
c = s.cut(t)
|
c = s.cut(t)
|
||||||
chunkCreated = true
|
chunkCreated = true
|
||||||
}
|
}
|
||||||
|
@ -1180,7 +1242,7 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
|
||||||
|
|
||||||
c.maxTime = t
|
c.maxTime = t
|
||||||
|
|
||||||
if c.chunk.NumSamples() == samplesPerChunk/4 {
|
if numSamples == samplesPerChunk/4 {
|
||||||
_, maxt := rangeForTimestamp(c.minTime, s.chunkRange)
|
_, maxt := rangeForTimestamp(c.minTime, s.chunkRange)
|
||||||
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt)
|
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt)
|
||||||
}
|
}
|
||||||
|
|
7
vendor/github.com/prometheus/tsdb/index.go
generated
vendored
7
vendor/github.com/prometheus/tsdb/index.go
generated
vendored
|
@ -19,14 +19,14 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"hash"
|
"hash"
|
||||||
"io"
|
"io"
|
||||||
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"math"
|
|
||||||
|
|
||||||
"github.com/prometheus/tsdb/fileutil"
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
"github.com/prometheus/tsdb/fileutil"
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -525,6 +525,8 @@ type IndexReader interface {
|
||||||
|
|
||||||
// Postings returns the postings list iterator for the label pair.
|
// Postings returns the postings list iterator for the label pair.
|
||||||
// The Postings here contain the offsets to the series inside the index.
|
// The Postings here contain the offsets to the series inside the index.
|
||||||
|
// Found IDs are not strictly required to point to a valid Series, e.g. during
|
||||||
|
// background garbage collections.
|
||||||
Postings(name, value string) (Postings, error)
|
Postings(name, value string) (Postings, error)
|
||||||
|
|
||||||
// SortedPostings returns a postings list that is reordered to be sorted
|
// SortedPostings returns a postings list that is reordered to be sorted
|
||||||
|
@ -533,6 +535,7 @@ type IndexReader interface {
|
||||||
|
|
||||||
// Series populates the given labels and chunk metas for the series identified
|
// Series populates the given labels and chunk metas for the series identified
|
||||||
// by the reference.
|
// by the reference.
|
||||||
|
// Returns ErrNotFound if the ref does not resolve to a known series.
|
||||||
Series(ref uint64, lset *labels.Labels, chks *[]ChunkMeta) error
|
Series(ref uint64, lset *labels.Labels, chks *[]ChunkMeta) error
|
||||||
|
|
||||||
// LabelIndices returns the label pairs for which indices exist.
|
// LabelIndices returns the label pairs for which indices exist.
|
||||||
|
|
60
vendor/github.com/prometheus/tsdb/postings.go
generated
vendored
60
vendor/github.com/prometheus/tsdb/postings.go
generated
vendored
|
@ -15,6 +15,7 @@ package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -22,14 +23,30 @@ import (
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// memPostings holds postings list for series ID per label pair. They may be written
|
||||||
|
// to out of order.
|
||||||
|
// ensureOrder() must be called once before any reads are done. This allows for quick
|
||||||
|
// unordered batch fills on startup.
|
||||||
type memPostings struct {
|
type memPostings struct {
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
m map[labels.Label][]uint64
|
m map[labels.Label][]uint64
|
||||||
|
ordered bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// newMemPoistings returns a memPostings that's ready for reads and writes.
|
||||||
func newMemPostings() *memPostings {
|
func newMemPostings() *memPostings {
|
||||||
return &memPostings{
|
return &memPostings{
|
||||||
m: make(map[labels.Label][]uint64, 512),
|
m: make(map[labels.Label][]uint64, 512),
|
||||||
|
ordered: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// newUnorderedMemPostings returns a memPostings that is not safe to be read from
|
||||||
|
// until ensureOrder was called once.
|
||||||
|
func newUnorderedMemPostings() *memPostings {
|
||||||
|
return &memPostings{
|
||||||
|
m: make(map[labels.Label][]uint64, 512),
|
||||||
|
ordered: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -47,6 +64,40 @@ func (p *memPostings) get(name, value string) Postings {
|
||||||
|
|
||||||
var allPostingsKey = labels.Label{}
|
var allPostingsKey = labels.Label{}
|
||||||
|
|
||||||
|
// ensurePostings ensures that all postings lists are sorted. After it returns all further
|
||||||
|
// calls to add and addFor will insert new IDs in a sorted manner.
|
||||||
|
func (p *memPostings) ensureOrder() {
|
||||||
|
p.mtx.Lock()
|
||||||
|
defer p.mtx.Unlock()
|
||||||
|
|
||||||
|
if p.ordered {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
n := runtime.GOMAXPROCS(0)
|
||||||
|
workc := make(chan []uint64)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(n)
|
||||||
|
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
go func() {
|
||||||
|
for l := range workc {
|
||||||
|
sort.Slice(l, func(i, j int) bool { return l[i] < l[j] })
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, l := range p.m {
|
||||||
|
workc <- l
|
||||||
|
}
|
||||||
|
close(workc)
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
p.ordered = true
|
||||||
|
}
|
||||||
|
|
||||||
// add adds a document to the index. The caller has to ensure that no
|
// add adds a document to the index. The caller has to ensure that no
|
||||||
// term argument appears twice.
|
// term argument appears twice.
|
||||||
func (p *memPostings) add(id uint64, lset labels.Labels) {
|
func (p *memPostings) add(id uint64, lset labels.Labels) {
|
||||||
|
@ -64,6 +115,9 @@ func (p *memPostings) addFor(id uint64, l labels.Label) {
|
||||||
list := append(p.m[l], id)
|
list := append(p.m[l], id)
|
||||||
p.m[l] = list
|
p.m[l] = list
|
||||||
|
|
||||||
|
if !p.ordered {
|
||||||
|
return
|
||||||
|
}
|
||||||
// There is no guarantee that no higher ID was inserted before as they may
|
// There is no guarantee that no higher ID was inserted before as they may
|
||||||
// be generated independently before adding them to postings.
|
// be generated independently before adding them to postings.
|
||||||
// We repair order violations on insert. The invariant is that the first n-1
|
// We repair order violations on insert. The invariant is that the first n-1
|
||||||
|
|
43
vendor/github.com/prometheus/tsdb/querier.go
generated
vendored
43
vendor/github.com/prometheus/tsdb/querier.go
generated
vendored
|
@ -18,6 +18,7 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/tsdb/chunks"
|
"github.com/prometheus/tsdb/chunks"
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
)
|
)
|
||||||
|
@ -50,7 +51,6 @@ type Series interface {
|
||||||
// querier aggregates querying results from time blocks within
|
// querier aggregates querying results from time blocks within
|
||||||
// a single partition.
|
// a single partition.
|
||||||
type querier struct {
|
type querier struct {
|
||||||
db *DB
|
|
||||||
blocks []Querier
|
blocks []Querier
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -103,21 +103,30 @@ func (q *querier) Close() error {
|
||||||
for _, bq := range q.blocks {
|
for _, bq := range q.blocks {
|
||||||
merr.Add(bq.Close())
|
merr.Add(bq.Close())
|
||||||
}
|
}
|
||||||
q.db.mtx.RUnlock()
|
|
||||||
|
|
||||||
return merr.Err()
|
return merr.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBlockQuerier returns a queries against the readers.
|
// NewBlockQuerier returns a queries against the readers.
|
||||||
func NewBlockQuerier(ir IndexReader, cr ChunkReader, tr TombstoneReader, mint, maxt int64) Querier {
|
func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) {
|
||||||
return &blockQuerier{
|
indexr, err := b.Index()
|
||||||
index: ir,
|
if err != nil {
|
||||||
chunks: cr,
|
return nil, errors.Wrapf(err, "open index reader")
|
||||||
tombstones: tr,
|
|
||||||
|
|
||||||
mint: mint,
|
|
||||||
maxt: maxt,
|
|
||||||
}
|
}
|
||||||
|
chunkr, err := b.Chunks()
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "open chunk reader")
|
||||||
|
}
|
||||||
|
tombsr, err := b.Tombstones()
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "open tombstone reader")
|
||||||
|
}
|
||||||
|
return &blockQuerier{
|
||||||
|
mint: mint,
|
||||||
|
maxt: maxt,
|
||||||
|
index: indexr,
|
||||||
|
chunks: chunkr,
|
||||||
|
tombstones: tombsr,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// blockQuerier provides querying access to a single block database.
|
// blockQuerier provides querying access to a single block database.
|
||||||
|
@ -175,7 +184,13 @@ func (q *blockQuerier) LabelValuesFor(string, labels.Label) ([]string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *blockQuerier) Close() error {
|
func (q *blockQuerier) Close() error {
|
||||||
return nil
|
var merr MultiError
|
||||||
|
|
||||||
|
merr.Add(q.index.Close())
|
||||||
|
merr.Add(q.chunks.Close())
|
||||||
|
merr.Add(q.tombstones.Close())
|
||||||
|
|
||||||
|
return merr.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// postingsReader is used to select matching postings from an IndexReader.
|
// postingsReader is used to select matching postings from an IndexReader.
|
||||||
|
@ -435,6 +450,10 @@ Outer:
|
||||||
for s.p.Next() {
|
for s.p.Next() {
|
||||||
ref := s.p.At()
|
ref := s.p.At()
|
||||||
if err := s.index.Series(ref, &lset, &chunks); err != nil {
|
if err := s.index.Series(ref, &lset, &chunks); err != nil {
|
||||||
|
// Postings may be stale. Skip if no underlying series exists.
|
||||||
|
if errors.Cause(err) == ErrNotFound {
|
||||||
|
continue
|
||||||
|
}
|
||||||
s.err = err
|
s.err = err
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
18
vendor/github.com/prometheus/tsdb/tabwriter.go
generated
vendored
Normal file
18
vendor/github.com/prometheus/tsdb/tabwriter.go
generated
vendored
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
package tsdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"text/tabwriter"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
minwidth = 0
|
||||||
|
tabwidth = 0
|
||||||
|
padding = 2
|
||||||
|
padchar = ' '
|
||||||
|
flags = 0
|
||||||
|
)
|
||||||
|
|
||||||
|
func GetNewTabWriter(output io.Writer) *tabwriter.Writer {
|
||||||
|
return tabwriter.NewWriter(output, minwidth, tabwidth, padding, padchar, flags)
|
||||||
|
}
|
8
vendor/github.com/prometheus/tsdb/tombstones.go
generated
vendored
8
vendor/github.com/prometheus/tsdb/tombstones.go
generated
vendored
|
@ -33,9 +33,11 @@ const (
|
||||||
tombstoneFormatV1 = 1
|
tombstoneFormatV1 = 1
|
||||||
)
|
)
|
||||||
|
|
||||||
// TombstoneReader is the iterator over tombstones.
|
// TombstoneReader gives access to tombstone intervals by series reference.
|
||||||
type TombstoneReader interface {
|
type TombstoneReader interface {
|
||||||
Get(ref uint64) Intervals
|
Get(ref uint64) Intervals
|
||||||
|
|
||||||
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeTombstoneFile(dir string, tr tombstoneReader) error {
|
func writeTombstoneFile(dir string, tr tombstoneReader) error {
|
||||||
|
@ -154,6 +156,10 @@ func (t tombstoneReader) add(ref uint64, itv Interval) {
|
||||||
t[ref] = t[ref].add(itv)
|
t[ref] = t[ref].add(itv)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tombstoneReader) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Interval represents a single time-interval.
|
// Interval represents a single time-interval.
|
||||||
type Interval struct {
|
type Interval struct {
|
||||||
Mint, Maxt int64
|
Mint, Maxt int64
|
||||||
|
|
250
vendor/github.com/prometheus/tsdb/wal.go
generated
vendored
250
vendor/github.com/prometheus/tsdb/wal.go
generated
vendored
|
@ -27,16 +27,16 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/tsdb/fileutil"
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/go-kit/kit/log/level"
|
"github.com/go-kit/kit/log/level"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/tsdb/labels"
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/tsdb/fileutil"
|
||||||
|
"github.com/prometheus/tsdb/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
// WALEntryType indicates what data a WAL entry contains.
|
// WALEntryType indicates what data a WAL entry contains.
|
||||||
type WALEntryType byte
|
type WALEntryType uint8
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// WALMagic is a 4 byte number every WAL segment file starts with.
|
// WALMagic is a 4 byte number every WAL segment file starts with.
|
||||||
|
@ -54,20 +54,9 @@ const (
|
||||||
WALEntryDeletes WALEntryType = 4
|
WALEntryDeletes WALEntryType = 4
|
||||||
)
|
)
|
||||||
|
|
||||||
// SamplesCB is the callback after reading samples. The passed slice
|
|
||||||
// is only valid until the call returns.
|
|
||||||
type SamplesCB func([]RefSample) error
|
|
||||||
|
|
||||||
// SeriesCB is the callback after reading series. The passed slice
|
|
||||||
// is only valid until the call returns.
|
|
||||||
type SeriesCB func([]RefSeries) error
|
|
||||||
|
|
||||||
// DeletesCB is the callback after reading deletes. The passed slice
|
|
||||||
// is only valid until the call returns.
|
|
||||||
type DeletesCB func([]Stone) error
|
|
||||||
|
|
||||||
type walMetrics struct {
|
type walMetrics struct {
|
||||||
fsyncDuration prometheus.Summary
|
fsyncDuration prometheus.Summary
|
||||||
|
corruptions prometheus.Counter
|
||||||
}
|
}
|
||||||
|
|
||||||
func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {
|
func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {
|
||||||
|
@ -77,10 +66,15 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {
|
||||||
Name: "tsdb_wal_fsync_duration_seconds",
|
Name: "tsdb_wal_fsync_duration_seconds",
|
||||||
Help: "Duration of WAL fsync.",
|
Help: "Duration of WAL fsync.",
|
||||||
})
|
})
|
||||||
|
m.corruptions = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Name: "tsdb_wal_corruptions_total",
|
||||||
|
Help: "Total number of WAL corruptions.",
|
||||||
|
})
|
||||||
|
|
||||||
if r != nil {
|
if r != nil {
|
||||||
r.MustRegister(
|
r.MustRegister(
|
||||||
m.fsyncDuration,
|
m.fsyncDuration,
|
||||||
|
m.corruptions,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
return m
|
return m
|
||||||
|
@ -104,17 +98,27 @@ func NopWAL() WAL {
|
||||||
|
|
||||||
type nopWAL struct{}
|
type nopWAL struct{}
|
||||||
|
|
||||||
func (nopWAL) Read(SeriesCB, SamplesCB, DeletesCB) error { return nil }
|
func (nopWAL) Read(
|
||||||
func (w nopWAL) Reader() WALReader { return w }
|
seriesf func([]RefSeries),
|
||||||
func (nopWAL) LogSeries([]RefSeries) error { return nil }
|
samplesf func([]RefSample),
|
||||||
func (nopWAL) LogSamples([]RefSample) error { return nil }
|
deletesf func([]Stone),
|
||||||
func (nopWAL) LogDeletes([]Stone) error { return nil }
|
) error {
|
||||||
func (nopWAL) Truncate(int64, func(uint64) bool) error { return nil }
|
return nil
|
||||||
func (nopWAL) Close() error { return nil }
|
}
|
||||||
|
func (w nopWAL) Reader() WALReader { return w }
|
||||||
|
func (nopWAL) LogSeries([]RefSeries) error { return nil }
|
||||||
|
func (nopWAL) LogSamples([]RefSample) error { return nil }
|
||||||
|
func (nopWAL) LogDeletes([]Stone) error { return nil }
|
||||||
|
func (nopWAL) Truncate(int64, func(uint64) bool) error { return nil }
|
||||||
|
func (nopWAL) Close() error { return nil }
|
||||||
|
|
||||||
// WALReader reads entries from a WAL.
|
// WALReader reads entries from a WAL.
|
||||||
type WALReader interface {
|
type WALReader interface {
|
||||||
Read(SeriesCB, SamplesCB, DeletesCB) error
|
Read(
|
||||||
|
seriesf func([]RefSeries),
|
||||||
|
samplesf func([]RefSample),
|
||||||
|
deletesf func([]Stone),
|
||||||
|
) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// RefSeries is the series labels with the series ID.
|
// RefSeries is the series labels with the series ID.
|
||||||
|
@ -170,7 +174,7 @@ func newCRC32() hash.Hash32 {
|
||||||
|
|
||||||
// SegmentWAL is a write ahead log for series data.
|
// SegmentWAL is a write ahead log for series data.
|
||||||
type SegmentWAL struct {
|
type SegmentWAL struct {
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
metrics *walMetrics
|
metrics *walMetrics
|
||||||
|
|
||||||
dirFile *os.File
|
dirFile *os.File
|
||||||
|
@ -238,15 +242,20 @@ type repairingWALReader struct {
|
||||||
r WALReader
|
r WALReader
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *repairingWALReader) Read(series SeriesCB, samples SamplesCB, deletes DeletesCB) error {
|
func (r *repairingWALReader) Read(
|
||||||
err := r.r.Read(series, samples, deletes)
|
seriesf func([]RefSeries),
|
||||||
|
samplesf func([]RefSample),
|
||||||
|
deletesf func([]Stone),
|
||||||
|
) error {
|
||||||
|
err := r.r.Read(seriesf, samplesf, deletesf)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
cerr, ok := err.(walCorruptionErr)
|
cerr, ok := errors.Cause(err).(walCorruptionErr)
|
||||||
if !ok {
|
if !ok {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
r.wal.metrics.corruptions.Inc()
|
||||||
return r.wal.truncate(cerr.err, cerr.file, cerr.lastOffset)
|
return r.wal.truncate(cerr.err, cerr.file, cerr.lastOffset)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -336,6 +345,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error {
|
||||||
var (
|
var (
|
||||||
csf = newSegmentFile(f)
|
csf = newSegmentFile(f)
|
||||||
crc32 = newCRC32()
|
crc32 = newCRC32()
|
||||||
|
decSeries = []RefSeries{}
|
||||||
activeSeries = []RefSeries{}
|
activeSeries = []RefSeries{}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -345,13 +355,14 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error {
|
||||||
if rt != WALEntrySeries {
|
if rt != WALEntrySeries {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
series, err := r.decodeSeries(flag, byt)
|
decSeries = decSeries[:0]
|
||||||
|
activeSeries = activeSeries[:0]
|
||||||
|
|
||||||
|
err := r.decodeSeries(flag, byt, &decSeries)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "decode samples while truncating")
|
return errors.Wrap(err, "decode samples while truncating")
|
||||||
}
|
}
|
||||||
activeSeries = activeSeries[:0]
|
for _, s := range decSeries {
|
||||||
|
|
||||||
for _, s := range series {
|
|
||||||
if keep(s.Ref) {
|
if keep(s.Ref) {
|
||||||
activeSeries = append(activeSeries, s)
|
activeSeries = append(activeSeries, s)
|
||||||
}
|
}
|
||||||
|
@ -807,10 +818,6 @@ type walReader struct {
|
||||||
curBuf []byte
|
curBuf []byte
|
||||||
lastOffset int64 // offset after last successfully read entry
|
lastOffset int64 // offset after last successfully read entry
|
||||||
|
|
||||||
seriesBuf []RefSeries
|
|
||||||
sampleBuf []RefSample
|
|
||||||
tombstoneBuf []Stone
|
|
||||||
|
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -831,70 +838,118 @@ func (r *walReader) Err() error {
|
||||||
return r.err
|
return r.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesCB) error {
|
func (r *walReader) Read(
|
||||||
if seriesf == nil {
|
seriesf func([]RefSeries),
|
||||||
seriesf = func([]RefSeries) error { return nil }
|
samplesf func([]RefSample),
|
||||||
}
|
deletesf func([]Stone),
|
||||||
if samplesf == nil {
|
) error {
|
||||||
samplesf = func([]RefSample) error { return nil }
|
// Concurrency for replaying the WAL is very limited. We at least split out decoding and
|
||||||
}
|
// processing into separate threads.
|
||||||
if deletesf == nil {
|
// Historically, the processing is the bottleneck with reading and decoding using only
|
||||||
deletesf = func([]Stone) error { return nil }
|
// 15% of the CPU.
|
||||||
}
|
var (
|
||||||
|
seriesPool sync.Pool
|
||||||
|
samplePool sync.Pool
|
||||||
|
deletePool sync.Pool
|
||||||
|
)
|
||||||
|
donec := make(chan struct{})
|
||||||
|
datac := make(chan interface{}, 100)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(donec)
|
||||||
|
|
||||||
|
for x := range datac {
|
||||||
|
switch v := x.(type) {
|
||||||
|
case []RefSeries:
|
||||||
|
if seriesf != nil {
|
||||||
|
seriesf(v)
|
||||||
|
}
|
||||||
|
seriesPool.Put(v[:0])
|
||||||
|
case []RefSample:
|
||||||
|
if samplesf != nil {
|
||||||
|
samplesf(v)
|
||||||
|
}
|
||||||
|
samplePool.Put(v[:0])
|
||||||
|
case []Stone:
|
||||||
|
if deletesf != nil {
|
||||||
|
deletesf(v)
|
||||||
|
}
|
||||||
|
deletePool.Put(v[:0])
|
||||||
|
default:
|
||||||
|
level.Error(r.logger).Log("msg", "unexpected data type")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
var err error
|
||||||
|
|
||||||
for r.next() {
|
for r.next() {
|
||||||
et, flag, b := r.at()
|
et, flag, b := r.at()
|
||||||
|
|
||||||
// In decoding below we never return a walCorruptionErr for now.
|
// In decoding below we never return a walCorruptionErr for now.
|
||||||
// Those should generally be catched by entry decoding before.
|
// Those should generally be catched by entry decoding before.
|
||||||
switch et {
|
switch et {
|
||||||
case WALEntrySeries:
|
case WALEntrySeries:
|
||||||
series, err := r.decodeSeries(flag, b)
|
var series []RefSeries
|
||||||
|
if v := seriesPool.Get(); v == nil {
|
||||||
|
series = make([]RefSeries, 0, 512)
|
||||||
|
} else {
|
||||||
|
series = v.([]RefSeries)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := r.decodeSeries(flag, b, &series)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "decode series entry")
|
err = errors.Wrap(err, "decode series entry")
|
||||||
}
|
break
|
||||||
if err := seriesf(series); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
datac <- series
|
||||||
|
|
||||||
cf := r.current()
|
cf := r.current()
|
||||||
|
|
||||||
for _, s := range series {
|
for _, s := range series {
|
||||||
if cf.minSeries > s.Ref {
|
if cf.minSeries > s.Ref {
|
||||||
cf.minSeries = s.Ref
|
cf.minSeries = s.Ref
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
case WALEntrySamples:
|
case WALEntrySamples:
|
||||||
samples, err := r.decodeSamples(flag, b)
|
var samples []RefSample
|
||||||
|
if v := samplePool.Get(); v == nil {
|
||||||
|
samples = make([]RefSample, 0, 512)
|
||||||
|
} else {
|
||||||
|
samples = v.([]RefSample)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := r.decodeSamples(flag, b, &samples)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "decode samples entry")
|
err = errors.Wrap(err, "decode samples entry")
|
||||||
}
|
break
|
||||||
if err := samplesf(samples); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
datac <- samples
|
||||||
|
|
||||||
// Update the times for the WAL segment file.
|
// Update the times for the WAL segment file.
|
||||||
cf := r.current()
|
cf := r.current()
|
||||||
|
|
||||||
for _, s := range samples {
|
for _, s := range samples {
|
||||||
if cf.maxTime < s.T {
|
if cf.maxTime < s.T {
|
||||||
cf.maxTime = s.T
|
cf.maxTime = s.T
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
case WALEntryDeletes:
|
case WALEntryDeletes:
|
||||||
stones, err := r.decodeDeletes(flag, b)
|
var deletes []Stone
|
||||||
|
if v := deletePool.Get(); v == nil {
|
||||||
|
deletes = make([]Stone, 0, 512)
|
||||||
|
} else {
|
||||||
|
deletes = v.([]Stone)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := r.decodeDeletes(flag, b, &deletes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "decode delete entry")
|
err = errors.Wrap(err, "decode delete entry")
|
||||||
}
|
break
|
||||||
if err := deletesf(stones); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
datac <- deletes
|
||||||
|
|
||||||
// Update the times for the WAL segment file.
|
// Update the times for the WAL segment file.
|
||||||
|
|
||||||
cf := r.current()
|
cf := r.current()
|
||||||
|
for _, s := range deletes {
|
||||||
for _, s := range stones {
|
|
||||||
for _, iv := range s.intervals {
|
for _, iv := range s.intervals {
|
||||||
if cf.maxTime < iv.Maxt {
|
if cf.maxTime < iv.Maxt {
|
||||||
cf.maxTime = iv.Maxt
|
cf.maxTime = iv.Maxt
|
||||||
|
@ -903,27 +958,16 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
close(datac)
|
||||||
|
<-donec
|
||||||
|
|
||||||
return r.Err()
|
if err != nil {
|
||||||
}
|
return err
|
||||||
|
|
||||||
// nextEntry retrieves the next entry. It is also used as a testing hook.
|
|
||||||
func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) {
|
|
||||||
if r.cur >= len(r.files) {
|
|
||||||
return 0, 0, nil, io.EOF
|
|
||||||
}
|
}
|
||||||
cf := r.current()
|
if r.Err() != nil {
|
||||||
|
return errors.Wrap(r.Err(), "read entry")
|
||||||
et, flag, b, err := r.entry(cf)
|
|
||||||
// If we reached the end of the reader, advance to the next one and close.
|
|
||||||
// Do not close on the last one as it will still be appended to.
|
|
||||||
if err == io.EOF && r.cur < len(r.files)-1 {
|
|
||||||
// Current reader completed. Leave the file open for later reads
|
|
||||||
// for truncating.
|
|
||||||
r.cur++
|
|
||||||
return r.nextEntry()
|
|
||||||
}
|
}
|
||||||
return et, flag, b, err
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *walReader) at() (WALEntryType, byte, []byte) {
|
func (r *walReader) at() (WALEntryType, byte, []byte) {
|
||||||
|
@ -1043,9 +1087,7 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
|
||||||
return etype, flag, buf, nil
|
return etype, flag, buf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) {
|
func (r *walReader) decodeSeries(flag byte, b []byte, res *[]RefSeries) error {
|
||||||
r.seriesBuf = r.seriesBuf[:0]
|
|
||||||
|
|
||||||
dec := decbuf{b: b}
|
dec := decbuf{b: b}
|
||||||
|
|
||||||
for len(dec.b) > 0 && dec.err() == nil {
|
for len(dec.b) > 0 && dec.err() == nil {
|
||||||
|
@ -1059,25 +1101,24 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) {
|
||||||
}
|
}
|
||||||
sort.Sort(lset)
|
sort.Sort(lset)
|
||||||
|
|
||||||
r.seriesBuf = append(r.seriesBuf, RefSeries{
|
*res = append(*res, RefSeries{
|
||||||
Ref: ref,
|
Ref: ref,
|
||||||
Labels: lset,
|
Labels: lset,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
if dec.err() != nil {
|
if dec.err() != nil {
|
||||||
return nil, dec.err()
|
return dec.err()
|
||||||
}
|
}
|
||||||
if len(dec.b) > 0 {
|
if len(dec.b) > 0 {
|
||||||
return r.seriesBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
|
return errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
|
||||||
}
|
}
|
||||||
return r.seriesBuf, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
|
func (r *walReader) decodeSamples(flag byte, b []byte, res *[]RefSample) error {
|
||||||
if len(b) == 0 {
|
if len(b) == 0 {
|
||||||
return nil, nil
|
return nil
|
||||||
}
|
}
|
||||||
r.sampleBuf = r.sampleBuf[:0]
|
|
||||||
dec := decbuf{b: b}
|
dec := decbuf{b: b}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -1090,7 +1131,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
|
||||||
dtime := dec.varint64()
|
dtime := dec.varint64()
|
||||||
val := dec.be64()
|
val := dec.be64()
|
||||||
|
|
||||||
r.sampleBuf = append(r.sampleBuf, RefSample{
|
*res = append(*res, RefSample{
|
||||||
Ref: uint64(int64(baseRef) + dref),
|
Ref: uint64(int64(baseRef) + dref),
|
||||||
T: baseTime + dtime,
|
T: baseTime + dtime,
|
||||||
V: math.Float64frombits(val),
|
V: math.Float64frombits(val),
|
||||||
|
@ -1098,20 +1139,19 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if dec.err() != nil {
|
if dec.err() != nil {
|
||||||
return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(r.sampleBuf))
|
return errors.Wrapf(dec.err(), "decode error after %d samples", len(*res))
|
||||||
}
|
}
|
||||||
if len(dec.b) > 0 {
|
if len(dec.b) > 0 {
|
||||||
return r.sampleBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
|
return errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
|
||||||
}
|
}
|
||||||
return r.sampleBuf, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) {
|
func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error {
|
||||||
dec := &decbuf{b: b}
|
dec := &decbuf{b: b}
|
||||||
r.tombstoneBuf = r.tombstoneBuf[:0]
|
|
||||||
|
|
||||||
for dec.len() > 0 && dec.err() == nil {
|
for dec.len() > 0 && dec.err() == nil {
|
||||||
r.tombstoneBuf = append(r.tombstoneBuf, Stone{
|
*res = append(*res, Stone{
|
||||||
ref: dec.be64(),
|
ref: dec.be64(),
|
||||||
intervals: Intervals{
|
intervals: Intervals{
|
||||||
{Mint: dec.varint64(), Maxt: dec.varint64()},
|
{Mint: dec.varint64(), Maxt: dec.varint64()},
|
||||||
|
@ -1119,10 +1159,10 @@ func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
if dec.err() != nil {
|
if dec.err() != nil {
|
||||||
return nil, dec.err()
|
return dec.err()
|
||||||
}
|
}
|
||||||
if len(dec.b) > 0 {
|
if len(dec.b) > 0 {
|
||||||
return r.tombstoneBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
|
return errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
|
||||||
}
|
}
|
||||||
return r.tombstoneBuf, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
18
vendor/vendor.json
vendored
18
vendor/vendor.json
vendored
|
@ -843,28 +843,28 @@
|
||||||
"revisionTime": "2016-04-11T19:08:41Z"
|
"revisionTime": "2016-04-11T19:08:41Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "u1ERYVx8oD5cH3UkQunUTi9n1WI=",
|
"checksumSHA1": "h3i8+wLSIqLvWBWjNPcARM0IQik=",
|
||||||
"path": "github.com/prometheus/tsdb",
|
"path": "github.com/prometheus/tsdb",
|
||||||
"revision": "4a7c39d9d8e53151f2137df3b16b7173ce435397",
|
"revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579",
|
||||||
"revisionTime": "2017-10-05T07:27:10Z"
|
"revisionTime": "2017-10-12T13:27:08Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "Gua979gmISm4cJP/fR2hL8m5To8=",
|
"checksumSHA1": "Gua979gmISm4cJP/fR2hL8m5To8=",
|
||||||
"path": "github.com/prometheus/tsdb/chunks",
|
"path": "github.com/prometheus/tsdb/chunks",
|
||||||
"revision": "4a7c39d9d8e53151f2137df3b16b7173ce435397",
|
"revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579",
|
||||||
"revisionTime": "2017-10-05T07:27:10Z"
|
"revisionTime": "2017-10-12T13:27:08Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "7RhNAVcmDmLFqn9nWiudT0B76f8=",
|
"checksumSHA1": "7RhNAVcmDmLFqn9nWiudT0B76f8=",
|
||||||
"path": "github.com/prometheus/tsdb/fileutil",
|
"path": "github.com/prometheus/tsdb/fileutil",
|
||||||
"revision": "4a7c39d9d8e53151f2137df3b16b7173ce435397",
|
"revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579",
|
||||||
"revisionTime": "2017-10-05T07:27:10Z"
|
"revisionTime": "2017-10-12T13:27:08Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
|
"checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
|
||||||
"path": "github.com/prometheus/tsdb/labels",
|
"path": "github.com/prometheus/tsdb/labels",
|
||||||
"revision": "4a7c39d9d8e53151f2137df3b16b7173ce435397",
|
"revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579",
|
||||||
"revisionTime": "2017-10-05T07:27:10Z"
|
"revisionTime": "2017-10-12T13:27:08Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
|
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
|
||||||
|
|
Loading…
Reference in a new issue