diff --git a/promql/engine.go b/promql/engine.go
index 4f37ced261..29e3869ada 100644
--- a/promql/engine.go
+++ b/promql/engine.go
@@ -517,7 +517,12 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
 	Inspect(s.Expr, func(node Node) bool {
 		switch n := node.(type) {
 		case *VectorSelector:
-			n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...))
+			set, err := querier.Select(n.LabelMatchers...)
+			if err != nil {
+				level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
+				return false
+			}
+			n.series, err = expandSeriesSet(set)
 			if err != nil {
 				// TODO(fabxc): use multi-error.
 				level.Error(ng.logger).Log("msg", "error expanding series set", "err", err)
@@ -529,7 +534,12 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
 			}
 
 		case *MatrixSelector:
-			n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...))
+			set, err := querier.Select(n.LabelMatchers...)
+			if err != nil {
+				level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
+				return false
+			}
+			n.series, err = expandSeriesSet(set)
 			if err != nil {
 				level.Error(ng.logger).Log("msg", "error expanding series set", "err", err)
 				return false
diff --git a/rules/manager_test.go b/rules/manager_test.go
index 4f68a9f1b6..5ff947129a 100644
--- a/rules/manager_test.go
+++ b/rules/manager_test.go
@@ -172,9 +172,16 @@ func TestStaleness(t *testing.T) {
 	querier, err := storage.Querier(context.Background(), 0, 2000)
 	testutil.Ok(t, err)
 	defer querier.Close()
-	matcher, _ := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one")
-	samples, err := readSeriesSet(querier.Select(matcher))
+
+	matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one")
 	testutil.Ok(t, err)
+
+	set, err := querier.Select(matcher)
+	testutil.Ok(t, err)
+
+	samples, err := readSeriesSet(set)
+	testutil.Ok(t, err)
+
 	metric := labels.FromStrings(model.MetricNameLabel, "a_plus_one").String()
 	metricSample, ok := samples[metric]
diff --git a/storage/fanout.go b/storage/fanout.go
index 62f9730448..061d993af7 100644
--- a/storage/fanout.go
+++ b/storage/fanout.go
@@ -216,12 +216,16 @@ func NewMergeQuerier(queriers []Querier) Querier {
 }
 
 // Select returns a set of series that matches the given label matchers.
-func (q *mergeQuerier) Select(matchers ...*labels.Matcher) SeriesSet {
+func (q *mergeQuerier) Select(matchers ...*labels.Matcher) (SeriesSet, error) {
 	seriesSets := make([]SeriesSet, 0, len(q.queriers))
 	for _, querier := range q.queriers {
-		seriesSets = append(seriesSets, querier.Select(matchers...))
+		set, err := querier.Select(matchers...)
+		if err != nil {
+			return nil, err
+		}
+		seriesSets = append(seriesSets, set)
 	}
-	return newMergeSeriesSet(seriesSets)
+	return newMergeSeriesSet(seriesSets), nil
 }
 
 // LabelValues returns all potential values for a label name.
diff --git a/storage/interface.go b/storage/interface.go
index f9bfc6a27f..71261b2c94 100644
--- a/storage/interface.go
+++ b/storage/interface.go
@@ -52,7 +52,7 @@ type Queryable interface {
 // Querier provides reading access to time series data.
 type Querier interface {
 	// Select returns a set of series that matches the given label matchers.
-	Select(...*labels.Matcher) SeriesSet
+	Select(...*labels.Matcher) (SeriesSet, error)
 
 	// LabelValues returns all potential values for a label name.
 	LabelValues(name string) ([]string, error)
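Note on the interface change above: this is the heart of the patch. Select used to smuggle failures through an error-carrying SeriesSet; now they surface immediately. A minimal sketch of the consuming pattern every call site below follows (variable names are illustrative, not from this patch):

	set, err := q.Select(matchers...)
	if err != nil {
		return err // selection failures now surface here
	}
	for set.Next() {
		s := set.At()
		_ = s.Labels() // process the series
	}
	// Errors hit during iteration still arrive via the set itself.
	return set.Err()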
diff --git a/storage/noop.go b/storage/noop.go
index 358cf2611c..a5ff1bc9b4 100644
--- a/storage/noop.go
+++ b/storage/noop.go
@@ -22,8 +22,8 @@ func NoopQuerier() Querier {
 	return noopQuerier{}
 }
 
-func (noopQuerier) Select(...*labels.Matcher) SeriesSet {
-	return NoopSeriesSet()
+func (noopQuerier) Select(...*labels.Matcher) (SeriesSet, error) {
+	return NoopSeriesSet(), nil
 }
 
 func (noopQuerier) LabelValues(name string) ([]string, error) {
diff --git a/storage/remote/read.go b/storage/remote/read.go
index be87c3f6fe..e49d3524a5 100644
--- a/storage/remote/read.go
+++ b/storage/remote/read.go
@@ -43,18 +43,18 @@ type querier struct {
 
 // Select implements storage.Querier and uses the given matchers to read series
 // sets from the Client.
-func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
+func (q *querier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) {
 	query, err := ToQuery(q.mint, q.maxt, matchers)
 	if err != nil {
-		return errSeriesSet{err: err}
+		return nil, err
 	}
 
 	res, err := q.client.Read(q.ctx, query)
 	if err != nil {
-		return errSeriesSet{err: err}
+		return nil, err
 	}
 
-	return FromQueryResult(res)
+	return FromQueryResult(res), nil
 }
 
 // LabelValues implements storage.Querier and is a noop.
@@ -91,10 +91,13 @@ type externalLabelsQuerier struct {
 // Select adds equality matchers for all external labels to the list of matchers
 // before calling the wrapped storage.Queryable. The added external labels are
 // removed from the returned series sets.
-func (q externalLabelsQuerier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
+func (q externalLabelsQuerier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) {
 	m, added := q.addExternalLabels(matchers)
-	s := q.Querier.Select(m...)
-	return newSeriesSetFilter(s, added)
+	s, err := q.Querier.Select(m...)
+	if err != nil {
+		return nil, err
+	}
+	return newSeriesSetFilter(s, added), nil
 }
 
 // PreferLocalStorageFilter returns a QueryableFunc which creates a NoopQuerier
@@ -141,7 +144,7 @@ type requiredMatchersQuerier struct {
 
 // Select returns a NoopSeriesSet if the given matchers don't match the label
 // set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier.
-func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
+func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) {
 	ms := q.requiredMatchers
 	for _, m := range matchers {
 		for i, r := range ms {
@@ -155,7 +158,7 @@ func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) storage.Ser
 		}
 	}
 	if len(ms) > 0 {
-		return storage.NoopSeriesSet()
+		return storage.NoopSeriesSet(), nil
 	}
 	return q.Querier.Select(matchers...)
 }
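All the remote-read wrappers above follow the same decorator shape: do their own filtering, delegate to the wrapped Querier, and pass its error through untouched. A hedged sketch of that shape; filterQuerier and its drop field are hypothetical, while newSeriesSetFilter is the package helper externalLabelsQuerier uses:

	type filterQuerier struct {
		storage.Querier
		drop model.LabelSet // hypothetical: labels to strip from results
	}

	func (q filterQuerier) Select(ms ...*labels.Matcher) (storage.SeriesSet, error) {
		set, err := q.Querier.Select(ms...)
		if err != nil {
			return nil, err // fail fast instead of wrapping an errSeriesSet
		}
		return newSeriesSetFilter(set, q.drop), nil
	}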
diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go
index 0bcdc45c76..f61ab37423 100644
--- a/storage/remote/read_test.go
+++ b/storage/remote/read_test.go
@@ -41,8 +41,12 @@ func TestExternalLabelsQuerierSelect(t *testing.T) {
 		Querier:        mockQuerier{},
 		externalLabels: model.LabelSet{"region": "europe"},
 	}
 	want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels)
-	if have := q.Select(matchers...); !reflect.DeepEqual(want, have) {
+	have, err := q.Select(matchers...)
+	if err != nil {
+		t.Error(err)
+	}
+	if !reflect.DeepEqual(want, have) {
 		t.Errorf("expected series set %+v, got %+v", want, have)
 	}
 }
@@ -154,8 +157,8 @@ type mockSeriesSet struct {
 	storage.SeriesSet
 }
 
-func (mockQuerier) Select(...*labels.Matcher) storage.SeriesSet {
-	return mockSeriesSet{}
+func (mockQuerier) Select(...*labels.Matcher) (storage.SeriesSet, error) {
+	return mockSeriesSet{}, nil
 }
 
 func TestPreferLocalStorageFilter(t *testing.T) {
@@ -310,7 +313,11 @@ func TestRequiredLabelsQuerierSelect(t *testing.T) {
 			requiredMatchers: test.requiredMatchers,
 		}
 
-		if want, have := test.seriesSet, q.Select(test.matchers...); want != have {
+		have, err := q.Select(test.matchers...)
+		if err != nil {
+			t.Error(err)
+		}
+		if want := test.seriesSet; want != have {
 			t.Errorf("%d. expected series set %+v, got %+v", i, want, have)
 		}
 		if want, have := test.requiredMatchers, q.requiredMatchers; !reflect.DeepEqual(want, have) {
diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go
index f077765444..070488fbf1 100644
--- a/storage/tsdb/tsdb.go
+++ b/storage/tsdb/tsdb.go
@@ -188,14 +188,17 @@ type querier struct {
 	q tsdb.Querier
 }
 
-func (q querier) Select(oms ...*labels.Matcher) storage.SeriesSet {
+func (q querier) Select(oms ...*labels.Matcher) (storage.SeriesSet, error) {
 	ms := make([]tsdbLabels.Matcher, 0, len(oms))
 
 	for _, om := range oms {
 		ms = append(ms, convertMatcher(om))
 	}
-
-	return seriesSet{set: q.q.Select(ms...)}
+	set, err := q.q.Select(ms...)
+	if err != nil {
+		return nil, err
+	}
+	return seriesSet{set: set}, nil
 }
 
 func (q querier) LabelValues(name string) ([]string, error) { return q.q.LabelValues(name) }
diff --git a/vendor/github.com/prometheus/tsdb/block.go b/vendor/github.com/prometheus/tsdb/block.go
index 63b468f272..8456cb3762 100644
--- a/vendor/github.com/prometheus/tsdb/block.go
+++ b/vendor/github.com/prometheus/tsdb/block.go
@@ -133,7 +133,7 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
 	return renameFile(tmp, path)
 }
 
-// Block represents a directory of time series data covering a continous time range.
+// Block represents a directory of time series data covering a continuous time range.
 type Block struct {
 	mtx     sync.RWMutex
 	closing bool
@@ -142,10 +142,9 @@ type Block struct {
 	dir  string
 	meta BlockMeta
 
-	chunkr *chunkReader
-	indexr *indexReader
-
-	tombstones tombstoneReader
+	chunkr     ChunkReader
+	indexr     IndexReader
+	tombstones TombstoneReader
 }
 
 // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
@@ -156,11 +155,11 @@ func OpenBlock(dir string, pool chunks.Pool) (*Block, error) {
 		return nil, err
 	}
 
-	cr, err := newChunkReader(chunkDir(dir), pool)
+	cr, err := NewDirChunkReader(chunkDir(dir), pool)
 	if err != nil {
 		return nil, err
 	}
-	ir, err := newIndexReader(dir)
+	ir, err := NewFileIndexReader(filepath.Join(dir, "index"))
 	if err != nil {
 		return nil, err
 	}
@@ -284,13 +283,15 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 		return ErrClosing
 	}
 
-	pr := newPostingsReader(pb.indexr)
-	p, absent := pr.Select(ms...)
+	p, absent, err := PostingsForMatchers(pb.indexr, ms...)
+	if err != nil {
+		return errors.Wrap(err, "select series")
+	}
 
 	ir := pb.indexr
 
 	// Choose only valid postings which have chunks in the time-range.
-	stones := map[uint64]Intervals{}
+	stones := memTombstones{}
 
 	var lset labels.Labels
 	var chks []ChunkMeta
@@ -322,16 +323,21 @@ Outer:
 		return p.Err()
 	}
 
-	// Merge the current and new tombstones.
-	for k, v := range stones {
-		pb.tombstones.add(k, v[0])
+	err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
+		for _, iv := range ivs {
+			stones.add(id, iv)
+			pb.meta.Stats.NumTombstones++
+		}
+		return nil
+	})
+	if err != nil {
+		return err
 	}
+	pb.tombstones = stones
 
 	if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
 		return err
 	}
-
-	pb.meta.Stats.NumTombstones = uint64(len(pb.tombstones))
 	return writeMetaFile(pb.dir, &pb.meta)
 }
diff --git a/vendor/github.com/prometheus/tsdb/block.prof b/vendor/github.com/prometheus/tsdb/block.prof
new file mode 100644
index 0000000000..122ef97f33
--- /dev/null
+++ b/vendor/github.com/prometheus/tsdb/block.prof
@@ -0,0 +1,12 @@
+--- contention:
+cycles/second=2494255279
+80179315716 1 @ 0x10061bb 0x10e008c 0x10e3934 0x10dfd30 0x10e1468 0x10e0431 0x1328cdb 0x102e9fd 0x105cea1
+80176248000 1 @ 0x1065c12 0x1313b9d 0x10dfd30 0x105cea1
+37792267436 303368 @ 0x10061fb 0x131dc08 0x105cea1
+21607828 1098 @ 0x10648fe 0x10650d7 0x1064fca 0x12e5a74 0x12e5df2 0x131d969 0x105cea1
+1272473 118 @ 0x10648fe 0x1065232 0x10651c6 0x1064cb0 0x12e5bcc 0x131dc50 0x105cea1
+851800 1 @ 0x10061bb 0x1313bc6 0x10dfd30 0x105cea1
+818628 59 @ 0x10648fe 0x1065232 0x10651c6 0x1064ebf 0x12e5a74 0x12e5df2 0x131d969 0x105cea1
+501203 2 @ 0x1005473 0x12e5ed4 0x131d969 0x105cea1
+7738 1 @ 0x10648fe 0x1064d19 0x12e5bcc 0x131dc50 0x105cea1
+3846 1 @ 0x1005473 0x10e373b 0x10dfd3a 0x10e1468 0x10e0431 0x1328cdb 0x102e9fd 0x105cea1
diff --git a/vendor/github.com/prometheus/tsdb/chunks.go b/vendor/github.com/prometheus/tsdb/chunks.go
index f6e329b790..30ab93f805 100644
--- a/vendor/github.com/prometheus/tsdb/chunks.go
+++ b/vendor/github.com/prometheus/tsdb/chunks.go
@@ -298,7 +298,7 @@ type ChunkReader interface {
 // of series data.
 type chunkReader struct {
 	// The underlying bytes holding the encoded series data.
-	bs [][]byte
+	bs []ByteSlice
 
 	// Closers for resources behind the byte slices.
 	cs []io.Closer
@@ -306,8 +306,32 @@ type chunkReader struct {
 	pool chunks.Pool
 }
 
-// newChunkReader returns a new chunkReader based on mmaped files found in dir.
-func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) {
+func newChunkReader(bs []ByteSlice, cs []io.Closer, pool chunks.Pool) (*chunkReader, error) {
+	cr := chunkReader{pool: pool, bs: bs, cs: cs}
+
+	for i, b := range cr.bs {
+		if b.Len() < 4 {
+			return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
+		}
+		// Verify magic number.
+		if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks {
+			return nil, fmt.Errorf("invalid magic number %x", m)
+		}
+	}
+	return &cr, nil
+}
+
+// NewChunkReader returns a new chunk reader against the given byte slices.
+func NewChunkReader(bs []ByteSlice, pool chunks.Pool) (ChunkReader, error) {
+	if pool == nil {
+		pool = chunks.NewPool()
+	}
+	return newChunkReader(bs, nil, pool)
+}
+
+// NewDirChunkReader returns a new ChunkReader against sequentially numbered files in the
+// given directory.
+func NewDirChunkReader(dir string, pool chunks.Pool) (ChunkReader, error) {
 	files, err := sequenceFiles(dir)
 	if err != nil {
 		return nil, err
 	}
@@ -315,27 +339,19 @@ func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) {
 	if pool == nil {
 		pool = chunks.NewPool()
 	}
-	cr := chunkReader{pool: pool}
+
+	var bs []ByteSlice
+	var cs []io.Closer
 
 	for _, fn := range files {
 		f, err := openMmapFile(fn)
 		if err != nil {
 			return nil, errors.Wrapf(err, "mmap files")
 		}
-		cr.cs = append(cr.cs, f)
-		cr.bs = append(cr.bs, f.b)
+		cs = append(cs, f)
+		bs = append(bs, realByteSlice(f.b))
 	}
-
-	for i, b := range cr.bs {
-		if len(b) < 4 {
-			return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
-		}
-		// Verify magic number.
-		if m := binary.BigEndian.Uint32(b[:4]); m != MagicChunks {
-			return nil, fmt.Errorf("invalid magic number %x", m)
-		}
-	}
-	return &cr, nil
+	return newChunkReader(bs, cs, pool)
 }
 
 func (s *chunkReader) Close() error {
@@ -352,16 +368,18 @@ func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
 	}
 	b := s.bs[seq]
 
-	if int(off) >= len(b) {
-		return nil, errors.Errorf("offset %d beyond data size %d", off, len(b))
+	if int(off) >= b.Len() {
+		return nil, errors.Errorf("offset %d beyond data size %d", off, b.Len())
 	}
-	b = b[off:]
+	// With the minimum chunk length this should never cause us reading
+	// over the end of the slice.
+	r := b.Range(off, off+binary.MaxVarintLen32)
 
-	l, n := binary.Uvarint(b)
+	l, n := binary.Uvarint(r)
 	if n < 0 {
 		return nil, fmt.Errorf("reading chunk length failed")
 	}
-	b = b[n:]
+	r = b.Range(off+n, off+n+int(l))
 
-	return s.pool.Get(chunks.Encoding(b[0]), b[1:1+l])
+	return s.pool.Get(chunks.Encoding(r[0]), r[1:1+l])
 }
diff --git a/vendor/github.com/prometheus/tsdb/chunks/xor.go b/vendor/github.com/prometheus/tsdb/chunks/xor.go
index 501db704ad..dcee466f7f 100644
--- a/vendor/github.com/prometheus/tsdb/chunks/xor.go
+++ b/vendor/github.com/prometheus/tsdb/chunks/xor.go
@@ -46,8 +46,7 @@ package chunks
 import (
 	"encoding/binary"
 	"math"
-
-	bits "github.com/dgryski/go-bits"
+	"math/bits"
 )
 
 // XORChunk holds XOR encoded sample data.
@@ -197,8 +196,8 @@ func (a *xorAppender) writeVDelta(v float64) {
 	}
 	a.b.writeBit(one)
 
-	leading := uint8(bits.Clz(vDelta))
-	trailing := uint8(bits.Ctz(vDelta))
+	leading := uint8(bits.LeadingZeros64(vDelta))
+	trailing := uint8(bits.TrailingZeros64(vDelta))
 
 	// Clamp number of leading zeros to avoid overflow when encoding.
 	if leading >= 32 {
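The xor.go hunk swaps the external dgryski/go-bits dependency for the standard library's math/bits, available since Go 1.9; for 64-bit operands the calls are drop-in replacements. A quick self-contained check:

	package main

	import (
		"fmt"
		"math/bits"
	)

	func main() {
		var vDelta uint64 = 0xffff000000 // bits 24 through 39 set
		fmt.Println(bits.LeadingZeros64(vDelta))  // 24, formerly bits.Clz(vDelta)
		fmt.Println(bits.TrailingZeros64(vDelta)) // 24, formerly bits.Ctz(vDelta)
	}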
diff --git a/vendor/github.com/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/tsdb/compact.go
index 955ba3cafb..35cb36a63c 100644
--- a/vendor/github.com/prometheus/tsdb/compact.go
+++ b/vendor/github.com/prometheus/tsdb/compact.go
@@ -52,7 +52,7 @@ type Compactor interface {
 	Plan(dir string) ([]string, error)
 
 	// Write persists a Block into a directory.
-	Write(dest string, b BlockReader, mint, maxt int64) error
+	Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error)
 
 	// Compact runs compaction against the provided directories. Must
 	// only be called concurrently with results of Plan().
@@ -321,7 +321,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
 	return c.write(dest, compactBlockMetas(uid, metas...), blocks...)
 }
 
-func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) error {
+func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) {
 	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
 	uid := ulid.MustNew(ulid.Now(), entropy)
 
@@ -333,7 +333,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) e
 	meta.Compaction.Level = 1
 	meta.Compaction.Sources = []ulid.ULID{uid}
 
-	return c.write(dest, meta, b)
+	return uid, c.write(dest, meta, b)
 }
 
 // instrumentedChunkWriter is used for level 1 compactions to record statistics
@@ -418,7 +418,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 	}
 
 	// Create an empty tombstones file.
-	if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil {
+	if err := writeTombstoneFile(tmp, EmptyTombstoneReader()); err != nil {
 		return errors.Wrap(err, "write new tombstones file")
 	}
 
@@ -453,7 +453,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 // of the provided blocks. It returns meta information for the new block.
 func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error {
 	var (
-		set        compactionSet
+		set        ChunkSeriesSet
 		allSymbols = make(map[string]struct{}, 1<<16)
 		closers    = []io.Closer{}
 	)
@@ -597,18 +597,11 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 	return nil
 }
 
-type compactionSet interface {
-	Next() bool
-	At() (labels.Labels, []ChunkMeta, Intervals)
-	Err() error
-}
-
 type compactionSeriesSet struct {
 	p          Postings
 	index      IndexReader
 	chunks     ChunkReader
 	tombstones TombstoneReader
-	series     SeriesSet
 
 	l         labels.Labels
 	c         []ChunkMeta
@@ -631,7 +624,11 @@ func (c *compactionSeriesSet) Next() bool {
 	}
 	var err error
 
-	c.intervals = c.tombstones.Get(c.p.At())
+	c.intervals, err = c.tombstones.Get(c.p.At())
+	if err != nil {
+		c.err = errors.Wrap(err, "get tombstones")
+		return false
+	}
 
 	if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil {
 		c.err = errors.Wrapf(err, "get series %d", c.p.At())
@@ -675,7 +672,7 @@ func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta, Intervals) {
 }
 
 type compactionMerger struct {
-	a, b compactionSet
+	a, b ChunkSeriesSet
 	aok, bok bool
 
 	l labels.Labels
@@ -688,7 +685,7 @@ type compactionSeries struct {
 	chunks []*ChunkMeta
 }
 
-func newCompactionMerger(a, b compactionSet) (*compactionMerger, error) {
+func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
 	c := &compactionMerger{
 		a: a,
 		b: b,
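With Write reporting the ULID of the block it persisted, a caller can reference the new block directly instead of re-scanning the directory; the Prometheus call sites below simply discard the value. A hedged sketch of a caller that wants it (compactor, head, and logger are assumed to be in scope):

	uid, err := compactor.Write(dir, head, mint, maxt)
	if err != nil {
		return errors.Wrap(err, "persist head block")
	}
	level.Info(logger).Log("msg", "persisted block", "ulid", uid.String())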
diff --git a/vendor/github.com/prometheus/tsdb/cpu.pro b/vendor/github.com/prometheus/tsdb/cpu.pro
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/vendor/github.com/prometheus/tsdb/cpu.prof b/vendor/github.com/prometheus/tsdb/cpu.prof
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go
index 3622a77f5e..a748a6e1f5 100644
--- a/vendor/github.com/prometheus/tsdb/db.go
+++ b/vendor/github.com/prometheus/tsdb/db.go
@@ -52,7 +52,7 @@ var DefaultOptions = &Options{
 
 // Options of the DB storage.
 type Options struct {
-	// The interval at which the write ahead log is flushed to disc.
+	// The interval at which the write ahead log is flushed to disk.
 	WALFlushInterval time.Duration
 
 	// Duration of persisted data to keep.
@@ -101,7 +101,6 @@ type DB struct {
 	opts      *Options
 	chunkPool chunks.Pool
 	compactor Compactor
-	wal       WAL
 
 	// Mutex for that must be held when modifying the general block layout.
 	mtx    sync.RWMutex
@@ -142,7 +141,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 	})
 	m.reloadsFailed = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "prometheus_tsdb_reloads_failures_total",
-		Help: "Number of times the database failed to reload black data from disk.",
+		Help: "Number of times the database failed to reload block data from disk.",
 	})
 	m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "prometheus_tsdb_compactions_triggered_total",
@@ -278,16 +277,23 @@ func (db *DB) retentionCutoff() (bool, error) {
 	}
 
 	db.mtx.RLock()
-	defer db.mtx.RUnlock()
+	blocks := db.blocks[:]
+	db.mtx.RUnlock()
 
-	if len(db.blocks) == 0 {
+	if len(blocks) == 0 {
 		return false, nil
 	}
 
-	last := db.blocks[len(db.blocks)-1]
-	mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
+	last := blocks[len(blocks)-1]
 
-	return retentionCutoff(db.dir, mint)
+	mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
+	dirs, err := retentionCutoffDirs(db.dir, mint)
+	if err != nil {
+		return false, err
+	}
+
+	// This will close the dirs and then delete the dirs.
+	return len(dirs) > 0, db.reload(dirs...)
 }
 
 // Appender opens a new appender against the database.
@@ -345,7 +351,7 @@ func (db *DB) compact() (changes bool, err error) {
 			mint: mint,
 			maxt: maxt,
 		}
-		if err = db.compactor.Write(db.dir, head, mint, maxt); err != nil {
+		if _, err = db.compactor.Write(db.dir, head, mint, maxt); err != nil {
 			return changes, errors.Wrap(err, "persist head block")
 		}
 		changes = true
@@ -389,40 +395,37 @@ func (db *DB) compact() (changes bool, err error) {
 	return changes, nil
 }
 
-// retentionCutoff deletes all directories of blocks in dir that are strictly
+// retentionCutoffDirs returns all directories of blocks in dir that are strictly
 // before mint.
-func retentionCutoff(dir string, mint int64) (bool, error) {
+func retentionCutoffDirs(dir string, mint int64) ([]string, error) {
 	df, err := fileutil.OpenDir(dir)
 	if err != nil {
-		return false, errors.Wrapf(err, "open directory")
+		return nil, errors.Wrapf(err, "open directory")
 	}
 	defer df.Close()
 
 	dirs, err := blockDirs(dir)
 	if err != nil {
-		return false, errors.Wrapf(err, "list block dirs %s", dir)
+		return nil, errors.Wrapf(err, "list block dirs %s", dir)
 	}
 
-	changes := false
+	delDirs := []string{}
 
 	for _, dir := range dirs {
 		meta, err := readMetaFile(dir)
 		if err != nil {
-			return changes, errors.Wrapf(err, "read block meta %s", dir)
+			return nil, errors.Wrapf(err, "read block meta %s", dir)
 		}
 		// The first block we encounter marks that we crossed the boundary
 		// of deletable blocks.
 		if meta.MaxTime >= mint {
 			break
 		}
-		changes = true
-		if err := os.RemoveAll(dir); err != nil {
-			return changes, err
-		}
+		delDirs = append(delDirs, dir)
 	}
 
-	return changes, fileutil.Fsync(df)
+	return delDirs, nil
 }
 
 func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
@@ -572,6 +575,7 @@ func (db *DB) Close() error {
 	if db.lockf != nil {
 		merr.Add(db.lockf.Unlock())
 	}
+	merr.Add(db.head.Close())
 
 	return merr.Err()
 }
@@ -615,7 +619,8 @@ func (db *DB) Snapshot(dir string) error {
 			return errors.Wrap(err, "error snapshotting headblock")
 		}
 	}
-	return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
+	_, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
+	return errors.Wrap(err, "snapshot head block")
 }
 
 // Querier returns a new querier over the data partition for the given time range.
diff --git a/vendor/github.com/prometheus/tsdb/db_unix.go b/vendor/github.com/prometheus/tsdb/db_unix.go
index 09bb74f3ca..02c411d7f2 100644
--- a/vendor/github.com/prometheus/tsdb/db_unix.go
+++ b/vendor/github.com/prometheus/tsdb/db_unix.go
@@ -11,19 +11,20 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// +build !windows,!plan9,!solaris
+// +build !windows,!plan9
 
 package tsdb
 
 import (
 	"os"
-	"syscall"
+
+	"golang.org/x/sys/unix"
 )
 
 func mmap(f *os.File, length int) ([]byte, error) {
-	return syscall.Mmap(int(f.Fd()), 0, length, syscall.PROT_READ, syscall.MAP_SHARED)
+	return unix.Mmap(int(f.Fd()), 0, length, unix.PROT_READ, unix.MAP_SHARED)
 }
 
 func munmap(b []byte) (err error) {
-	return syscall.Munmap(b)
+	return unix.Munmap(b)
 }
diff --git a/vendor/github.com/prometheus/tsdb/encoding_helpers.go b/vendor/github.com/prometheus/tsdb/encoding_helpers.go
index 9aa4ba4097..b55c7fda96 100644
--- a/vendor/github.com/prometheus/tsdb/encoding_helpers.go
+++ b/vendor/github.com/prometheus/tsdb/encoding_helpers.go
@@ -3,6 +3,7 @@ package tsdb
 import (
 	"encoding/binary"
 	"hash"
+	"hash/crc32"
 	"unsafe"
 )
 
@@ -77,6 +78,11 @@ func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
 func (d *decbuf) be32int() int      { return int(d.be32()) }
 func (d *decbuf) be64int64() int64  { return int64(d.be64()) }
 
+// crc32 returns a CRC32 checksum over the remaining bytes.
+func (d *decbuf) crc32() uint32 {
+	return crc32.Checksum(d.b, castagnoliTable)
+}
+
 func (d *decbuf) uvarintStr() string {
 	l := d.uvarint64()
 	if d.e != nil {
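The new decbuf.crc32 helper hashes whatever remains in the buffer with the Castagnoli polynomial, matching the checksums the index writer emits. Outside the package the equivalent check needs only the standard library; a small runnable sketch:

	package main

	import (
		"fmt"
		"hash/crc32"
	)

	// verifyChecksum reports whether contents hash to want under the
	// Castagnoli polynomial, the same table tsdb builds as castagnoliTable.
	func verifyChecksum(contents []byte, want uint32) bool {
		return crc32.Checksum(contents, crc32.MakeTable(crc32.Castagnoli)) == want
	}

	func main() {
		data := []byte("tsdb section contents")
		sum := crc32.Checksum(data, crc32.MakeTable(crc32.Castagnoli))
		fmt.Println(verifyChecksum(data, sum)) // true
	}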
diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go
index 3d2bdb9c6f..d149cb1d9c 100644
--- a/vendor/github.com/prometheus/tsdb/head.go
+++ b/vendor/github.com/prometheus/tsdb/head.go
@@ -66,7 +66,7 @@ type Head struct {
 
 	postings *memPostings // postings lists for terms
 
-	tombstones tombstoneReader
+	tombstones memTombstones
 }
 
 type headMetrics struct {
@@ -186,7 +186,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
 		values:   map[string]stringset{},
 		symbols:  map[string]struct{}{},
 		postings: newUnorderedMemPostings(),
-		tombstones: newEmptyTombstoneReader(),
+		tombstones: memTombstones{},
 	}
 	h.metrics = newHeadMetrics(h, r)
 
@@ -574,8 +574,10 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 
 	ir := h.indexRange(mint, maxt)
 
-	pr := newPostingsReader(ir)
-	p, absent := pr.Select(ms...)
+	p, absent, err := PostingsForMatchers(ir, ms...)
+	if err != nil {
+		return errors.Wrap(err, "select series")
+	}
 
 	var stones []Stone
 
@@ -739,6 +741,11 @@ func (h *Head) MaxTime() int64 {
 	return atomic.LoadInt64(&h.maxTime)
 }
 
+// Close flushes the WAL and closes the head.
+func (h *Head) Close() error {
+	return h.wal.Close()
+}
+
 type headChunkReader struct {
 	head       *Head
 	mint, maxt int64
diff --git a/vendor/github.com/prometheus/tsdb/index.go b/vendor/github.com/prometheus/tsdb/index.go
index 258db74d25..6895c16f42 100644
--- a/vendor/github.com/prometheus/tsdb/index.go
+++ b/vendor/github.com/prometheus/tsdb/index.go
@@ -560,7 +560,7 @@ type StringTuples interface {
 
 type indexReader struct {
 	// The underlying byte slice holding the encoded series data.
-	b   []byte
+	b   ByteSlice
 	toc indexTOC
 
 	// Close that releases the underlying resources of the byte slice.
@@ -575,33 +575,62 @@ type indexReader struct {
 	// prevents memory faults when applications work with read symbols after
 	// the block has been unmapped.
 	symbols map[uint32]string
+
+	crc32 hash.Hash32
 }
 
 var (
-	errInvalidSize = fmt.Errorf("invalid size")
-	errInvalidFlag = fmt.Errorf("invalid flag")
+	errInvalidSize     = fmt.Errorf("invalid size")
+	errInvalidFlag     = fmt.Errorf("invalid flag")
+	errInvalidChecksum = fmt.Errorf("invalid checksum")
 )
 
-// NewIndexReader returns a new IndexReader on the given directory.
-func NewIndexReader(dir string) (IndexReader, error) { return newIndexReader(dir) }
+// ByteSlice abstracts a byte slice.
+type ByteSlice interface {
+	Len() int
+	Range(start, end int) []byte
+}
 
-// newIndexReader returns a new indexReader on the given directory.
-func newIndexReader(dir string) (*indexReader, error) {
-	f, err := openMmapFile(filepath.Join(dir, "index"))
+type realByteSlice []byte
+
+func (b realByteSlice) Len() int {
+	return len(b)
+}
+
+func (b realByteSlice) Range(start, end int) []byte {
+	return b[start:end]
+}
+
+func (b realByteSlice) Sub(start, end int) ByteSlice {
+	return b[start:end]
+}
+
+// NewIndexReader returns a new IndexReader on the given byte slice.
+func NewIndexReader(b ByteSlice) (IndexReader, error) {
+	return newIndexReader(b, nil)
+}
+
+// NewFileIndexReader returns a new index reader against the given index file.
+func NewFileIndexReader(path string) (IndexReader, error) {
+	f, err := openMmapFile(path)
 	if err != nil {
 		return nil, err
 	}
-	r := &indexReader{
-		b:       f.b,
-		c:       f,
-		symbols: map[uint32]string{},
-	}
+	return newIndexReader(realByteSlice(f.b), f)
+}
 
+func newIndexReader(b ByteSlice, c io.Closer) (*indexReader, error) {
+	r := &indexReader{
+		b:       b,
+		c:       c,
+		symbols: map[uint32]string{},
+		crc32:   newCRC32(),
+	}
 	// Verify magic number.
-	if len(f.b) < 4 {
+	if b.Len() < 4 {
 		return nil, errors.Wrap(errInvalidSize, "index header")
 	}
-	if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex {
+	if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex {
 		return nil, errors.Errorf("invalid magic number %x", m)
 	}
 
@@ -611,6 +640,7 @@ func newIndexReader(dir string) (*indexReader, error) {
 	if err := r.readSymbols(int(r.toc.symbols)); err != nil {
 		return nil, errors.Wrap(err, "read symbols")
 	}
+	var err error
 
 	r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable)
 	if err != nil {
@@ -621,7 +651,17 @@ func newIndexReader(dir string) (*indexReader, error) {
 }
 
 func (r *indexReader) readTOC() error {
-	d := r.decbufAt(len(r.b) - indexTOCLen)
+	if r.b.Len() < indexTOCLen {
+		return errInvalidSize
+	}
+	b := r.b.Range(r.b.Len()-indexTOCLen, r.b.Len())
+
+	expCRC := binary.BigEndian.Uint32(b[len(b)-4:])
+	d := decbuf{b: b[:len(b)-4]}
+
+	if d.crc32() != expCRC {
+		return errInvalidChecksum
+	}
 
 	r.toc.symbols = d.be64()
 	r.toc.series = d.be64()
@@ -630,16 +670,61 @@ func (r *indexReader) readTOC() error {
 	r.toc.postings = d.be64()
 	r.toc.postingsTable = d.be64()
 
-	// TODO(fabxc): validate checksum.
-
-	return nil
+	return d.err()
 }
 
+// decbufAt returns a new decoding buffer. It expects the first 4 bytes
+// after offset to hold the big endian encoded content length, followed by the contents and the expected
+// checksum.
 func (r *indexReader) decbufAt(off int) decbuf {
-	if len(r.b) < off {
+	if r.b.Len() < off+4 {
 		return decbuf{e: errInvalidSize}
 	}
-	return decbuf{b: r.b[off:]}
+	b := r.b.Range(off, off+4)
+	l := int(binary.BigEndian.Uint32(b))
+
+	if r.b.Len() < off+4+l+4 {
+		return decbuf{e: errInvalidSize}
+	}
+
+	// Load bytes holding the contents plus a CRC32 checksum.
+	b = r.b.Range(off+4, off+4+l+4)
+	dec := decbuf{b: b[:len(b)-4]}
+
+	if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp {
+		return decbuf{e: errInvalidChecksum}
+	}
+	return dec
+}
+
+// decbufUvarintAt returns a new decoding buffer. It expects the first bytes
+// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected
+// checksum.
+func (r *indexReader) decbufUvarintAt(off int) decbuf {
+	// We never have to access this method at the far end of the byte slice. Thus just checking
+	// against the MaxVarintLen32 is sufficient.
+	if r.b.Len() < off+binary.MaxVarintLen32 {
+		return decbuf{e: errInvalidSize}
+	}
+	b := r.b.Range(off, off+binary.MaxVarintLen32)
+
+	l, n := binary.Uvarint(b)
+	if n > binary.MaxVarintLen32 {
+		return decbuf{e: errors.New("invalid uvarint")}
+	}
+
+	if r.b.Len() < off+n+int(l)+4 {
+		return decbuf{e: errInvalidSize}
+	}
+
+	// Load bytes holding the contents plus a CRC32 checksum.
+	b = r.b.Range(off+n, off+n+int(l)+4)
+	dec := decbuf{b: b[:len(b)-4]}
+
+	if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) {
+		return decbuf{e: errInvalidChecksum}
+	}
+	return dec
+}
 
 // readSymbols reads the symbol table fully into memory and allocates proper strings for them.
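Both helpers above decode the same on-disk framing: a length prefix (fixed 4-byte big-endian for decbufAt, uvarint for decbufUvarintAt), the contents, then a 4-byte big-endian CRC32 computed over the contents only. Spelled out as a hypothetical helper (it mirrors decbufAt with the bounds checks elided for brevity):

	// Framing: len (4B big-endian) | contents (len bytes) | CRC32 of contents (4B big-endian).
	func readSection(sl ByteSlice, off int) (decbuf, error) {
		l := int(binary.BigEndian.Uint32(sl.Range(off, off+4)))
		b := sl.Range(off+4, off+4+l+4) // contents plus trailing checksum
		dec := decbuf{b: b[:len(b)-4]}
		if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) {
			return decbuf{}, errInvalidChecksum
		}
		return dec, nil
	}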
@@ -649,22 +734,22 @@ func (r *indexReader) readSymbols(off int) error {
 	if off == 0 {
 		return nil
 	}
+	d := r.decbufAt(off)
+
 	var (
-		d1      = r.decbufAt(int(off))
-		d2      = d1.decbuf(d1.be32int())
-		origLen = d2.len()
-		cnt     = d2.be32int()
+		origLen = d.len()
+		cnt     = d.be32int()
 		basePos = uint32(off) + 4
-		nextPos = basePos + uint32(origLen-d2.len())
+		nextPos = basePos + uint32(origLen-d.len())
 	)
-	for d2.err() == nil && d2.len() > 0 && cnt > 0 {
-		s := d2.uvarintStr()
+	for d.err() == nil && d.len() > 0 && cnt > 0 {
+		s := d.uvarintStr()
 		r.symbols[uint32(nextPos)] = s
 
-		nextPos = basePos + uint32(origLen-d2.len())
+		nextPos = basePos + uint32(origLen-d.len())
 		cnt--
 	}
-	return d2.err()
+	return d.err()
 }
 
 // readOffsetTable reads an offset table at the given position and returns a map
@@ -672,53 +757,29 @@ func (r *indexReader) readSymbols(off int) error {
 func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
 	const sep = "\xff"
 
-	var (
-		d1  = r.decbufAt(int(off))
-		d2  = d1.decbuf(d1.be32int())
-		cnt = d2.be32()
-	)
+	d := r.decbufAt(int(off))
+	cnt := d.be32()
 
-	res := make(map[string]uint32, 512)
+	res := make(map[string]uint32, cnt)
 
-	for d2.err() == nil && d2.len() > 0 && cnt > 0 {
-		keyCount := int(d2.uvarint())
+	for d.err() == nil && d.len() > 0 && cnt > 0 {
+		keyCount := int(d.uvarint())
 		keys := make([]string, 0, keyCount)
 
 		for i := 0; i < keyCount; i++ {
-			keys = append(keys, d2.uvarintStr())
+			keys = append(keys, d.uvarintStr())
 		}
-		res[strings.Join(keys, sep)] = uint32(d2.uvarint())
+		res[strings.Join(keys, sep)] = uint32(d.uvarint())
 
 		cnt--
 	}
-
-	// TODO(fabxc): verify checksum from remainer of d1.
-	return res, d2.err()
+	return res, d.err()
 }
 
 func (r *indexReader) Close() error {
 	return r.c.Close()
 }
 
-func (r *indexReader) section(o uint32) (byte, []byte, error) {
-	b := r.b[o:]
-
-	if len(b) < 5 {
-		return 0, nil, errors.Wrap(errInvalidSize, "read header")
-	}
-
-	flag := b[0]
-	l := binary.BigEndian.Uint32(b[1:5])
-
-	b = b[5:]
-
-	// b must have the given length plus 4 bytes for the CRC32 checksum.
-	if len(b) < int(l)+4 {
-		return 0, nil, errors.Wrap(errInvalidSize, "section content")
-	}
-	return flag, b[:l], nil
-}
-
 func (r *indexReader) lookupSymbol(o uint32) (string, error) {
 	s, ok := r.symbols[o]
 	if !ok {
@@ -748,21 +809,17 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
 		//return nil, fmt.Errorf("label index doesn't exist")
 	}
 
-	d1 := r.decbufAt(int(off))
-	d2 := d1.decbuf(d1.be32int())
+	d := r.decbufAt(int(off))
 
-	nc := d2.be32int()
-	d2.be32() // consume unused value entry count.
+	nc := d.be32int()
+	d.be32() // consume unused value entry count.
 
-	if d2.err() != nil {
-		return nil, errors.Wrap(d2.err(), "read label value index")
+	if d.err() != nil {
+		return nil, errors.Wrap(d.err(), "read label value index")
 	}
-
-	// TODO(fabxc): verify checksum in 4 remaining bytes of d1.
-
 	st := &serializedStringTuples{
 		l:      nc,
-		b:      d2.get(),
+		b:      d.get(),
 		lookup: r.lookupSymbol,
 	}
 	return st, nil
@@ -785,20 +842,19 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
 }
 
 func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error {
-	d1 := r.decbufAt(int(ref))
-	d2 := d1.decbuf(int(d1.uvarint()))
+	d := r.decbufUvarintAt(int(ref))
 
 	*lbls = (*lbls)[:0]
 	*chks = (*chks)[:0]
 
-	k := int(d2.uvarint())
+	k := int(d.uvarint())
 
 	for i := 0; i < k; i++ {
-		lno := uint32(d2.uvarint())
-		lvo := uint32(d2.uvarint())
+		lno := uint32(d.uvarint())
+		lvo := uint32(d.uvarint())
 
-		if d2.err() != nil {
-			return errors.Wrap(d2.err(), "read series label offsets")
+		if d.err() != nil {
+			return errors.Wrap(d.err(), "read series label offsets")
 		}
 
 		ln, err := r.lookupSymbol(lno)
@@ -814,15 +870,15 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
 	}
 
 	// Read the chunks meta data.
-	k = int(d2.uvarint())
+	k = int(d.uvarint())
 
 	if k == 0 {
 		return nil
 	}
 
-	t0 := d2.varint64()
-	maxt := int64(d2.uvarint64()) + t0
-	ref0 := int64(d2.uvarint64())
+	t0 := d.varint64()
+	maxt := int64(d.uvarint64()) + t0
+	ref0 := int64(d.uvarint64())
 
 	*chks = append(*chks, ChunkMeta{
 		Ref:     uint64(ref0),
@@ -832,14 +888,14 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
 	t0 = maxt
 
 	for i := 1; i < k; i++ {
-		mint := int64(d2.uvarint64()) + t0
-		maxt := int64(d2.uvarint64()) + mint
+		mint := int64(d.uvarint64()) + t0
+		maxt := int64(d.uvarint64()) + mint
 
-		ref0 += d2.varint64()
+		ref0 += d.varint64()
 		t0 = maxt
 
-		if d2.err() != nil {
-			return errors.Wrapf(d2.err(), "read meta for chunk %d", i)
+		if d.err() != nil {
+			return errors.Wrapf(d.err(), "read meta for chunk %d", i)
 		}
 
 		*chks = append(*chks, ChunkMeta{
@@ -848,10 +904,7 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
 			MaxTime: maxt,
 		})
 	}
-
-	// TODO(fabxc): verify CRC32.
-
-	return nil
+	return d.err()
 }
 
 func (r *indexReader) Postings(name, value string) (Postings, error) {
@@ -862,19 +915,10 @@ func (r *indexReader) Postings(name, value string) (Postings, error) {
 	if !ok {
 		return emptyPostings, nil
 	}
+	d := r.decbufAt(int(off))
+	d.be32() // consume unused postings list length.
 
-	d1 := r.decbufAt(int(off))
-	d2 := d1.decbuf(d1.be32int())
-
-	d2.be32() // consume unused postings list length.
-
-	if d2.err() != nil {
-		return nil, errors.Wrap(d2.err(), "get postings bytes")
-	}
-
-	// TODO(fabxc): read checksum from 4 remainer bytes of d1 and verify.
-
-	return newBigEndianPostings(d2.get()), nil
+	return newBigEndianPostings(d.get()), errors.Wrap(d.err(), "get postings bytes")
 }
 
 func (r *indexReader) SortedPostings(p Postings) Postings {
diff --git a/vendor/github.com/prometheus/tsdb/postings.go b/vendor/github.com/prometheus/tsdb/postings.go
index 2647f4dd8e..63fb1e31a0 100644
--- a/vendor/github.com/prometheus/tsdb/postings.go
+++ b/vendor/github.com/prometheus/tsdb/postings.go
@@ -165,6 +165,11 @@ func (e errPostings) Err() error { return e.err }
 
 var emptyPostings = errPostings{}
 
+// EmptyPostings returns a postings list that's always empty.
+func EmptyPostings() Postings {
+	return emptyPostings
+}
+
 // Intersect returns a new postings list over the intersection of the
 // input postings.
 func Intersect(its ...Postings) Postings {
diff --git a/vendor/github.com/prometheus/tsdb/querier.go b/vendor/github.com/prometheus/tsdb/querier.go
index ed8b64ceac..37672c7156 100644
--- a/vendor/github.com/prometheus/tsdb/querier.go
+++ b/vendor/github.com/prometheus/tsdb/querier.go
@@ -27,7 +27,7 @@ import (
 // time range.
 type Querier interface {
 	// Select returns a set of series that matches the given label matchers.
-	Select(...labels.Matcher) SeriesSet
+	Select(...labels.Matcher) (SeriesSet, error)
 
 	// LabelValues returns all potential values for a label name.
 	LabelValues(string) ([]string, error)
@@ -81,20 +81,29 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
 	return nil, fmt.Errorf("not implemented")
 }
 
-func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
+func (q *querier) Select(ms ...labels.Matcher) (SeriesSet, error) {
 	return q.sel(q.blocks, ms)
 }
 
-func (q *querier) sel(qs []Querier, ms []labels.Matcher) SeriesSet {
+func (q *querier) sel(qs []Querier, ms []labels.Matcher) (SeriesSet, error) {
 	if len(qs) == 0 {
-		return nopSeriesSet{}
+		return EmptySeriesSet(), nil
 	}
 	if len(qs) == 1 {
 		return qs[0].Select(ms...)
 	}
 	l := len(qs) / 2
-	return newMergedSeriesSet(q.sel(qs[:l], ms), q.sel(qs[l:], ms))
+
+	a, err := q.sel(qs[:l], ms)
+	if err != nil {
+		return nil, err
+	}
+	b, err := q.sel(qs[l:], ms)
+	if err != nil {
+		return nil, err
+	}
+	return newMergedSeriesSet(a, b), nil
 }
 
 func (q *querier) Close() error {
@@ -141,20 +150,14 @@ type blockQuerier struct {
 	mint, maxt int64
 }
 
-func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
-	pr := newPostingsReader(q.index)
-
-	p, absent := pr.Select(ms...)
-
+func (q *blockQuerier) Select(ms ...labels.Matcher) (SeriesSet, error) {
+	base, err := LookupChunkSeries(q.index, q.tombstones, ms...)
+	if err != nil {
+		return nil, err
+	}
 	return &blockSeriesSet{
 		set: &populatedChunkSeries{
-			set: &baseChunkSeries{
-				p:      p,
-				index:  q.index,
-				absent: absent,
-
-				tombstones: q.tombstones,
-			},
+			set:    base,
 			chunks: q.chunks,
 			mint:   q.mint,
 			maxt:   q.maxt,
@@ -162,7 +165,7 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
 
 		mint: q.mint,
 		maxt: q.maxt,
-	}
+	}, nil
 }
 
 func (q *blockQuerier) LabelValues(name string) ([]string, error) {
@@ -196,16 +199,10 @@ func (q *blockQuerier) Close() error {
 	return merr.Err()
 }
 
-// postingsReader is used to select matching postings from an IndexReader.
-type postingsReader struct {
-	index IndexReader
-}
-
-func newPostingsReader(i IndexReader) *postingsReader {
-	return &postingsReader{index: i}
-}
-
-func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) {
+// PostingsForMatchers assembles a single postings iterator against the index reader
+// based on the given matchers. It returns a list of label names that must be manually
+// checked to not exist in series the postings list points to.
+func PostingsForMatchers(index IndexReader, ms ...labels.Matcher) (Postings, []string, error) {
 	var (
 		its    []Postings
 		absent []string
@@ -217,12 +214,13 @@ func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) {
 			absent = append(absent, m.Name())
 			continue
 		}
-		its = append(its, r.selectSingle(m))
+		it, err := postingsForMatcher(index, m)
+		if err != nil {
+			return nil, nil, err
+		}
+		its = append(its, it)
 	}
-
-	p := Intersect(its...)
-
-	return r.index.SortedPostings(p), absent
+	return index.SortedPostings(Intersect(its...)), absent, nil
 }
 
 // tuplesByPrefix uses binary search to find prefix matches within ts.
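Exporting PostingsForMatchers lets code outside the package resolve matchers to a postings iterator exactly the way a query does. A hedged usage sketch; ir is an IndexReader, and the absent names still have to be checked against each series, which baseChunkSeries does during iteration:

	p, absent, err := PostingsForMatchers(ir, ms...)
	if err != nil {
		return err
	}
	for p.Next() {
		ref := p.At() // series reference satisfying all matchers
		_ = ref
		_ = absent // label names that must be unset on the series
	}
	return p.Err()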
@@ -256,33 +254,33 @@ func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error)
 	return matches, nil
 }
 
-func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
+func postingsForMatcher(index IndexReader, m labels.Matcher) (Postings, error) {
 	// Fast-path for equal matching.
 	if em, ok := m.(*labels.EqualMatcher); ok {
-		it, err := r.index.Postings(em.Name(), em.Value())
+		it, err := index.Postings(em.Name(), em.Value())
 		if err != nil {
-			return errPostings{err: err}
+			return nil, err
 		}
-		return it
+		return it, nil
 	}
 
-	tpls, err := r.index.LabelValues(m.Name())
+	tpls, err := index.LabelValues(m.Name())
 	if err != nil {
-		return errPostings{err: err}
+		return nil, err
 	}
 
 	var res []string
 
 	if pm, ok := m.(*labels.PrefixMatcher); ok {
 		res, err = tuplesByPrefix(pm, tpls)
 		if err != nil {
-			return errPostings{err: err}
+			return nil, err
 		}
 	} else {
 		for i := 0; i < tpls.Len(); i++ {
 			vals, err := tpls.At(i)
 			if err != nil {
-				return errPostings{err: err}
+				return nil, err
 			}
 			if m.Matches(vals[0]) {
 				res = append(res, vals[0])
@@ -291,20 +289,20 @@ func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
 	}
 
 	if len(res) == 0 {
-		return emptyPostings
+		return EmptyPostings(), nil
 	}
 
 	var rit []Postings
 
 	for _, v := range res {
-		it, err := r.index.Postings(m.Name(), v)
+		it, err := index.Postings(m.Name(), v)
 		if err != nil {
-			return errPostings{err: err}
+			return nil, err
 		}
 		rit = append(rit, it)
 	}
 
-	return Merge(rit...)
+	return Merge(rit...), nil
 }
 
 func mergeStrings(a, b []string) []string {
@@ -342,11 +340,12 @@ type SeriesSet interface {
 	Err() error
 }
 
-type nopSeriesSet struct{}
+var emptySeriesSet = errSeriesSet{}
 
-func (nopSeriesSet) Next() bool { return false }
-func (nopSeriesSet) At() Series { return nil }
-func (nopSeriesSet) Err() error { return nil }
+// EmptySeriesSet returns a series set that's always empty.
+func EmptySeriesSet() SeriesSet {
+	return emptySeriesSet
+}
 
 // mergedSeriesSet takes two series sets as a single series set. The input series sets
 // must be sorted and sequential in time, i.e. if they have the same label set,
@@ -418,7 +417,7 @@ func (s *mergedSeriesSet) Next() bool {
 	return true
 }
 
-type chunkSeriesSet interface {
+type ChunkSeriesSet interface {
 	Next() bool
 	At() (labels.Labels, []ChunkMeta, Intervals)
 	Err() error
@@ -438,6 +437,24 @@ type baseChunkSeries struct {
 	err error
 }
 
+// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet
+// over them. It drops chunks based on tombstones in the given reader.
+func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) {
+	if tr == nil {
+		tr = EmptyTombstoneReader()
+	}
+	p, absent, err := PostingsForMatchers(ir, ms...)
+	if err != nil {
+		return nil, err
+	}
+	return &baseChunkSeries{
+		p:          p,
+		index:      ir,
+		tombstones: tr,
+		absent:     absent,
+	}, nil
+}
+
 func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) {
 	return s.lset, s.chks, s.intervals
 }
@@ -448,6 +465,7 @@ func (s *baseChunkSeries) Next() bool {
 	var (
 		lset   labels.Labels
 		chunks []ChunkMeta
+		err    error
 	)
 Outer:
 	for s.p.Next() {
@@ -470,7 +488,11 @@ Outer:
 
 		s.lset = lset
 		s.chks = chunks
-		s.intervals = s.tombstones.Get(s.p.At())
+		s.intervals, err = s.tombstones.Get(s.p.At())
+		if err != nil {
+			s.err = errors.Wrap(err, "get tombstones")
+			return false
+		}
 
 		if len(s.intervals) > 0 {
 			// Only those chunks that are not entirely deleted.
@@ -496,7 +518,7 @@ Outer:
 // with known chunk references. It filters out chunks that do not fit the
 // given time range.
 type populatedChunkSeries struct {
-	set    chunkSeriesSet
+	set    ChunkSeriesSet
 	chunks ChunkReader
 
 	mint, maxt int64
@@ -553,7 +575,7 @@ func (s *populatedChunkSeries) Next() bool {
 
 // blockSeriesSet is a set of series from an inverted index query.
 type blockSeriesSet struct {
-	set chunkSeriesSet
+	set ChunkSeriesSet
 	err error
 	cur Series
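LookupChunkSeries bundles the postings lookup and tombstone wiring that blockQuerier.Select performs, and the exported ChunkSeriesSet lets external consumers walk raw chunk metadata. A hedged sketch, assuming ir and tr readers are in hand:

	set, err := LookupChunkSeries(ir, tr, ms...)
	if err != nil {
		return err
	}
	for set.Next() {
		lset, chks, ivs := set.At()
		_ = lset // the series' label set
		_ = chks // ChunkMeta entries that survived tombstone filtering
		_ = ivs  // deletion intervals still to apply per sample
	}
	return set.Err()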
diff --git a/vendor/github.com/prometheus/tsdb/tombstones.go b/vendor/github.com/prometheus/tsdb/tombstones.go
index d43cd0bd0c..8ca089e617 100644
--- a/vendor/github.com/prometheus/tsdb/tombstones.go
+++ b/vendor/github.com/prometheus/tsdb/tombstones.go
@@ -35,12 +35,17 @@ const (
 
 // TombstoneReader gives access to tombstone intervals by series reference.
 type TombstoneReader interface {
-	Get(ref uint64) Intervals
+	// Get returns deletion intervals for the series with the given reference.
+	Get(ref uint64) (Intervals, error)
 
+	// Iter calls the given function for each encountered interval.
+	Iter(func(uint64, Intervals) error) error
+
+	// Close any underlying resources.
 	Close() error
 }
 
-func writeTombstoneFile(dir string, tr tombstoneReader) error {
+func writeTombstoneFile(dir string, tr TombstoneReader) error {
 	path := filepath.Join(dir, tombstoneFilename)
 	tmp := path + ".tmp"
 	hash := newCRC32()
@@ -67,19 +72,24 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error {
 
 	mw := io.MultiWriter(f, hash)
 
-	for k, v := range tr {
-		for _, itv := range v {
+	err = tr.Iter(func(ref uint64, ivs Intervals) error {
+		for _, iv := range ivs {
 			buf.reset()
-			buf.putUvarint64(k)
-			buf.putVarint64(itv.Mint)
-			buf.putVarint64(itv.Maxt)
+
+			buf.putUvarint64(ref)
+			buf.putVarint64(iv.Mint)
+			buf.putVarint64(iv.Maxt)
 
 			_, err = mw.Write(buf.get())
 			if err != nil {
 				return err
 			}
 		}
-	}
+		return nil
+	})
+	if err != nil {
+		return err
+	}
 
 	_, err = f.Write(hash.Sum(nil))
 	if err != nil {
@@ -100,7 +107,7 @@ type Stone struct {
 	intervals Intervals
 }
 
-func readTombstones(dir string) (tombstoneReader, error) {
+func readTombstones(dir string) (memTombstones, error) {
 	b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
 	if err != nil {
 		return nil, err
	}
@@ -131,7 +138,8 @@ func readTombstones(dir string) (tombstoneReader, error) {
 		return nil, errors.New("checksum did not match")
 	}
 
-	stonesMap := newEmptyTombstoneReader()
+	stonesMap := memTombstones{}
+
 	for d.len() > 0 {
 		k := d.uvarint64()
 		mint := d.varint64()
@@ -143,28 +151,36 @@ func readTombstones(dir string) (tombstoneReader, error) {
 		stonesMap.add(k, Interval{mint, maxt})
 	}
 
-	return newTombstoneReader(stonesMap), nil
+	return stonesMap, nil
 }
 
-type tombstoneReader map[uint64]Intervals
+type memTombstones map[uint64]Intervals
 
-func newTombstoneReader(ts map[uint64]Intervals) tombstoneReader {
-	return tombstoneReader(ts)
+var emptyTombstoneReader = memTombstones{}
+
+// EmptyTombstoneReader returns a TombstoneReader that is always empty.
+func EmptyTombstoneReader() TombstoneReader {
+	return emptyTombstoneReader
 }
 
-func newEmptyTombstoneReader() tombstoneReader {
-	return tombstoneReader(make(map[uint64]Intervals))
+func (t memTombstones) Get(ref uint64) (Intervals, error) {
+	return t[ref], nil
 }
 
-func (t tombstoneReader) Get(ref uint64) Intervals {
-	return t[ref]
+func (t memTombstones) Iter(f func(uint64, Intervals) error) error {
+	for ref, ivs := range t {
+		if err := f(ref, ivs); err != nil {
+			return err
+		}
+	}
+	return nil
 }
 
-func (t tombstoneReader) add(ref uint64, itv Interval) {
+func (t memTombstones) add(ref uint64, itv Interval) {
 	t[ref] = t[ref].add(itv)
 }
 
-func (tombstoneReader) Close() error {
+func (memTombstones) Close() error {
 	return nil
 }
diff --git a/vendor/github.com/prometheus/tsdb/tsdb.test b/vendor/github.com/prometheus/tsdb/tsdb.test
new file mode 100755
index 0000000000..0cd858d367
Binary files /dev/null and b/vendor/github.com/prometheus/tsdb/tsdb.test differ
diff --git a/vendor/github.com/prometheus/tsdb/wal.go b/vendor/github.com/prometheus/tsdb/wal.go
index b0cea8f09d..72e8bc070b 100644
--- a/vendor/github.com/prometheus/tsdb/wal.go
+++ b/vendor/github.com/prometheus/tsdb/wal.go
@@ -63,11 +63,11 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {
 	m := &walMetrics{}
 
 	m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
-		Name: "tsdb_wal_fsync_duration_seconds",
+		Name: "prometheus_tsdb_wal_fsync_duration_seconds",
 		Help: "Duration of WAL fsync.",
 	})
 	m.corruptions = prometheus.NewCounter(prometheus.CounterOpts{
-		Name: "tsdb_wal_corruptions_total",
+		Name: "prometheus_tsdb_wal_corruptions_total",
 		Help: "Total number of WAL corruptions.",
 	})
 
diff --git a/vendor/vendor.json b/vendor/vendor.json
index f3ec812e35..8e61186058 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -782,28 +782,28 @@
 			"revisionTime": "2016-04-11T19:08:41Z"
 		},
 		{
-			"checksumSHA1": "Bty/r75M8kM+GA80eMM5p0cLTi0=",
+			"checksumSHA1": "c3VEi8SL0XmI6BmokMOWrSWmNu8=",
 			"path": "github.com/prometheus/tsdb",
-			"revision": "706602daed1487f7849990678b4ece4599745905",
-			"revisionTime": "2017-11-04T07:45:56Z"
+			"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a",
+			"revisionTime": "2017-11-23T17:41:24Z"
 		},
 		{
-			"checksumSHA1": "uy6ySJ6EZqof+yMD2wTkYob8BeU=",
+			"checksumSHA1": "C5V8KPHm/gZF0qrNwmIEDdG6rhA=",
 			"path": "github.com/prometheus/tsdb/chunks",
-			"revision": "706602daed1487f7849990678b4ece4599745905",
-			"revisionTime": "2017-11-04T07:45:56Z"
+			"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a",
+			"revisionTime": "2017-11-23T17:41:24Z"
 		},
 		{
 			"checksumSHA1": "7RhNAVcmDmLFqn9nWiudT0B76f8=",
 			"path": "github.com/prometheus/tsdb/fileutil",
-			"revision": "706602daed1487f7849990678b4ece4599745905",
-			"revisionTime": "2017-11-04T07:45:56Z"
+			"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a",
+			"revisionTime": "2017-11-23T17:41:24Z"
 		},
 		{
 			"checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
 			"path": "github.com/prometheus/tsdb/labels",
-			"revision": "706602daed1487f7849990678b4ece4599745905",
-			"revisionTime": "2017-11-04T07:45:56Z"
+			"revision": "ad3c4849a99729a9c10a55b3ba4c0ad146d2446a",
+			"revisionTime": "2017-11-23T17:41:24Z"
 		},
 		{
 			"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
diff --git a/web/api/v1/api.go b/web/api/v1/api.go
index 825e8db729..238ba8ff6e 100644
--- a/web/api/v1/api.go
+++ b/web/api/v1/api.go
@@ -380,7 +380,11 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
 	var set storage.SeriesSet
 
 	for _, mset := range matcherSets {
-		set = storage.DeduplicateSeriesSet(set, q.Select(mset...))
+		s, err := q.Select(mset...)
+		if err != nil {
+			return nil, &apiError{errorExec, err}
+		}
+		set = storage.DeduplicateSeriesSet(set, s)
 	}
 
 	metrics := []labels.Labels{}
@@ -517,7 +521,12 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
 			}
 		}
 
-		resp.Results[i], err = remote.ToQueryResult(querier.Select(filteredMatchers...))
+		set, err := querier.Select(filteredMatchers...)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		resp.Results[i], err = remote.ToQueryResult(set)
 		if err != nil {
 			http.Error(w, err.Error(), http.StatusInternalServerError)
 			return
diff --git a/web/federate.go b/web/federate.go
index 7597f6ddbf..43028a3ac8 100644
--- a/web/federate.go
+++ b/web/federate.go
@@ -75,7 +75,13 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
 	var set storage.SeriesSet
 
 	for _, mset := range matcherSets {
-		set = storage.DeduplicateSeriesSet(set, q.Select(mset...))
+		s, err := q.Select(mset...)
+		if err != nil {
+			federationErrors.Inc()
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		set = storage.DeduplicateSeriesSet(set, s)
 	}
 	if set == nil {
 		return