diff --git a/tsdb/block.go b/tsdb/block.go index fa8e896e3..536940ec9 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -111,7 +111,8 @@ type ChunkReader interface { // BlockReader provides reading access to a data block. type BlockReader interface { - // Index returns an IndexReader over the block's data. + // Index returns an IndexReader over the block's data within the specified + // timeframe. Index(mint, maxt int64) (IndexReader, error) // Chunks returns a ChunkReader over the block's data. diff --git a/tsdb/cmd/tsdb/main.go b/tsdb/cmd/tsdb/main.go index c55eb2985..1febb4004 100644 --- a/tsdb/cmd/tsdb/main.go +++ b/tsdb/cmd/tsdb/main.go @@ -470,7 +470,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error { // Presume 1ms resolution that Prometheus uses. fmt.Printf("Duration: %s\n", (time.Duration(meta.MaxTime-meta.MinTime) * 1e6).String()) fmt.Printf("Series: %d\n", meta.Stats.NumSeries) - ir, err := b.Index() + ir, err := b.Index(math.MinInt64, math.MaxInt64) if err != nil { return err } diff --git a/tsdb/compact.go b/tsdb/compact.go index f0cf28d9b..61fe549f9 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -683,7 +683,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } } - indexr, err := b.Index(math.MinInt64, math.MaxInt64) + indexr, err := b.Index(math.MinInt64, globalMaxt) if err != nil { return errors.Wrapf(err, "open index reader for block %s", b) } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 31734a574..a0a9d7a38 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -456,10 +456,10 @@ func metaRange(name string, mint, maxt int64, stats *BlockStats) dirMeta { type erringBReader struct{} -func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") } -func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") } -func (erringBReader) Tombstones() (tombstones.Reader, error) { return nil, errors.New("tombstones") } -func (erringBReader) Meta() BlockMeta { return BlockMeta{} } +func (erringBReader) Index(int64, int64) (IndexReader, error) { return nil, errors.New("index") } +func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") } +func (erringBReader) Tombstones() (tombstones.Reader, error) { return nil, errors.New("tombstones") } +func (erringBReader) Meta() BlockMeta { return BlockMeta{} } type nopChunkWriter struct{} @@ -652,7 +652,7 @@ func TestCompaction_populateBlock(t *testing.T) { expErr: errors.New("found chunk with minTime: 10 maxTime: 30 outside of compacted minTime: 0 maxTime: 20"), }, { - // Introduced by https://github.com/prometheus/prometheus/tsdb/issues/347. + // Introduced by https://github.com/prometheus/tsdb/issues/347. title: "Populate from single block containing extra chunk", inputSeriesSamples: [][]seriesSamples{ { @@ -692,7 +692,7 @@ func TestCompaction_populateBlock(t *testing.T) { }, }, { - // Introduced by https://github.com/prometheus/prometheus/tsdb/pull/539. + // Introduced by https://github.com/prometheus/tsdb/pull/539. title: "Populate from three blocks that the last two are overlapping.", inputSeriesSamples: [][]seriesSamples{ { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 4eabef608..29131c52b 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1384,7 +1384,7 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) { }, OverlappingBlocks(nc1)) } -// Regression test for https://github.com/prometheus/prometheus/tsdb/issues/347 +// Regression test for https://github.com/prometheus/tsdb/issues/347 func TestChunkAtBlockBoundary(t *testing.T) { db, closeFn := openTestDB(t, nil, nil) defer func() { @@ -1411,7 +1411,7 @@ func TestChunkAtBlockBoundary(t *testing.T) { testutil.Ok(t, err) for _, block := range db.Blocks() { - r, err := block.Index() + r, err := block.Index(math.MinInt64, math.MaxInt64) testutil.Ok(t, err) defer r.Close() @@ -1761,7 +1761,7 @@ func TestDB_LabelNames(t *testing.T) { appendSamples(db, 0, 4, tst.sampleLabels1) // Testing head. - headIndexr, err := db.head.Index() + headIndexr, err := db.head.Index(math.MinInt64, math.MaxInt64) testutil.Ok(t, err) labelNames, err := headIndexr.LabelNames() testutil.Ok(t, err) @@ -1774,7 +1774,7 @@ func TestDB_LabelNames(t *testing.T) { // All blocks have same label names, hence check them individually. // No need to aggregate and check. for _, b := range db.Blocks() { - blockIndexr, err := b.Index() + blockIndexr, err := b.Index(math.MinInt64, math.MaxInt64) testutil.Ok(t, err) labelNames, err = blockIndexr.LabelNames() testutil.Ok(t, err) diff --git a/tsdb/head.go b/tsdb/head.go index a727eef99..726e70690 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -754,7 +754,7 @@ type RangeHead struct { mint, maxt int64 } -// NewRangeHead returns a *rangeHead. +// NewRangeHead returns a *RangeHead. func NewRangeHead(head *Head, mint, maxt int64) *RangeHead { return &RangeHead{ head: head, @@ -764,7 +764,14 @@ func NewRangeHead(head *Head, mint, maxt int64) *RangeHead { } func (h *RangeHead) Index(mint, maxt int64) (IndexReader, error) { - return h.head.indexRange(h.mint, h.maxt), nil + // rangeHead guarantees that the series returned are within its range. + if mint < h.mint { + mint = h.mint + } + if maxt > h.maxt { + maxt = h.maxt + } + return h.head.indexRange(mint, maxt), nil } func (h *RangeHead) Chunks() (ChunkReader, error) { @@ -1347,9 +1354,17 @@ func (h *headIndexReader) LabelNames() ([]string, error) { // Postings returns the postings list iterator for the label pairs. func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) { + fullRange := h.mint <= h.head.MinTime() && h.maxt >= h.head.MaxTime() res := make([]index.Postings, 0, len(values)) for _, value := range values { p := h.head.postings.Get(name, value) + if fullRange { + // The head timerange covers the full index reader timerange. + // All the series can the be appended without filtering. + res = append(res, p) + continue + } + // Filter out series not in the time range, to avoid // later on building up all the chunk metadata just to // discard it. diff --git a/tsdb/head_bench_test.go b/tsdb/head_bench_test.go index 954541395..c1ada3417 100644 --- a/tsdb/head_bench_test.go +++ b/tsdb/head_bench_test.go @@ -14,7 +14,6 @@ package tsdb import ( - "fmt" "strconv" "sync/atomic" "testing" @@ -49,35 +48,3 @@ func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) { } }) } - -func BenchmarkHeadSeries(b *testing.B) { - h, err := NewHead(nil, nil, nil, 1000) - testutil.Ok(b, err) - defer h.Close() - app := h.Appender() - numSeries := 1000000 - for i := 0; i < numSeries; i++ { - app.Add(labels.FromStrings("foo", "bar", "i", strconv.Itoa(i)), int64(i), 0) - } - testutil.Ok(b, app.Commit()) - - matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar") - - for s := 1; s <= numSeries; s *= 10 { - b.Run(fmt.Sprintf("%dof%d", s, numSeries), func(b *testing.B) { - q, err := NewBlockQuerier(h, 0, int64(s-1)) - testutil.Ok(b, err) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - ss, err := q.Select(matcher) - testutil.Ok(b, err) - for ss.Next() { - } - testutil.Ok(b, ss.Err()) - } - q.Close() - }) - - } -} diff --git a/tsdb/head_test.go b/tsdb/head_test.go index cba30ef38..f6b0d088d 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1322,3 +1322,91 @@ func TestAddDuplicateLabelName(t *testing.T) { add(labels.Labels{{Name: "a", Value: "c"}, {Name: "a", Value: "c"}}, "a") add(labels.Labels{{Name: "__name__", Value: "up"}, {Name: "job", Value: "prometheus"}, {Name: "le", Value: "500"}, {Name: "le", Value: "400"}, {Name: "unit", Value: "s"}}, "le") } + +func TestHeadSeriesWithTimeBoundaries(t *testing.T) { + h, err := NewHead(nil, nil, nil, 15, DefaultStripeSize) + testutil.Ok(t, err) + defer h.Close() + app := h.Appender() + + s1, err := app.Add(labels.FromStrings("foo1", "bar"), 2, 0) + testutil.Ok(t, err) + for ts := int64(3); ts < 13; ts++ { + err = app.AddFast(s1, ts, 0) + testutil.Ok(t, err) + } + s2, err := app.Add(labels.FromStrings("foo2", "bar"), 5, 0) + testutil.Ok(t, err) + for ts := int64(6); ts < 11; ts++ { + err = app.AddFast(s2, ts, 0) + testutil.Ok(t, err) + } + s3, err := app.Add(labels.FromStrings("foo3", "bar"), 5, 0) + testutil.Ok(t, err) + err = app.AddFast(s3, 6, 0) + testutil.Ok(t, err) + _, err = app.Add(labels.FromStrings("foo4", "bar"), 9, 0) + testutil.Ok(t, err) + + testutil.Ok(t, app.Commit()) + + cases := []struct { + mint int64 + maxt int64 + seriesCount int + samplesCount int + }{ + // foo1 ..00000000000.. + // foo2 .....000000.... + // foo3 .....00........ + // foo4 .........0..... + {mint: 0, maxt: 0, seriesCount: 0, samplesCount: 0}, + {mint: 0, maxt: 1, seriesCount: 0, samplesCount: 0}, + {mint: 0, maxt: 2, seriesCount: 1, samplesCount: 1}, + {mint: 2, maxt: 2, seriesCount: 1, samplesCount: 1}, + {mint: 0, maxt: 4, seriesCount: 1, samplesCount: 3}, + {mint: 0, maxt: 5, seriesCount: 3, samplesCount: 6}, + {mint: 0, maxt: 6, seriesCount: 3, samplesCount: 9}, + {mint: 0, maxt: 7, seriesCount: 3, samplesCount: 11}, + {mint: 0, maxt: 8, seriesCount: 3, samplesCount: 13}, + {mint: 0, maxt: 9, seriesCount: 4, samplesCount: 16}, + {mint: 0, maxt: 10, seriesCount: 4, samplesCount: 18}, + {mint: 0, maxt: 11, seriesCount: 4, samplesCount: 19}, + {mint: 0, maxt: 12, seriesCount: 4, samplesCount: 20}, + {mint: 0, maxt: 13, seriesCount: 4, samplesCount: 20}, + {mint: 0, maxt: 14, seriesCount: 4, samplesCount: 20}, + {mint: 2, maxt: 14, seriesCount: 4, samplesCount: 20}, + {mint: 3, maxt: 14, seriesCount: 4, samplesCount: 19}, + {mint: 4, maxt: 14, seriesCount: 4, samplesCount: 18}, + {mint: 8, maxt: 9, seriesCount: 3, samplesCount: 5}, + {mint: 9, maxt: 9, seriesCount: 3, samplesCount: 3}, + {mint: 6, maxt: 9, seriesCount: 4, samplesCount: 10}, + {mint: 11, maxt: 11, seriesCount: 1, samplesCount: 1}, + {mint: 11, maxt: 12, seriesCount: 1, samplesCount: 2}, + {mint: 11, maxt: 14, seriesCount: 1, samplesCount: 2}, + {mint: 12, maxt: 14, seriesCount: 1, samplesCount: 1}, + } + + for i, c := range cases { + matcher := labels.MustNewMatcher(labels.MatchEqual, "", "") + q, err := NewBlockQuerier(h, c.mint, c.maxt) + testutil.Ok(t, err) + + seriesCount := 0 + samplesCount := 0 + ss, _, err := q.Select(nil, matcher) + testutil.Ok(t, err) + for ss.Next() { + i := ss.At().Iterator() + for i.Next() { + samplesCount++ + } + seriesCount++ + } + testutil.Ok(t, ss.Err()) + testutil.Equals(t, c.seriesCount, seriesCount, "test series %d", i) + testutil.Equals(t, c.samplesCount, samplesCount, "test samples %d", i) + q.Close() + } + +} diff --git a/tsdb/mocks_test.go b/tsdb/mocks_test.go index 913061e83..a4e47b8cb 100644 --- a/tsdb/mocks_test.go +++ b/tsdb/mocks_test.go @@ -71,8 +71,8 @@ type mockBReader struct { maxt int64 } -func (r *mockBReader) Index() (IndexReader, error) { return r.ir, nil } -func (r *mockBReader) Chunks() (ChunkReader, error) { return r.cr, nil } +func (r *mockBReader) Index(mint, maxt int64) (IndexReader, error) { return r.ir, nil } +func (r *mockBReader) Chunks() (ChunkReader, error) { return r.cr, nil } func (r *mockBReader) Tombstones() (tombstones.Reader, error) { return tombstones.NewMemTombstones(), nil } diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index 2cad9d9e1..765435203 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -16,6 +16,7 @@ package tsdb import ( "fmt" "io/ioutil" + "math" "os" "strconv" "testing" @@ -54,7 +55,7 @@ func BenchmarkPostingsForMatchers(b *testing.B) { } testutil.Ok(b, app.Commit()) - ir, err := h.Index() + ir, err := h.Index(math.MinInt64, math.MaxInt64) testutil.Ok(b, err) b.Run("Head", func(b *testing.B) { benchmarkPostingsForMatchers(b, ir) @@ -72,7 +73,7 @@ func BenchmarkPostingsForMatchers(b *testing.B) { defer func() { testutil.Ok(b, block.Close()) }() - ir, err = block.Index() + ir, err = block.Index(math.MinInt64, math.MaxInt64) testutil.Ok(b, err) defer ir.Close() b.Run("Block", func(b *testing.B) { diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 25bf3d062..778081737 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -1044,7 +1044,7 @@ func TestSeriesIterator(t *testing.T) { }) } -// Regression for: https://github.com/prometheus/prometheus/tsdb/pull/97 +// Regression for: https://github.com/prometheus/tsdb/pull/97 func TestChunkSeriesIterator_DoubleSeek(t *testing.T) { chkMetas := []chunks.Meta{ tsdbutil.ChunkFromSamples([]tsdbutil.Sample{}), @@ -2137,7 +2137,7 @@ func TestPostingsForMatchers(t *testing.T) { }, } - ir, err := h.Index() + ir, err := h.Index(math.MinInt64, math.MaxInt64) testutil.Ok(t, err) for _, c := range cases { diff --git a/tsdb/wal/wal_test.go b/tsdb/wal/wal_test.go index cd7a9fd8f..340c2b32e 100644 --- a/tsdb/wal/wal_test.go +++ b/tsdb/wal/wal_test.go @@ -49,7 +49,7 @@ func TestWALRepair_ReadingError(t *testing.T) { }, // Ensures that the page buffer is big enough to fit // an entire page size without panicking. - // https://github.com/prometheus/prometheus/tsdb/pull/414 + // https://github.com/prometheus/tsdb/pull/414 "bad_header": { 1, func(f *os.File) {