diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index 33572c93f..6be55c0c5 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -140,7 +140,11 @@ func Open(path string, l log.Logger, r prometheus.Registerer, opts *Options) (*t } func (a adapter) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) { - return querier{q: a.db.Querier(mint, maxt)}, nil + q, err := a.db.Querier(mint, maxt) + if err != nil { + return nil, err + } + return querier{q: q}, nil } // Appender returns a new appender against the storage. diff --git a/vendor/github.com/prometheus/tsdb/block.go b/vendor/github.com/prometheus/tsdb/block.go index 90915a1f0..63b468f27 100644 --- a/vendor/github.com/prometheus/tsdb/block.go +++ b/vendor/github.com/prometheus/tsdb/block.go @@ -19,6 +19,7 @@ import ( "io/ioutil" "os" "path/filepath" + "sync" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -26,33 +27,16 @@ import ( "github.com/prometheus/tsdb/labels" ) -// DiskBlock represents a data block backed by on-disk data. -type DiskBlock interface { - BlockReader - - // Directory where block data is stored. - Dir() string - - // Stats returns statistics about the block. - Meta() BlockMeta - - Delete(mint, maxt int64, m ...labels.Matcher) error - - Snapshot(dir string) error - - Close() error -} - // BlockReader provides reading access to a data block. type BlockReader interface { // Index returns an IndexReader over the block's data. - Index() IndexReader + Index() (IndexReader, error) // Chunks returns a ChunkReader over the block's data. - Chunks() ChunkReader + Chunks() (ChunkReader, error) // Tombstones returns a TombstoneReader over the block's deleted data. - Tombstones() TombstoneReader + Tombstones() (TombstoneReader, error) } // Appendable defines an entity to which data can be appended. @@ -149,7 +133,12 @@ func writeMetaFile(dir string, meta *BlockMeta) error { return renameFile(tmp, path) } -type persistedBlock struct { +// Block represents a directory of time series data covering a continous time range. +type Block struct { + mtx sync.RWMutex + closing bool + pendingReaders sync.WaitGroup + dir string meta BlockMeta @@ -159,7 +148,9 @@ type persistedBlock struct { tombstones tombstoneReader } -func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) { +// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used +// to instantiate chunk structs. +func OpenBlock(dir string, pool chunks.Pool) (*Block, error) { meta, err := readMetaFile(dir) if err != nil { return nil, err @@ -179,7 +170,7 @@ func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) { return nil, err } - pb := &persistedBlock{ + pb := &Block{ dir: dir, meta: *meta, chunkr: cr, @@ -189,28 +180,110 @@ func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) { return pb, nil } -func (pb *persistedBlock) Close() error { +// Close closes the on-disk block. It blocks as long as there are readers reading from the block. +func (pb *Block) Close() error { + pb.mtx.Lock() + pb.closing = true + pb.mtx.Unlock() + + pb.pendingReaders.Wait() + var merr MultiError merr.Add(pb.chunkr.Close()) merr.Add(pb.indexr.Close()) + merr.Add(pb.tombstones.Close()) return merr.Err() } -func (pb *persistedBlock) String() string { +func (pb *Block) String() string { return pb.meta.ULID.String() } -func (pb *persistedBlock) Dir() string { return pb.dir } -func (pb *persistedBlock) Index() IndexReader { return pb.indexr } -func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr } -func (pb *persistedBlock) Tombstones() TombstoneReader { - return pb.tombstones -} -func (pb *persistedBlock) Meta() BlockMeta { return pb.meta } +// Dir returns the directory of the block. +func (pb *Block) Dir() string { return pb.dir } + +// Meta returns meta information about the block. +func (pb *Block) Meta() BlockMeta { return pb.meta } + +// ErrClosing is returned when a block is in the process of being closed. +var ErrClosing = errors.New("block is closing") + +func (pb *Block) startRead() error { + pb.mtx.RLock() + defer pb.mtx.RUnlock() + + if pb.closing { + return ErrClosing + } + pb.pendingReaders.Add(1) + return nil +} + +// Index returns a new IndexReader against the block data. +func (pb *Block) Index() (IndexReader, error) { + if err := pb.startRead(); err != nil { + return nil, err + } + return blockIndexReader{IndexReader: pb.indexr, b: pb}, nil +} + +// Chunks returns a new ChunkReader against the block data. +func (pb *Block) Chunks() (ChunkReader, error) { + if err := pb.startRead(); err != nil { + return nil, err + } + return blockChunkReader{ChunkReader: pb.chunkr, b: pb}, nil +} + +// Tombstones returns a new TombstoneReader against the block data. +func (pb *Block) Tombstones() (TombstoneReader, error) { + if err := pb.startRead(); err != nil { + return nil, err + } + return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil +} + +type blockIndexReader struct { + IndexReader + b *Block +} + +func (r blockIndexReader) Close() error { + r.b.pendingReaders.Done() + return nil +} + +type blockTombstoneReader struct { + TombstoneReader + b *Block +} + +func (r blockTombstoneReader) Close() error { + r.b.pendingReaders.Done() + return nil +} + +type blockChunkReader struct { + ChunkReader + b *Block +} + +func (r blockChunkReader) Close() error { + r.b.pendingReaders.Done() + return nil +} + +// Delete matching series between mint and maxt in the block. +func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error { + pb.mtx.Lock() + defer pb.mtx.Unlock() + + if pb.closing { + return ErrClosing + } -func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error { pr := newPostingsReader(pb.indexr) p, absent := pr.Select(ms...) @@ -262,7 +335,8 @@ Outer: return writeMetaFile(pb.dir, &pb.meta) } -func (pb *persistedBlock) Snapshot(dir string) error { +// Snapshot creates snapshot of the block into dir. +func (pb *Block) Snapshot(dir string) error { blockDir := filepath.Join(dir, pb.meta.ULID.String()) if err := os.MkdirAll(blockDir, 0777); err != nil { return errors.Wrap(err, "create snapshot block dir") @@ -311,7 +385,6 @@ func clampInterval(a, b, mint, maxt int64) (int64, int64) { if b > maxt { b = maxt } - return a, b } diff --git a/vendor/github.com/prometheus/tsdb/chunks.go b/vendor/github.com/prometheus/tsdb/chunks.go index 626e7b41e..8152677f7 100644 --- a/vendor/github.com/prometheus/tsdb/chunks.go +++ b/vendor/github.com/prometheus/tsdb/chunks.go @@ -21,9 +21,9 @@ import ( "io" "os" - "github.com/prometheus/tsdb/fileutil" "github.com/pkg/errors" "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/fileutil" ) const ( diff --git a/vendor/github.com/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/tsdb/compact.go index 4f5c40e1a..5b66082aa 100644 --- a/vendor/github.com/prometheus/tsdb/compact.go +++ b/vendor/github.com/prometheus/tsdb/compact.go @@ -14,6 +14,7 @@ package tsdb import ( + "io" "math/rand" "os" "path/filepath" @@ -299,7 +300,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) { var metas []*BlockMeta for _, d := range dirs { - b, err := newPersistedBlock(d, c.chunkPool) + b, err := OpenBlock(d, c.chunkPool) if err != nil { return err } @@ -444,10 +445,30 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, var ( set compactionSet allSymbols = make(map[string]struct{}, 1<<16) + closers = []io.Closer{} ) - for i, b := range blocks { + defer func() { closeAll(closers...) }() - symbols, err := b.Index().Symbols() + for i, b := range blocks { + indexr, err := b.Index() + if err != nil { + return errors.Wrapf(err, "open index reader for block %s", b) + } + closers = append(closers, indexr) + + chunkr, err := b.Chunks() + if err != nil { + return errors.Wrapf(err, "open chunk reader for block %s", b) + } + closers = append(closers, chunkr) + + tombsr, err := b.Tombstones() + if err != nil { + return errors.Wrapf(err, "open tombstone reader for block %s", b) + } + closers = append(closers, tombsr) + + symbols, err := indexr.Symbols() if err != nil { return errors.Wrap(err, "read symbols") } @@ -455,15 +476,13 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, allSymbols[s] = struct{}{} } - indexr := b.Index() - all, err := indexr.Postings(allPostingsKey.Name, allPostingsKey.Value) if err != nil { return err } all = indexr.SortedPostings(all) - s := newCompactionSeriesSet(indexr, b.Chunks(), b.Tombstones(), all) + s := newCompactionSeriesSet(indexr, chunkr, tombsr, all) if i == 0 { set = s @@ -565,7 +584,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, return errors.Wrap(err, "write postings") } } - return nil } diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go index e499de545..ff1763762 100644 --- a/vendor/github.com/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/tsdb/db.go @@ -30,7 +30,6 @@ import ( "golang.org/x/sync/errgroup" - "github.com/prometheus/tsdb/fileutil" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/nightlyone/lockfile" @@ -38,6 +37,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" ) @@ -105,7 +105,7 @@ type DB struct { // Mutex for that must be held when modifying the general block layout. mtx sync.RWMutex - blocks []DiskBlock + blocks []*Block head *Head @@ -431,7 +431,7 @@ func retentionCutoff(dir string, mint int64) (bool, error) { return changes, fileutil.Fsync(df) } -func (db *DB) getBlock(id ulid.ULID) (DiskBlock, bool) { +func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { for _, b := range db.blocks { if b.Meta().ULID == id { return b, true @@ -456,7 +456,7 @@ func (db *DB) reload() (err error) { return errors.Wrap(err, "find blocks") } var ( - blocks []DiskBlock + blocks []*Block exist = map[ulid.ULID]struct{}{} ) @@ -468,7 +468,7 @@ func (db *DB) reload() (err error) { b, ok := db.getBlock(meta.ULID) if !ok { - b, err = newPersistedBlock(dir, db.chunkPool) + b, err = OpenBlock(dir, db.chunkPool) if err != nil { return errors.Wrapf(err, "open block %s", dir) } @@ -505,7 +505,7 @@ func (db *DB) reload() (err error) { return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") } -func validateBlockSequence(bs []DiskBlock) error { +func validateBlockSequence(bs []*Block) error { if len(bs) == 0 { return nil } @@ -521,13 +521,19 @@ func validateBlockSequence(bs []DiskBlock) error { return nil } -func (db *DB) Blocks() []DiskBlock { +func (db *DB) String() string { + return "HEAD" +} + +// Blocks returns the databases persisted blocks. +func (db *DB) Blocks() []*Block { db.mtx.RLock() defer db.mtx.RUnlock() return db.blocks } +// Head returns the databases's head. func (db *DB) Head() *Head { return db.head } @@ -587,41 +593,42 @@ func (db *DB) Snapshot(dir string) error { db.cmtx.Lock() defer db.cmtx.Unlock() - db.mtx.RLock() - defer db.mtx.RUnlock() - - for _, b := range db.blocks { + for _, b := range db.Blocks() { level.Info(db.logger).Log("msg", "snapshotting block", "block", b) if err := b.Snapshot(dir); err != nil { return errors.Wrap(err, "error snapshotting headblock") } } - return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime()) } // Querier returns a new querier over the data partition for the given time range. // A goroutine must not handle more than one open Querier. -func (db *DB) Querier(mint, maxt int64) Querier { - db.mtx.RLock() +func (db *DB) Querier(mint, maxt int64) (Querier, error) { + var blocks []BlockReader - blocks := db.blocksForInterval(mint, maxt) + for _, b := range db.Blocks() { + m := b.Meta() + if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { + blocks = append(blocks, b) + } + } + if maxt >= db.head.MinTime() { + blocks = append(blocks, db.head) + } sq := &querier{ blocks: make([]Querier, 0, len(blocks)), - db: db, } for _, b := range blocks { - sq.blocks = append(sq.blocks, &blockQuerier{ - mint: mint, - maxt: maxt, - index: b.Index(), - chunks: b.Chunks(), - tombstones: b.Tombstones(), - }) + q, err := NewBlockQuerier(b, mint, maxt) + if err != nil { + return nil, errors.Wrapf(err, "open querier for block %s", b) + } + sq.blocks = append(sq.blocks, q) } - return sq + return sq, nil } func rangeForTimestamp(t int64, width int64) (mint, maxt int64) { @@ -634,28 +641,22 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { db.cmtx.Lock() defer db.cmtx.Unlock() - db.mtx.Lock() - defer db.mtx.Unlock() - var g errgroup.Group - for _, b := range db.blocks { + for _, b := range db.Blocks() { m := b.Meta() if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { - g.Go(func(b DiskBlock) func() error { + g.Go(func(b *Block) func() error { return func() error { return b.Delete(mint, maxt, ms...) } }(b)) } } - g.Go(func() error { return db.head.Delete(mint, maxt, ms...) }) - if err := g.Wait(); err != nil { return err } - return nil } @@ -668,24 +669,6 @@ func intervalContains(min, max, t int64) bool { return t >= min && t <= max } -// blocksForInterval returns all blocks within the partition that may contain -// data for the given time range. -func (db *DB) blocksForInterval(mint, maxt int64) []BlockReader { - var bs []BlockReader - - for _, b := range db.blocks { - m := b.Meta() - if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { - bs = append(bs, b) - } - } - if maxt >= db.head.MinTime() { - bs = append(bs, db.head) - } - - return bs -} - func isBlockDir(fi os.FileInfo) bool { if !fi.IsDir() { return false diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go index 82a3459bd..37126363d 100644 --- a/vendor/github.com/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/tsdb/head.go @@ -15,6 +15,7 @@ package tsdb import ( "math" + "runtime" "sort" "sync" "sync/atomic" @@ -73,6 +74,7 @@ type headMetrics struct { series prometheus.Gauge seriesCreated prometheus.Counter seriesRemoved prometheus.Counter + seriesNotFound prometheus.Counter chunks prometheus.Gauge chunksCreated prometheus.Gauge chunksRemoved prometheus.Gauge @@ -102,6 +104,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "tsdb_head_series_removed_total", Help: "Total number of series removed in the head", }) + m.seriesNotFound = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "tsdb_head_series_not_found", + Help: "Total number of requests for series that were not found.", + }) m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "tsdb_head_chunks", Help: "Total number of chunks in the head block.", @@ -118,13 +124,13 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "tsdb_head_gc_duration_seconds", Help: "Runtime of garbage collection in the head block.", }) - m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "tsdb_head_max_time", Help: "Maximum timestamp of the head block.", }, func() float64 { return float64(h.MaxTime()) }) - m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "tsdb_head_min_time", Help: "Minimum time bound of the head block.", }, func() float64 { @@ -148,6 +154,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { m.series, m.seriesCreated, m.seriesRemoved, + m.seriesNotFound, m.minTime, m.maxTime, m.gcDuration, @@ -178,7 +185,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( series: newStripeSeries(), values: map[string]stringset{}, symbols: map[string]struct{}{}, - postings: newMemPostings(), + postings: newUnorderedMemPostings(), tombstones: newEmptyTombstoneReader(), } h.metrics = newHeadMetrics(h, r) @@ -186,28 +193,19 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( return h, nil } -// ReadWAL initializes the head by consuming the write ahead log. -func (h *Head) ReadWAL() error { - r := h.wal.Reader() - mint := h.MinTime() +// processWALSamples adds a partition of samples it receives to the head and passes +// them on to other workers. +// Samples before the mint timestamp are discarded. +func (h *Head) processWALSamples( + mint int64, + partition, total uint64, + input <-chan []RefSample, output chan<- []RefSample, +) (unknownRefs uint64) { + defer close(output) - // Track number of samples that referenced a series we don't know about - // for error reporting. - var unknownRefs int - - seriesFunc := func(series []RefSeries) error { - for _, s := range series { - h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) - - if h.lastSeriesID < s.Ref { - h.lastSeriesID = s.Ref - } - } - return nil - } - samplesFunc := func(samples []RefSample) error { + for samples := range input { for _, s := range samples { - if s.T < mint { + if s.T < mint || s.Ref%total != partition { continue } ms := h.series.getByID(s.Ref) @@ -221,9 +219,69 @@ func (h *Head) ReadWAL() error { h.metrics.chunks.Inc() } } - return nil + output <- samples } - deletesFunc := func(stones []Stone) error { + return unknownRefs +} + +// ReadWAL initializes the head by consuming the write ahead log. +func (h *Head) ReadWAL() error { + defer h.postings.ensureOrder() + + r := h.wal.Reader() + mint := h.MinTime() + + // Track number of samples that referenced a series we don't know about + // for error reporting. + var unknownRefs uint64 + + // Start workers that each process samples for a partition of the series ID space. + // They are connected through a ring of channels which ensures that all sample batches + // read from the WAL are processed in order. + var ( + wg sync.WaitGroup + n = runtime.GOMAXPROCS(0) + firstInput = make(chan []RefSample, 300) + input = firstInput + ) + wg.Add(n) + + for i := 0; i < n; i++ { + output := make(chan []RefSample, 300) + + go func(i int, input <-chan []RefSample, output chan<- []RefSample) { + unknown := h.processWALSamples(mint, uint64(i), uint64(n), input, output) + atomic.AddUint64(&unknownRefs, unknown) + wg.Done() + }(i, input, output) + + // The output feeds the next worker goroutine. For the last worker, + // it feeds the initial input again to reuse the RefSample slices. + input = output + } + + // TODO(fabxc): series entries spread between samples can starve the sample workers. + // Even with bufferd channels, this can impact startup time with lots of series churn. + // We must not pralellize series creation itself but could make the indexing asynchronous. + seriesFunc := func(series []RefSeries) { + for _, s := range series { + h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) + + if h.lastSeriesID < s.Ref { + h.lastSeriesID = s.Ref + } + } + } + samplesFunc := func(samples []RefSample) { + var buf []RefSample + select { + case buf = <-input: + default: + buf = make([]RefSample, 0, len(samples)*11/10) + } + firstInput <- append(buf[:0], samples...) + } + deletesFunc := func(stones []Stone) { for _, s := range stones { for _, itv := range s.intervals { if itv.Maxt < mint { @@ -232,16 +290,22 @@ func (h *Head) ReadWAL() error { h.tombstones.add(s.ref, itv) } } - return nil } + err := r.Read(seriesFunc, samplesFunc, deletesFunc) + + // Signal termination to first worker and wait for last one to close its output channel. + close(firstInput) + for range input { + } + wg.Wait() + + if err != nil { + return errors.Wrap(err, "consume WAL") + } if unknownRefs > 0 { level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs) } - - if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil { - return errors.Wrap(err, "consume WAL") - } return nil } @@ -303,6 +367,23 @@ func (h *Head) initTime(t int64) (initialized bool) { return true } +type rangeHead struct { + head *Head + mint, maxt int64 +} + +func (h *rangeHead) Index() (IndexReader, error) { + return h.head.indexRange(h.mint, h.maxt), nil +} + +func (h *rangeHead) Chunks() (ChunkReader, error) { + return h.head.chunksRange(h.mint, h.maxt), nil +} + +func (h *rangeHead) Tombstones() (TombstoneReader, error) { + return h.head.tombstones, nil +} + // initAppender is a helper to initialize the time bounds of a the head // upon the first sample it receives. type initAppender struct { @@ -609,13 +690,14 @@ func (h *Head) gc() { h.symMtx.Unlock() } -func (h *Head) Tombstones() TombstoneReader { - return h.tombstones +// Tombstones returns a new reader over the head's tombstones +func (h *Head) Tombstones() (TombstoneReader, error) { + return h.tombstones, nil } // Index returns an IndexReader against the block. -func (h *Head) Index() IndexReader { - return h.indexRange(math.MinInt64, math.MaxInt64) +func (h *Head) Index() (IndexReader, error) { + return h.indexRange(math.MinInt64, math.MaxInt64), nil } func (h *Head) indexRange(mint, maxt int64) *headIndexReader { @@ -626,8 +708,8 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader { } // Chunks returns a ChunkReader against the block. -func (h *Head) Chunks() ChunkReader { - return h.chunksRange(math.MinInt64, math.MaxInt64) +func (h *Head) Chunks() (ChunkReader, error) { + return h.chunksRange(math.MinInt64, math.MaxInt64), nil } func (h *Head) chunksRange(mint, maxt int64) *headChunkReader { @@ -680,10 +762,11 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { s.Lock() c := s.chunk(int(cid)) + mint, maxt := c.minTime, c.maxTime s.Unlock() // Do not expose chunks that are outside of the specified range. - if c == nil || !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) { + if c == nil || !intervalOverlap(mint, maxt, h.mint, h.maxt) { return nil, ErrNotFound } return &safeChunk{ @@ -710,23 +793,6 @@ func (c *safeChunk) Iterator() chunks.Iterator { // func (c *safeChunk) Bytes() []byte { panic("illegal") } // func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") } -type rangeHead struct { - head *Head - mint, maxt int64 -} - -func (h *rangeHead) Index() IndexReader { - return h.head.indexRange(h.mint, h.maxt) -} - -func (h *rangeHead) Chunks() ChunkReader { - return h.head.chunksRange(h.mint, h.maxt) -} - -func (h *rangeHead) Tombstones() TombstoneReader { - return newEmptyTombstoneReader() -} - type headIndexReader struct { head *Head mint, maxt int64 @@ -780,24 +846,17 @@ func (h *headIndexReader) SortedPostings(p Postings) Postings { if err := p.Err(); err != nil { return errPostings{err: errors.Wrap(err, "expand postings")} } - var err error sort.Slice(ep, func(i, j int) bool { - if err != nil { - return false - } a := h.head.series.getByID(ep[i]) b := h.head.series.getByID(ep[j]) if a == nil || b == nil { - err = errors.Errorf("series not found") + level.Debug(h.head.logger).Log("msg", "looked up series not found") return false } return labels.Compare(a.lset, b.lset) < 0 }) - if err != nil { - return errPostings{err: err} - } return newListPostings(ep) } @@ -806,6 +865,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkM s := h.head.series.getByID(ref) if s == nil { + h.head.metrics.seriesNotFound.Inc() return ErrNotFound } *lbls = append((*lbls)[:0], s.lset...) @@ -1169,10 +1229,12 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { c = s.cut(t) chunkCreated = true } + numSamples := c.chunk.NumSamples() + if c.maxTime >= t { return false, chunkCreated } - if c.chunk.NumSamples() > samplesPerChunk/4 && t >= s.nextAt { + if numSamples > samplesPerChunk/4 && t >= s.nextAt { c = s.cut(t) chunkCreated = true } @@ -1180,7 +1242,7 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { c.maxTime = t - if c.chunk.NumSamples() == samplesPerChunk/4 { + if numSamples == samplesPerChunk/4 { _, maxt := rangeForTimestamp(c.minTime, s.chunkRange) s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt) } diff --git a/vendor/github.com/prometheus/tsdb/index.go b/vendor/github.com/prometheus/tsdb/index.go index b9cda3073..c0680aa33 100644 --- a/vendor/github.com/prometheus/tsdb/index.go +++ b/vendor/github.com/prometheus/tsdb/index.go @@ -19,14 +19,14 @@ import ( "fmt" "hash" "io" + "math" "os" "path/filepath" "sort" "strings" - "math" - "github.com/prometheus/tsdb/fileutil" "github.com/pkg/errors" + "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" ) @@ -525,6 +525,8 @@ type IndexReader interface { // Postings returns the postings list iterator for the label pair. // The Postings here contain the offsets to the series inside the index. + // Found IDs are not strictly required to point to a valid Series, e.g. during + // background garbage collections. Postings(name, value string) (Postings, error) // SortedPostings returns a postings list that is reordered to be sorted @@ -533,6 +535,7 @@ type IndexReader interface { // Series populates the given labels and chunk metas for the series identified // by the reference. + // Returns ErrNotFound if the ref does not resolve to a known series. Series(ref uint64, lset *labels.Labels, chks *[]ChunkMeta) error // LabelIndices returns the label pairs for which indices exist. diff --git a/vendor/github.com/prometheus/tsdb/postings.go b/vendor/github.com/prometheus/tsdb/postings.go index 0e51b221b..2647f4dd8 100644 --- a/vendor/github.com/prometheus/tsdb/postings.go +++ b/vendor/github.com/prometheus/tsdb/postings.go @@ -15,6 +15,7 @@ package tsdb import ( "encoding/binary" + "runtime" "sort" "strings" "sync" @@ -22,14 +23,30 @@ import ( "github.com/prometheus/tsdb/labels" ) +// memPostings holds postings list for series ID per label pair. They may be written +// to out of order. +// ensureOrder() must be called once before any reads are done. This allows for quick +// unordered batch fills on startup. type memPostings struct { - mtx sync.RWMutex - m map[labels.Label][]uint64 + mtx sync.RWMutex + m map[labels.Label][]uint64 + ordered bool } +// newMemPoistings returns a memPostings that's ready for reads and writes. func newMemPostings() *memPostings { return &memPostings{ - m: make(map[labels.Label][]uint64, 512), + m: make(map[labels.Label][]uint64, 512), + ordered: true, + } +} + +// newUnorderedMemPostings returns a memPostings that is not safe to be read from +// until ensureOrder was called once. +func newUnorderedMemPostings() *memPostings { + return &memPostings{ + m: make(map[labels.Label][]uint64, 512), + ordered: false, } } @@ -47,6 +64,40 @@ func (p *memPostings) get(name, value string) Postings { var allPostingsKey = labels.Label{} +// ensurePostings ensures that all postings lists are sorted. After it returns all further +// calls to add and addFor will insert new IDs in a sorted manner. +func (p *memPostings) ensureOrder() { + p.mtx.Lock() + defer p.mtx.Unlock() + + if p.ordered { + return + } + + n := runtime.GOMAXPROCS(0) + workc := make(chan []uint64) + + var wg sync.WaitGroup + wg.Add(n) + + for i := 0; i < n; i++ { + go func() { + for l := range workc { + sort.Slice(l, func(i, j int) bool { return l[i] < l[j] }) + } + wg.Done() + }() + } + + for _, l := range p.m { + workc <- l + } + close(workc) + wg.Wait() + + p.ordered = true +} + // add adds a document to the index. The caller has to ensure that no // term argument appears twice. func (p *memPostings) add(id uint64, lset labels.Labels) { @@ -64,6 +115,9 @@ func (p *memPostings) addFor(id uint64, l labels.Label) { list := append(p.m[l], id) p.m[l] = list + if !p.ordered { + return + } // There is no guarantee that no higher ID was inserted before as they may // be generated independently before adding them to postings. // We repair order violations on insert. The invariant is that the first n-1 diff --git a/vendor/github.com/prometheus/tsdb/querier.go b/vendor/github.com/prometheus/tsdb/querier.go index 5461fec89..12e05e79d 100644 --- a/vendor/github.com/prometheus/tsdb/querier.go +++ b/vendor/github.com/prometheus/tsdb/querier.go @@ -18,6 +18,7 @@ import ( "sort" "strings" + "github.com/pkg/errors" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" ) @@ -50,7 +51,6 @@ type Series interface { // querier aggregates querying results from time blocks within // a single partition. type querier struct { - db *DB blocks []Querier } @@ -103,21 +103,30 @@ func (q *querier) Close() error { for _, bq := range q.blocks { merr.Add(bq.Close()) } - q.db.mtx.RUnlock() - return merr.Err() } // NewBlockQuerier returns a queries against the readers. -func NewBlockQuerier(ir IndexReader, cr ChunkReader, tr TombstoneReader, mint, maxt int64) Querier { - return &blockQuerier{ - index: ir, - chunks: cr, - tombstones: tr, - - mint: mint, - maxt: maxt, +func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) { + indexr, err := b.Index() + if err != nil { + return nil, errors.Wrapf(err, "open index reader") } + chunkr, err := b.Chunks() + if err != nil { + return nil, errors.Wrapf(err, "open chunk reader") + } + tombsr, err := b.Tombstones() + if err != nil { + return nil, errors.Wrapf(err, "open tombstone reader") + } + return &blockQuerier{ + mint: mint, + maxt: maxt, + index: indexr, + chunks: chunkr, + tombstones: tombsr, + }, nil } // blockQuerier provides querying access to a single block database. @@ -175,7 +184,13 @@ func (q *blockQuerier) LabelValuesFor(string, labels.Label) ([]string, error) { } func (q *blockQuerier) Close() error { - return nil + var merr MultiError + + merr.Add(q.index.Close()) + merr.Add(q.chunks.Close()) + merr.Add(q.tombstones.Close()) + + return merr.Err() } // postingsReader is used to select matching postings from an IndexReader. @@ -435,6 +450,10 @@ Outer: for s.p.Next() { ref := s.p.At() if err := s.index.Series(ref, &lset, &chunks); err != nil { + // Postings may be stale. Skip if no underlying series exists. + if errors.Cause(err) == ErrNotFound { + continue + } s.err = err return false } diff --git a/vendor/github.com/prometheus/tsdb/tabwriter.go b/vendor/github.com/prometheus/tsdb/tabwriter.go new file mode 100644 index 000000000..8e84a9c67 --- /dev/null +++ b/vendor/github.com/prometheus/tsdb/tabwriter.go @@ -0,0 +1,18 @@ +package tsdb + +import ( + "io" + "text/tabwriter" +) + +const ( + minwidth = 0 + tabwidth = 0 + padding = 2 + padchar = ' ' + flags = 0 +) + +func GetNewTabWriter(output io.Writer) *tabwriter.Writer { + return tabwriter.NewWriter(output, minwidth, tabwidth, padding, padchar, flags) +} diff --git a/vendor/github.com/prometheus/tsdb/tombstones.go b/vendor/github.com/prometheus/tsdb/tombstones.go index 7b24407b5..19b224634 100644 --- a/vendor/github.com/prometheus/tsdb/tombstones.go +++ b/vendor/github.com/prometheus/tsdb/tombstones.go @@ -33,9 +33,11 @@ const ( tombstoneFormatV1 = 1 ) -// TombstoneReader is the iterator over tombstones. +// TombstoneReader gives access to tombstone intervals by series reference. type TombstoneReader interface { Get(ref uint64) Intervals + + Close() error } func writeTombstoneFile(dir string, tr tombstoneReader) error { @@ -154,6 +156,10 @@ func (t tombstoneReader) add(ref uint64, itv Interval) { t[ref] = t[ref].add(itv) } +func (tombstoneReader) Close() error { + return nil +} + // Interval represents a single time-interval. type Interval struct { Mint, Maxt int64 diff --git a/vendor/github.com/prometheus/tsdb/wal.go b/vendor/github.com/prometheus/tsdb/wal.go index 467c4e09b..5c6e78d2e 100644 --- a/vendor/github.com/prometheus/tsdb/wal.go +++ b/vendor/github.com/prometheus/tsdb/wal.go @@ -27,16 +27,16 @@ import ( "sync" "time" - "github.com/prometheus/tsdb/fileutil" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" - "github.com/prometheus/tsdb/labels" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb/fileutil" + "github.com/prometheus/tsdb/labels" ) // WALEntryType indicates what data a WAL entry contains. -type WALEntryType byte +type WALEntryType uint8 const ( // WALMagic is a 4 byte number every WAL segment file starts with. @@ -54,20 +54,9 @@ const ( WALEntryDeletes WALEntryType = 4 ) -// SamplesCB is the callback after reading samples. The passed slice -// is only valid until the call returns. -type SamplesCB func([]RefSample) error - -// SeriesCB is the callback after reading series. The passed slice -// is only valid until the call returns. -type SeriesCB func([]RefSeries) error - -// DeletesCB is the callback after reading deletes. The passed slice -// is only valid until the call returns. -type DeletesCB func([]Stone) error - type walMetrics struct { fsyncDuration prometheus.Summary + corruptions prometheus.Counter } func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics { @@ -77,10 +66,15 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics { Name: "tsdb_wal_fsync_duration_seconds", Help: "Duration of WAL fsync.", }) + m.corruptions = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "tsdb_wal_corruptions_total", + Help: "Total number of WAL corruptions.", + }) if r != nil { r.MustRegister( m.fsyncDuration, + m.corruptions, ) } return m @@ -104,17 +98,27 @@ func NopWAL() WAL { type nopWAL struct{} -func (nopWAL) Read(SeriesCB, SamplesCB, DeletesCB) error { return nil } -func (w nopWAL) Reader() WALReader { return w } -func (nopWAL) LogSeries([]RefSeries) error { return nil } -func (nopWAL) LogSamples([]RefSample) error { return nil } -func (nopWAL) LogDeletes([]Stone) error { return nil } -func (nopWAL) Truncate(int64, func(uint64) bool) error { return nil } -func (nopWAL) Close() error { return nil } +func (nopWAL) Read( + seriesf func([]RefSeries), + samplesf func([]RefSample), + deletesf func([]Stone), +) error { + return nil +} +func (w nopWAL) Reader() WALReader { return w } +func (nopWAL) LogSeries([]RefSeries) error { return nil } +func (nopWAL) LogSamples([]RefSample) error { return nil } +func (nopWAL) LogDeletes([]Stone) error { return nil } +func (nopWAL) Truncate(int64, func(uint64) bool) error { return nil } +func (nopWAL) Close() error { return nil } // WALReader reads entries from a WAL. type WALReader interface { - Read(SeriesCB, SamplesCB, DeletesCB) error + Read( + seriesf func([]RefSeries), + samplesf func([]RefSample), + deletesf func([]Stone), + ) error } // RefSeries is the series labels with the series ID. @@ -170,7 +174,7 @@ func newCRC32() hash.Hash32 { // SegmentWAL is a write ahead log for series data. type SegmentWAL struct { - mtx sync.Mutex + mtx sync.Mutex metrics *walMetrics dirFile *os.File @@ -238,15 +242,20 @@ type repairingWALReader struct { r WALReader } -func (r *repairingWALReader) Read(series SeriesCB, samples SamplesCB, deletes DeletesCB) error { - err := r.r.Read(series, samples, deletes) +func (r *repairingWALReader) Read( + seriesf func([]RefSeries), + samplesf func([]RefSample), + deletesf func([]Stone), +) error { + err := r.r.Read(seriesf, samplesf, deletesf) if err == nil { return nil } - cerr, ok := err.(walCorruptionErr) + cerr, ok := errors.Cause(err).(walCorruptionErr) if !ok { return err } + r.wal.metrics.corruptions.Inc() return r.wal.truncate(cerr.err, cerr.file, cerr.lastOffset) } @@ -336,6 +345,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { var ( csf = newSegmentFile(f) crc32 = newCRC32() + decSeries = []RefSeries{} activeSeries = []RefSeries{} ) @@ -345,13 +355,14 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { if rt != WALEntrySeries { continue } - series, err := r.decodeSeries(flag, byt) + decSeries = decSeries[:0] + activeSeries = activeSeries[:0] + + err := r.decodeSeries(flag, byt, &decSeries) if err != nil { return errors.Wrap(err, "decode samples while truncating") } - activeSeries = activeSeries[:0] - - for _, s := range series { + for _, s := range decSeries { if keep(s.Ref) { activeSeries = append(activeSeries, s) } @@ -807,10 +818,6 @@ type walReader struct { curBuf []byte lastOffset int64 // offset after last successfully read entry - seriesBuf []RefSeries - sampleBuf []RefSample - tombstoneBuf []Stone - err error } @@ -831,70 +838,118 @@ func (r *walReader) Err() error { return r.err } -func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesCB) error { - if seriesf == nil { - seriesf = func([]RefSeries) error { return nil } - } - if samplesf == nil { - samplesf = func([]RefSample) error { return nil } - } - if deletesf == nil { - deletesf = func([]Stone) error { return nil } - } +func (r *walReader) Read( + seriesf func([]RefSeries), + samplesf func([]RefSample), + deletesf func([]Stone), +) error { + // Concurrency for replaying the WAL is very limited. We at least split out decoding and + // processing into separate threads. + // Historically, the processing is the bottleneck with reading and decoding using only + // 15% of the CPU. + var ( + seriesPool sync.Pool + samplePool sync.Pool + deletePool sync.Pool + ) + donec := make(chan struct{}) + datac := make(chan interface{}, 100) + + go func() { + defer close(donec) + + for x := range datac { + switch v := x.(type) { + case []RefSeries: + if seriesf != nil { + seriesf(v) + } + seriesPool.Put(v[:0]) + case []RefSample: + if samplesf != nil { + samplesf(v) + } + samplePool.Put(v[:0]) + case []Stone: + if deletesf != nil { + deletesf(v) + } + deletePool.Put(v[:0]) + default: + level.Error(r.logger).Log("msg", "unexpected data type") + } + } + }() + + var err error for r.next() { et, flag, b := r.at() + // In decoding below we never return a walCorruptionErr for now. // Those should generally be catched by entry decoding before. switch et { case WALEntrySeries: - series, err := r.decodeSeries(flag, b) + var series []RefSeries + if v := seriesPool.Get(); v == nil { + series = make([]RefSeries, 0, 512) + } else { + series = v.([]RefSeries) + } + + err := r.decodeSeries(flag, b, &series) if err != nil { - return errors.Wrap(err, "decode series entry") - } - if err := seriesf(series); err != nil { - return err + err = errors.Wrap(err, "decode series entry") + break } + datac <- series cf := r.current() - for _, s := range series { if cf.minSeries > s.Ref { cf.minSeries = s.Ref } } - case WALEntrySamples: - samples, err := r.decodeSamples(flag, b) + var samples []RefSample + if v := samplePool.Get(); v == nil { + samples = make([]RefSample, 0, 512) + } else { + samples = v.([]RefSample) + } + + err := r.decodeSamples(flag, b, &samples) if err != nil { - return errors.Wrap(err, "decode samples entry") - } - if err := samplesf(samples); err != nil { - return err + err = errors.Wrap(err, "decode samples entry") + break } + datac <- samples // Update the times for the WAL segment file. cf := r.current() - for _, s := range samples { if cf.maxTime < s.T { cf.maxTime = s.T } } - case WALEntryDeletes: - stones, err := r.decodeDeletes(flag, b) + var deletes []Stone + if v := deletePool.Get(); v == nil { + deletes = make([]Stone, 0, 512) + } else { + deletes = v.([]Stone) + } + + err := r.decodeDeletes(flag, b, &deletes) if err != nil { - return errors.Wrap(err, "decode delete entry") - } - if err := deletesf(stones); err != nil { - return err + err = errors.Wrap(err, "decode delete entry") + break } + datac <- deletes + // Update the times for the WAL segment file. - cf := r.current() - - for _, s := range stones { + for _, s := range deletes { for _, iv := range s.intervals { if cf.maxTime < iv.Maxt { cf.maxTime = iv.Maxt @@ -903,27 +958,16 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC } } } + close(datac) + <-donec - return r.Err() -} - -// nextEntry retrieves the next entry. It is also used as a testing hook. -func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) { - if r.cur >= len(r.files) { - return 0, 0, nil, io.EOF + if err != nil { + return err } - cf := r.current() - - et, flag, b, err := r.entry(cf) - // If we reached the end of the reader, advance to the next one and close. - // Do not close on the last one as it will still be appended to. - if err == io.EOF && r.cur < len(r.files)-1 { - // Current reader completed. Leave the file open for later reads - // for truncating. - r.cur++ - return r.nextEntry() + if r.Err() != nil { + return errors.Wrap(r.Err(), "read entry") } - return et, flag, b, err + return nil } func (r *walReader) at() (WALEntryType, byte, []byte) { @@ -1043,9 +1087,7 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { return etype, flag, buf, nil } -func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { - r.seriesBuf = r.seriesBuf[:0] - +func (r *walReader) decodeSeries(flag byte, b []byte, res *[]RefSeries) error { dec := decbuf{b: b} for len(dec.b) > 0 && dec.err() == nil { @@ -1059,25 +1101,24 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { } sort.Sort(lset) - r.seriesBuf = append(r.seriesBuf, RefSeries{ + *res = append(*res, RefSeries{ Ref: ref, Labels: lset, }) } if dec.err() != nil { - return nil, dec.err() + return dec.err() } if len(dec.b) > 0 { - return r.seriesBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + return errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) } - return r.seriesBuf, nil + return nil } -func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { +func (r *walReader) decodeSamples(flag byte, b []byte, res *[]RefSample) error { if len(b) == 0 { - return nil, nil + return nil } - r.sampleBuf = r.sampleBuf[:0] dec := decbuf{b: b} var ( @@ -1090,7 +1131,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { dtime := dec.varint64() val := dec.be64() - r.sampleBuf = append(r.sampleBuf, RefSample{ + *res = append(*res, RefSample{ Ref: uint64(int64(baseRef) + dref), T: baseTime + dtime, V: math.Float64frombits(val), @@ -1098,20 +1139,19 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { } if dec.err() != nil { - return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(r.sampleBuf)) + return errors.Wrapf(dec.err(), "decode error after %d samples", len(*res)) } if len(dec.b) > 0 { - return r.sampleBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + return errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) } - return r.sampleBuf, nil + return nil } -func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) { +func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error { dec := &decbuf{b: b} - r.tombstoneBuf = r.tombstoneBuf[:0] for dec.len() > 0 && dec.err() == nil { - r.tombstoneBuf = append(r.tombstoneBuf, Stone{ + *res = append(*res, Stone{ ref: dec.be64(), intervals: Intervals{ {Mint: dec.varint64(), Maxt: dec.varint64()}, @@ -1119,10 +1159,10 @@ func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) { }) } if dec.err() != nil { - return nil, dec.err() + return dec.err() } if len(dec.b) > 0 { - return r.tombstoneBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + return errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) } - return r.tombstoneBuf, nil + return nil } diff --git a/vendor/vendor.json b/vendor/vendor.json index cbe4cf2d5..50e1577ea 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -843,28 +843,28 @@ "revisionTime": "2016-04-11T19:08:41Z" }, { - "checksumSHA1": "u1ERYVx8oD5cH3UkQunUTi9n1WI=", + "checksumSHA1": "h3i8+wLSIqLvWBWjNPcARM0IQik=", "path": "github.com/prometheus/tsdb", - "revision": "4a7c39d9d8e53151f2137df3b16b7173ce435397", - "revisionTime": "2017-10-05T07:27:10Z" + "revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579", + "revisionTime": "2017-10-12T13:27:08Z" }, { "checksumSHA1": "Gua979gmISm4cJP/fR2hL8m5To8=", "path": "github.com/prometheus/tsdb/chunks", - "revision": "4a7c39d9d8e53151f2137df3b16b7173ce435397", - "revisionTime": "2017-10-05T07:27:10Z" + "revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579", + "revisionTime": "2017-10-12T13:27:08Z" }, { "checksumSHA1": "7RhNAVcmDmLFqn9nWiudT0B76f8=", "path": "github.com/prometheus/tsdb/fileutil", - "revision": "4a7c39d9d8e53151f2137df3b16b7173ce435397", - "revisionTime": "2017-10-05T07:27:10Z" + "revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579", + "revisionTime": "2017-10-12T13:27:08Z" }, { "checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=", "path": "github.com/prometheus/tsdb/labels", - "revision": "4a7c39d9d8e53151f2137df3b16b7173ce435397", - "revisionTime": "2017-10-05T07:27:10Z" + "revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579", + "revisionTime": "2017-10-12T13:27:08Z" }, { "checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",