From fc2e56c13f22556ae5cb5cbc15f5f8afd5309e38 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 20 Mar 2017 14:07:25 +0100 Subject: [PATCH] vendor: update tsdb --- vendor/github.com/fabxc/tsdb/block.go | 60 ++++++-- vendor/github.com/fabxc/tsdb/chunks.go | 2 +- vendor/github.com/fabxc/tsdb/compact.go | 8 +- vendor/github.com/fabxc/tsdb/db.go | 173 +++++++++++++----------- vendor/github.com/fabxc/tsdb/head.go | 33 ++++- vendor/github.com/fabxc/tsdb/index.go | 10 +- vendor/github.com/fabxc/tsdb/querier.go | 21 +-- vendor/vendor.json | 6 +- 8 files changed, 192 insertions(+), 121 deletions(-) diff --git a/vendor/github.com/fabxc/tsdb/block.go b/vendor/github.com/fabxc/tsdb/block.go index 9bb5d6ebb8..4939ee5d33 100644 --- a/vendor/github.com/fabxc/tsdb/block.go +++ b/vendor/github.com/fabxc/tsdb/block.go @@ -2,6 +2,7 @@ package tsdb import ( "encoding/json" + "fmt" "io/ioutil" "os" "path/filepath" @@ -11,8 +12,8 @@ import ( "github.com/pkg/errors" ) -// Block handles reads against a Block of time series data. -type Block interface { +// DiskBlock handles reads against a Block of time series data. +type DiskBlock interface { // Directory where block data is stored. Dir() string @@ -29,6 +30,32 @@ type Block interface { Close() error } +// Block is an interface to a DiskBlock that can also be queried. +type Block interface { + DiskBlock + Queryable +} + +// HeadBlock is a regular block that can still be appended to. +type HeadBlock interface { + Block + Appendable +} + +// Appendable defines an entity to which data can be appended. +type Appendable interface { + // Appender returns a new Appender against an underlying store. + Appender() Appender + + // Busy returns whether there are any currently active appenders. + Busy() bool +} + +// Queryable defines an entity which provides a Querier. +type Queryable interface { + Querier(mint, maxt int64) Querier +} + // BlockMeta provides meta information about a block. type BlockMeta struct { // Unique identifier for the block and its contents. Changes on compaction. @@ -60,14 +87,6 @@ const ( flagStd = 1 ) -type persistedBlock struct { - dir string - meta BlockMeta - - chunkr *chunkReader - indexr *indexReader -} - type blockMeta struct { Version int `json:"version"` @@ -115,6 +134,14 @@ func writeMetaFile(dir string, meta *BlockMeta) error { return renameFile(tmp, path) } +type persistedBlock struct { + dir string + meta BlockMeta + + chunkr *chunkReader + indexr *indexReader +} + func newPersistedBlock(dir string) (*persistedBlock, error) { meta, err := readMetaFile(dir) if err != nil { @@ -148,6 +175,19 @@ func (pb *persistedBlock) Close() error { return merr.Err() } +func (pb *persistedBlock) String() string { + return fmt.Sprintf("(%d, %s)", pb.meta.Sequence, pb.meta.ULID) +} + +func (pb *persistedBlock) Querier(mint, maxt int64) Querier { + return &blockQuerier{ + mint: mint, + maxt: maxt, + index: pb.Index(), + chunks: pb.Chunks(), + } +} + func (pb *persistedBlock) Dir() string { return pb.dir } func (pb *persistedBlock) Index() IndexReader { return pb.indexr } func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr } diff --git a/vendor/github.com/fabxc/tsdb/chunks.go b/vendor/github.com/fabxc/tsdb/chunks.go index b3aaeb206d..c628108b9a 100644 --- a/vendor/github.com/fabxc/tsdb/chunks.go +++ b/vendor/github.com/fabxc/tsdb/chunks.go @@ -15,7 +15,7 @@ import ( ) const ( - // MagicSeries 4 bytes at the head of series file. + // MagicChunks 4 bytes at the head of series file. MagicChunks = 0x85BD40DD ) diff --git a/vendor/github.com/fabxc/tsdb/compact.go b/vendor/github.com/fabxc/tsdb/compact.go index a1d7b1e5c5..a1720605a6 100644 --- a/vendor/github.com/fabxc/tsdb/compact.go +++ b/vendor/github.com/fabxc/tsdb/compact.go @@ -1,6 +1,7 @@ package tsdb import ( + "fmt" "math/rand" "os" "path/filepath" @@ -8,6 +9,7 @@ import ( "github.com/coreos/etcd/pkg/fileutil" "github.com/fabxc/tsdb/labels" + "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -32,6 +34,7 @@ type Compactor interface { // compactor implements the Compactor interface. type compactor struct { metrics *compactorMetrics + logger log.Logger opts *compactorOptions } @@ -71,9 +74,10 @@ type compactorOptions struct { maxBlockRange uint64 } -func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor { +func newCompactor(r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor { return &compactor{ opts: opts, + logger: l, metrics: newCompactorMetrics(r), } } @@ -178,6 +182,8 @@ func (c *compactor) Write(dir string, b Block) error { // write creates a new block that is the union of the provided blocks into dir. // It cleans up all files of the old blocks after completing successfully. func (c *compactor) write(dir string, blocks ...Block) (err error) { + c.logger.Log("msg", "compact blocks", "blocks", fmt.Sprintf("%v", blocks)) + defer func(t time.Time) { if err != nil { c.metrics.failed.Inc() diff --git a/vendor/github.com/fabxc/tsdb/db.go b/vendor/github.com/fabxc/tsdb/db.go index ec0f0a7018..1c5e0b3303 100644 --- a/vendor/github.com/fabxc/tsdb/db.go +++ b/vendor/github.com/fabxc/tsdb/db.go @@ -11,7 +11,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" "unsafe" @@ -94,15 +93,13 @@ type DB struct { // Mutex for that must be held when modifying the general // block layout. - mtx sync.RWMutex - persisted []*persistedBlock - seqBlocks map[int]Block + mtx sync.RWMutex + blocks []Block // Mutex that must be held when modifying just the head blocks // or the general layout. headmtx sync.RWMutex - heads []*headBlock - headGen uint8 + heads []HeadBlock compactor Compactor @@ -177,7 +174,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db donec: make(chan struct{}), stopc: make(chan struct{}), } - db.compactor = newCompactor(r, &compactorOptions{ + db.compactor = newCompactor(r, l, &compactorOptions{ maxBlockRange: opts.MaxBlockDuration, }) @@ -205,19 +202,20 @@ func (db *DB) run() { case <-db.compactc: db.metrics.compactionsTriggered.Inc() - var merr MultiError - changes1, err := db.retentionCutoff() - merr.Add(err) + if err != nil { + db.logger.Log("msg", "retention cutoff failed", "err", err) + } changes2, err := db.compact() - merr.Add(err) + if err != nil { + db.logger.Log("msg", "compaction failed", "err", err) + } if changes1 || changes2 { - merr.Add(db.reloadBlocks()) - } - if err := merr.Err(); err != nil { - db.logger.Log("msg", "compaction failed", "err", err) + if err := db.reloadBlocks(); err != nil { + db.logger.Log("msg", "reloading blocks failed", "err", err) + } } case <-db.stopc: @@ -234,13 +232,16 @@ func (db *DB) retentionCutoff() (bool, error) { db.mtx.RLock() defer db.mtx.RUnlock() + db.headmtx.RLock() + defer db.headmtx.RUnlock() + // We don't count the span covered by head blocks towards the // retention time as it generally makes up a fraction of it. - if len(db.persisted) == 0 { + if len(db.blocks)-len(db.heads) == 0 { return false, nil } - last := db.persisted[len(db.persisted)-1] + last := db.blocks[len(db.blocks)-len(db.heads)-1] mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration) return retentionCutoff(db.dir, mint) @@ -251,7 +252,7 @@ func (db *DB) compact() (changes bool, err error) { // Check whether we have pending head blocks that are ready to be persisted. // They have the highest priority. - var singles []*headBlock + var singles []Block // Collect head blocks that are ready for compaction. Write them after // returning the lock to not block Appenders. @@ -262,7 +263,7 @@ func (db *DB) compact() (changes bool, err error) { // Blocks that won't be appendable when instantiating a new appender // might still have active appenders on them. // Abort at the first one we encounter. - if atomic.LoadUint64(&h.activeWriters) > 0 { + if h.Busy() { break } singles = append(singles, h) @@ -271,13 +272,10 @@ func (db *DB) compact() (changes bool, err error) { db.headmtx.RUnlock() -Loop: for _, h := range singles { - db.logger.Log("msg", "write head", "seq", h.Meta().Sequence) - select { case <-db.stopc: - break Loop + return changes, nil default: } @@ -296,16 +294,15 @@ Loop: select { case <-db.stopc: - return false, nil + return changes, nil default: } + // We just execute compactions sequentially to not cause too extreme // CPU and memory spikes. // TODO(fabxc): return more descriptive plans in the future that allow // estimation of resource usage and conditional parallelization? for _, p := range plans { - db.logger.Log("msg", "compact blocks", "seq", fmt.Sprintf("%v", p)) - if err := db.compactor.Compact(p...); err != nil { return changes, errors.Wrapf(err, "compact %s", p) } @@ -323,6 +320,10 @@ Loop: // retentionCutoff deletes all directories of blocks in dir that are strictly // before mint. func retentionCutoff(dir string, mint int64) (bool, error) { + df, err := fileutil.OpenDir(dir) + if err != nil { + return false, errors.Wrapf(err, "open directory") + } dirs, err := blockDirs(dir) if err != nil { return false, errors.Wrapf(err, "list block dirs %s", dir) @@ -347,7 +348,16 @@ func retentionCutoff(dir string, mint int64) (bool, error) { } } - return changes, nil + return changes, fileutil.Fsync(df) +} + +func (db *DB) seqBlock(i int) (Block, bool) { + for _, b := range db.blocks { + if b.Meta().Sequence == i { + return b, true + } + } + return nil, false } func (db *DB) reloadBlocks() error { @@ -366,8 +376,8 @@ func (db *DB) reloadBlocks() error { } var ( metas []*BlockMeta - persisted []*persistedBlock - heads []*headBlock + blocks []Block + heads []HeadBlock seqBlocks = make(map[int]Block, len(dirs)) ) @@ -380,7 +390,7 @@ func (db *DB) reloadBlocks() error { } for i, meta := range metas { - b, ok := db.seqBlocks[meta.Sequence] + b, ok := db.seqBlock(meta.Sequence) if meta.Compaction.Generation == 0 { if !ok { @@ -392,7 +402,7 @@ func (db *DB) reloadBlocks() error { if meta.ULID != b.Meta().ULID { return errors.Errorf("head block ULID changed unexpectedly") } - heads = append(heads, b.(*headBlock)) + heads = append(heads, b.(HeadBlock)) } else { if !ok || meta.ULID != b.Meta().ULID { b, err = newPersistedBlock(dirs[i]) @@ -400,22 +410,21 @@ func (db *DB) reloadBlocks() error { return errors.Wrapf(err, "open persisted block %s", dirs[i]) } } - persisted = append(persisted, b.(*persistedBlock)) } seqBlocks[meta.Sequence] = b + blocks = append(blocks, b) } // Close all blocks that we no longer need. They are closed after returning all // locks to avoid questionable locking order. - for seq, b := range db.seqBlocks { - if nb, ok := seqBlocks[seq]; !ok || nb != b { + for _, b := range db.blocks { + if nb := seqBlocks[b.Meta().Sequence]; nb != b { cs = append(cs, b) } } - db.seqBlocks = seqBlocks - db.persisted = persisted + db.blocks = blocks db.heads = heads return nil @@ -431,12 +440,10 @@ func (db *DB) Close() error { var g errgroup.Group - for _, pb := range db.persisted { + // blocks also contains all head blocks. + for _, pb := range db.blocks { g.Go(pb.Close) } - for _, hb := range db.heads { - g.Go(hb.Close) - } var merr MultiError @@ -454,54 +461,59 @@ func (db *DB) Appender() Appender { // Only instantiate appender after returning the headmtx to avoid // questionable locking order. db.headmtx.RLock() - app := db.appendable() - heads := make([]*headBlock, len(app)) - copy(heads, app) - db.headmtx.RUnlock() - for _, b := range heads { - a.heads = append(a.heads, b.Appender().(*headAppender)) + for _, b := range app { + a.heads = append(a.heads, &metaAppender{ + meta: b.Meta(), + app: b.Appender().(*headAppender), + }) } return a } type dbAppender struct { - db *DB - heads []*headAppender + db *DB + heads []*metaAppender + samples int } +type metaAppender struct { + meta BlockMeta + app Appender +} + func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { h, err := a.appenderFor(t) if err != nil { return 0, err } - ref, err := h.Add(lset, t, v) + ref, err := h.app.Add(lset, t, v) if err != nil { return 0, err } a.samples++ - return ref | (uint64(h.generation) << 40), nil + // Store last byte of sequence number in 3rd byte of refernece. + return ref | (uint64(h.meta.Sequence^0xff) << 40), nil } func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { - // We store the head generation in the 4th byte and use it to reject - // stale references. - gen := uint8((ref << 16) >> 56) + // Load the head last byte of the head sequence from the 3rd byte of the + // reference number. + gen := (ref << 16) >> 56 h, err := a.appenderFor(t) if err != nil { return err } - // If the reference pointed into a previous block, we cannot - // use it to append the sample. - if h.generation != gen { + // If the last byte of the sequence does not add up, the reference is not valid. + if uint64(h.meta.Sequence^0xff) != gen { return ErrNotFound } - if err := h.AddFast(ref, t, v); err != nil { + if err := h.app.AddFast(ref, t, v); err != nil { return err } @@ -511,12 +523,12 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { // appenderFor gets the appender for the head containing timestamp t. // If the head block doesn't exist yet, it gets created. -func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { +func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) { // If there's no fitting head block for t, ensure it gets created. if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime { a.db.headmtx.Lock() - var newHeads []*headBlock + var newHeads []HeadBlock if err := a.db.ensureHead(t); err != nil { a.db.headmtx.Unlock() @@ -527,7 +539,7 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { } else { maxSeq := a.heads[len(a.heads)-1].meta.Sequence for _, b := range a.db.appendable() { - if b.meta.Sequence > maxSeq { + if b.Meta().Sequence > maxSeq { newHeads = append(newHeads, b) } } @@ -538,7 +550,10 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { // Instantiate appenders after returning headmtx to avoid questionable // locking order. for _, b := range newHeads { - a.heads = append(a.heads, b.Appender().(*headAppender)) + a.heads = append(a.heads, &metaAppender{ + app: b.Appender(), + meta: b.Meta(), + }) } } for i := len(a.heads) - 1; i >= 0; i-- { @@ -565,11 +580,12 @@ func (db *DB) ensureHead(t int64) error { for { h := db.heads[len(db.heads)-1] + m := h.Meta() // If t doesn't exceed the range of heads blocks, there's nothing to do. - if t < h.meta.MaxTime { + if t < m.MaxTime { return nil } - if _, err := db.cut(h.meta.MaxTime); err != nil { + if _, err := db.cut(m.MaxTime); err != nil { return err } } @@ -579,7 +595,7 @@ func (a *dbAppender) Commit() error { var merr MultiError for _, h := range a.heads { - merr.Add(h.Commit()) + merr.Add(h.app.Commit()) } a.db.mtx.RUnlock() @@ -593,18 +609,22 @@ func (a *dbAppender) Rollback() error { var merr MultiError for _, h := range a.heads { - merr.Add(h.Rollback()) + merr.Add(h.app.Rollback()) } a.db.mtx.RUnlock() return merr.Err() } -func (db *DB) appendable() []*headBlock { - if len(db.heads) <= db.opts.AppendableBlocks { - return db.heads +// appendable returns a copy of a slice of HeadBlocks that can still be appended to. +func (db *DB) appendable() []HeadBlock { + var i int + app := make([]HeadBlock, 0, db.opts.AppendableBlocks) + + if len(db.heads) > db.opts.AppendableBlocks { + i = len(db.heads) - db.opts.AppendableBlocks } - return db.heads[len(db.heads)-db.opts.AppendableBlocks:] + return append(app, db.heads[i:]...) } func intervalOverlap(amin, amax, bmin, bmax int64) bool { @@ -626,13 +646,7 @@ func intervalContains(min, max, t int64) bool { func (db *DB) blocksForInterval(mint, maxt int64) []Block { var bs []Block - for _, b := range db.persisted { - m := b.Meta() - if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { - bs = append(bs, b) - } - } - for _, b := range db.heads { + for _, b := range db.blocks { m := b.Meta() if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { bs = append(bs, b) @@ -644,7 +658,7 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { // cut starts a new head block to append to. The completed head block // will still be appendable for the configured grace period. -func (db *DB) cut(mint int64) (*headBlock, error) { +func (db *DB) cut(mint int64) (HeadBlock, error) { maxt := mint + int64(db.opts.MinBlockDuration) dir, seq, err := nextSequenceFile(db.dir, "b-") @@ -656,11 +670,8 @@ func (db *DB) cut(mint int64) (*headBlock, error) { return nil, err } + db.blocks = append(db.blocks, newHead) db.heads = append(db.heads, newHead) - db.seqBlocks[seq] = newHead - db.headGen++ - - newHead.generation = db.headGen select { case db.compactc <- struct{}{}: diff --git a/vendor/github.com/fabxc/tsdb/head.go b/vendor/github.com/fabxc/tsdb/head.go index eed2ff2222..4864276f99 100644 --- a/vendor/github.com/fabxc/tsdb/head.go +++ b/vendor/github.com/fabxc/tsdb/head.go @@ -36,10 +36,9 @@ var ( // headBlock handles reads and writes of time series data within a time window. type headBlock struct { - mtx sync.RWMutex - dir string - generation uint8 - wal *WAL + mtx sync.RWMutex + dir string + wal *WAL activeWriters uint64 closed bool @@ -136,6 +135,10 @@ func (h *headBlock) inBounds(t int64) bool { return t >= h.meta.MinTime && t <= h.meta.MaxTime } +func (h *headBlock) String() string { + return fmt.Sprintf("(%d, %s)", h.meta.Sequence, h.meta.ULID) +} + // Close syncs all data and closes underlying resources of the head block. func (h *headBlock) Close() error { h.mtx.Lock() @@ -173,6 +176,22 @@ func (h *headBlock) Persisted() bool { return false } func (h *headBlock) Index() IndexReader { return &headIndexReader{h} } func (h *headBlock) Chunks() ChunkReader { return &headChunkReader{h} } +func (h *headBlock) Querier(mint, maxt int64) Querier { + h.mtx.RLock() + defer h.mtx.RUnlock() + + if h.closed { + panic(fmt.Sprintf("block %s already closed", h.dir)) + } + return &blockQuerier{ + mint: mint, + maxt: maxt, + index: h.Index(), + chunks: h.Chunks(), + postingsMapper: h.remapPostings, + } +} + func (h *headBlock) Appender() Appender { atomic.AddUint64(&h.activeWriters, 1) @@ -184,6 +203,10 @@ func (h *headBlock) Appender() Appender { return &headAppender{headBlock: h, samples: getHeadAppendBuffer()} } +func (h *headBlock) Busy() bool { + return atomic.LoadUint64(&h.activeWriters) > 0 +} + var headPool = sync.Pool{} func getHeadAppendBuffer() []refdSample { @@ -265,6 +288,8 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { // sample sequence is valid. // We also have to revalidate it as we switch locks an create // the new series. + } else if ref > uint64(len(a.series)) { + return ErrNotFound } else { ms := a.series[int(ref)] if ms == nil { diff --git a/vendor/github.com/fabxc/tsdb/index.go b/vendor/github.com/fabxc/tsdb/index.go index f957085763..d0e557dd62 100644 --- a/vendor/github.com/fabxc/tsdb/index.go +++ b/vendor/github.com/fabxc/tsdb/index.go @@ -567,7 +567,10 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { key := strings.Join(names, string(sep)) off, ok := r.labels[key] if !ok { - return nil, fmt.Errorf("label index doesn't exist") + // XXX(fabxc): hot fix. Should return a partial data error and handle cases + // where the entire block has no data gracefully. + return emptyStringTuples{}, nil + //return nil, fmt.Errorf("label index doesn't exist") } flag, b, err := r.section(off) @@ -590,6 +593,11 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { return st, nil } +type emptyStringTuples struct{} + +func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil } +func (emptyStringTuples) Len() int { return 0 } + func (r *indexReader) LabelIndices() ([][]string, error) { res := [][]string{} diff --git a/vendor/github.com/fabxc/tsdb/querier.go b/vendor/github.com/fabxc/tsdb/querier.go index c027a8dc0e..555731171a 100644 --- a/vendor/github.com/fabxc/tsdb/querier.go +++ b/vendor/github.com/fabxc/tsdb/querier.go @@ -55,27 +55,8 @@ func (s *DB) Querier(mint, maxt int64) Querier { blocks: make([]Querier, 0, len(blocks)), db: s, } - for _, b := range blocks { - q := &blockQuerier{ - mint: mint, - maxt: maxt, - index: b.Index(), - chunks: b.Chunks(), - } - - // TODO(fabxc): find nicer solution. - if hb, ok := b.(*headBlock); ok { - // TODO(fabxc): temporary refactored. - hb.mtx.RLock() - if hb.closed { - panic(fmt.Sprintf("block %s already closed", hb.dir)) - } - hb.mtx.RUnlock() - q.postingsMapper = hb.remapPostings - } - - sq.blocks = append(sq.blocks, q) + sq.blocks = append(sq.blocks, b.Querier(mint, maxt)) } return sq diff --git a/vendor/vendor.json b/vendor/vendor.json index 4d14b8af87..830e145093 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -368,10 +368,10 @@ "revisionTime": "2016-09-30T00:14:02Z" }, { - "checksumSHA1": "JeYYg27cZpCWZYwYOm7r+UnUR2o=", + "checksumSHA1": "8wTICzej/k4pCcYtSw+fmD6oZZE=", "path": "github.com/fabxc/tsdb", - "revision": "863d38dfeebaceb69ce57cbba862102e10222256", - "revisionTime": "2017-03-17T14:56:19Z" + "revision": "2ef3682560a31bd03f0ba70eb6ec509512ad0de8", + "revisionTime": "2017-03-20T10:37:06Z" }, { "checksumSHA1": "uVzWuLvF646YjiKomsc2CR1ua58=",