diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a05606970..f42a92474f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ - [CHANGE] `NewLeveledCompactor` takes a context so that a compaction is canceled when closing the db. - [ENHANCEMENT] When closing the db any running compaction will be cancelled so it doesn't block. + - [CHANGE] `prometheus_tsdb_storage_blocks_bytes_total` is now `prometheus_tsdb_storage_blocks_bytes` + ## 0.4.0 - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed. - [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374) diff --git a/block_test.go b/block_test.go index addb5ea972..8ba6f3ec5a 100644 --- a/block_test.go +++ b/block_test.go @@ -22,8 +22,8 @@ import ( "testing" "github.com/go-kit/kit/log" - "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/tsdbutil" ) // In Prometheus 2.1.0 we had a bug where the meta.json version was falsely bumped @@ -46,7 +46,7 @@ func TestSetCompactionFailed(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(tmpdir) - blockDir := createBlock(t, tmpdir, 1, 0, 0) + blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 0)) b, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) testutil.Equals(t, false, b.meta.Compaction.Failed) @@ -60,33 +60,32 @@ func TestSetCompactionFailed(t *testing.T) { testutil.Ok(t, b.Close()) } -// createBlock creates a block with nSeries series, filled with -// samples of the given mint,maxt time range and returns its dir. -func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) string { +// createBlock creates a block with given set of series and returns its dir. +func createBlock(tb testing.TB, dir string, series []Series) string { head, err := NewHead(nil, nil, nil, 2*60*60*1000) testutil.Ok(tb, err) defer head.Close() - lbls, err := labels.ReadLabels(filepath.Join("testdata", "20kseries.json"), nSeries) - testutil.Ok(tb, err) - refs := make([]uint64, nSeries) - - for ts := mint; ts <= maxt; ts++ { - app := head.Appender() - for i, lbl := range lbls { - if refs[i] != 0 { - err := app.AddFast(refs[i], ts, rand.Float64()) + app := head.Appender() + for _, s := range series { + ref := uint64(0) + it := s.Iterator() + for it.Next() { + t, v := it.At() + if ref != 0 { + err := app.AddFast(ref, t, v) if err == nil { continue } } - ref, err := app.Add(lbl, int64(ts), rand.Float64()) + ref, err = app.Add(s.Labels(), t, v) testutil.Ok(tb, err) refs[i] = ref } - err := app.Commit() - testutil.Ok(tb, err) + testutil.Ok(tb, it.Err()) } + err = app.Commit() + testutil.Ok(tb, err) compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil) testutil.Ok(tb, err) @@ -97,3 +96,53 @@ func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) strin testutil.Ok(tb, err) return filepath.Join(dir, ulid.String()) } + +// genSeries generates series with a given number of labels and values. +func genSeries(totalSeries, labelCount int, mint, maxt int64) []Series { + if totalSeries == 0 || labelCount == 0 { + return nil + } + series := make([]Series, totalSeries) + + for i := 0; i < totalSeries; i++ { + lbls := make(map[string]string, labelCount) + for len(lbls) < labelCount { + lbls[randString()] = randString() + } + samples := make([]tsdbutil.Sample, 0, maxt-mint+1) + for t := mint; t <= maxt; t++ { + samples = append(samples, sample{t: t, v: rand.Float64()}) + } + series[i] = newSeries(lbls, samples) + } + + return series +} + +const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" +const ( + letterIdxBits = 6 // 6 bits to represent a letter index + letterIdxMask = 1<= 0; { + if remain == 0 { + cache, remain = rand.Int63(), letterIdxMax + } + if idx := int(cache & letterIdxMask); idx < len(letterBytes) { + b[i] = letterBytes[idx] + i-- + } + cache >>= letterIdxBits + remain-- + } + + return string(b) +} diff --git a/chunks/chunks.go b/chunks/chunks.go index 8fb288384e..fe3e982e8d 100644 --- a/chunks/chunks.go +++ b/chunks/chunks.go @@ -352,30 +352,31 @@ func (s *Reader) Size() int64 { return s.size } +// Chunk returns a chunk from a given reference. func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) { var ( - seq = int(ref >> 32) - off = int((ref << 32) >> 32) + sgmSeq = int(ref >> 32) + sgmOffset = int((ref << 32) >> 32) ) - if seq >= len(s.bs) { - return nil, errors.Errorf("reference sequence %d out of range", seq) + if sgmSeq >= len(s.bs) { + return nil, errors.Errorf("reference sequence %d out of range", sgmSeq) } - b := s.bs[seq] + chkS := s.bs[sgmSeq] - if off >= b.Len() { - return nil, errors.Errorf("offset %d beyond data size %d", off, b.Len()) + if sgmOffset >= chkS.Len() { + return nil, errors.Errorf("offset %d beyond data size %d", sgmOffset, chkS.Len()) } // With the minimum chunk length this should never cause us reading // over the end of the slice. - r := b.Range(off, off+binary.MaxVarintLen32) + chk := chkS.Range(sgmOffset, sgmOffset+binary.MaxVarintLen32) - l, n := binary.Uvarint(r) + chkLen, n := binary.Uvarint(chk) if n <= 0 { return nil, errors.Errorf("reading chunk length failed with %d", n) } - r = b.Range(off+n, off+n+int(l)) + chk = chkS.Range(sgmOffset+n, sgmOffset+n+1+int(chkLen)) - return s.pool.Get(chunkenc.Encoding(r[0]), r[1:1+l]) + return s.pool.Get(chunkenc.Encoding(chk[0]), chk[1:1+chkLen]) } func nextSequenceFile(dir string) (string, int, error) { diff --git a/compact.go b/compact.go index ab0d9aab65..bb5d53e2e8 100644 --- a/compact.go +++ b/compact.go @@ -424,6 +424,8 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u } func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) { + start := time.Now() + entropy := rand.New(rand.NewSource(time.Now().UnixNano())) uid := ulid.MustNew(ulid.Now(), entropy) @@ -450,7 +452,13 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p return ulid.ULID{}, nil } - level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID) + level.Info(c.logger).Log( + "msg", "write block", + "mint", meta.MinTime, + "maxt", meta.MaxTime, + "ulid", meta.ULID, + "duration", time.Since(start), + ) return uid, nil } diff --git a/db.go b/db.go index bfa52840cc..9436809b73 100644 --- a/db.go +++ b/db.go @@ -206,7 +206,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Help: "The time taken to recompact blocks to remove tombstones.", }) m.blocksBytes = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_storage_blocks_bytes_total", + Name: "prometheus_tsdb_storage_blocks_bytes", Help: "The number of bytes that are currently used for local storage by all blocks.", }) m.sizeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{ diff --git a/db_test.go b/db_test.go index 21de405e25..f727a79192 100644 --- a/db_test.go +++ b/db_test.go @@ -34,6 +34,7 @@ import ( "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/tsdbutil" "github.com/prometheus/tsdb/wal" ) @@ -87,7 +88,7 @@ func TestDB_reloadOrder(t *testing.T) { {MinTime: 100, MaxTime: 110}, } for _, m := range metas { - createBlock(t, db.Dir(), 1, m.MinTime, m.MaxTime) + createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime)) } testutil.Ok(t, db.reload()) @@ -250,7 +251,7 @@ Outer: res, err := q.Select(labels.NewEqualMatcher("a", "b")) testutil.Ok(t, err) - expSamples := make([]Sample, 0, len(c.remaint)) + expSamples := make([]tsdbutil.Sample, 0, len(c.remaint)) for _, ts := range c.remaint { expSamples = append(expSamples, sample{ts, smpls[ts]}) } @@ -477,7 +478,7 @@ Outer: res, err := q.Select(labels.NewEqualMatcher("a", "b")) testutil.Ok(t, err) - expSamples := make([]Sample, 0, len(c.remaint)) + expSamples := make([]tsdbutil.Sample, 0, len(c.remaint)) for _, ts := range c.remaint { expSamples = append(expSamples, sample{ts, smpls[ts]}) } @@ -782,7 +783,7 @@ func TestTombstoneClean(t *testing.T) { res, err := q.Select(labels.NewEqualMatcher("a", "b")) testutil.Ok(t, err) - expSamples := make([]Sample, 0, len(c.remaint)) + expSamples := make([]tsdbutil.Sample, 0, len(c.remaint)) for _, ts := range c.remaint { expSamples = append(expSamples, sample{ts, smpls[ts]}) } @@ -836,7 +837,7 @@ func TestTombstoneCleanFail(t *testing.T) { // totalBlocks should be >=2 so we have enough blocks to trigger compaction failure. totalBlocks := 2 for i := 0; i < totalBlocks; i++ { - blockDir := createBlock(t, db.Dir(), 1, 0, 0) + blockDir := createBlock(t, db.Dir(), genSeries(1, 1, 0, 0)) block, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) // Add some some fake tombstones to trigger the compaction. @@ -880,7 +881,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6 return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") } - block, err := OpenBlock(nil, createBlock(c.t, dest, 1, 0, 0), nil) + block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 0)), nil) testutil.Ok(c.t, err) testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere. c.blocks = append(c.blocks, block) @@ -918,7 +919,7 @@ func TestTimeRetention(t *testing.T) { } for _, m := range blocks { - createBlock(t, db.Dir(), 10, m.MinTime, m.MaxTime) + createBlock(t, db.Dir(), genSeries(10, 10, m.MinTime, m.MaxTime)) } testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. @@ -952,7 +953,7 @@ func TestSizeRetention(t *testing.T) { } for _, m := range blocks { - createBlock(t, db.Dir(), 100, m.MinTime, m.MaxTime) + createBlock(t, db.Dir(), genSeries(100, 10, m.MinTime, m.MaxTime)) } // Test that registered size matches the actual disk size. @@ -1319,7 +1320,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - createBlock(t, dir, 1, 1000, 2000) + createBlock(t, dir, genSeries(1, 1, 1000, 2000)) db, err := Open(dir, nil, nil, nil) testutil.Ok(t, err) @@ -1332,7 +1333,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - createBlock(t, dir, 1, 1000, 6000) + createBlock(t, dir, genSeries(1, 1, 1000, 6000)) testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777)) w, err := wal.New(nil, nil, path.Join(dir, "wal")) @@ -1450,7 +1451,7 @@ func TestNoEmptyBlocks(t *testing.T) { {MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]}, } for _, m := range blocks { - createBlock(t, db.Dir(), 2, m.MinTime, m.MaxTime) + createBlock(t, db.Dir(), genSeries(2, 2, m.MinTime, m.MaxTime)) } oldBlocks := db.Blocks() @@ -1603,9 +1604,9 @@ func TestCorrectNumTombstones(t *testing.T) { // TestBlockRanges checks the following use cases: // - No samples can be added with timestamps lower than the last block maxt. -// - The compactor doesn't create overlaping blocks +// - The compactor doesn't create overlapping blocks // even when the last blocks is not within the default boundaries. -// - Lower bondary is based on the smallest sample in the head and +// - Lower boundary is based on the smallest sample in the head and // upper boundary is rounded to the configured block range. // // This ensures that a snapshot that includes the head and creates a block with a custom time range @@ -1623,7 +1624,7 @@ func TestBlockRanges(t *testing.T) { // Test that the compactor doesn't create overlapping blocks // when a non standard block already exists. firstBlockMaxT := int64(3) - createBlock(t, dir, 1, 0, firstBlockMaxT) + createBlock(t, dir, genSeries(1, 1, 0, firstBlockMaxT)) db, err := Open(dir, logger, nil, DefaultOptions) if err != nil { t.Fatalf("Opening test storage failed: %s", err) @@ -1673,7 +1674,7 @@ func TestBlockRanges(t *testing.T) { testutil.Ok(t, db.Close()) thirdBlockMaxt := secondBlockMaxt + 2 - createBlock(t, dir, 1, secondBlockMaxt+1, thirdBlockMaxt) + createBlock(t, dir, genSeries(1, 1, secondBlockMaxt+1, thirdBlockMaxt)) db, err = Open(dir, logger, nil, DefaultOptions) if err != nil { diff --git a/head_test.go b/head_test.go index 8781f677af..137818323d 100644 --- a/head_test.go +++ b/head_test.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/tsdbutil" "github.com/prometheus/tsdb/wal" ) @@ -352,7 +353,7 @@ Outer: res, err := q.Select(labels.NewEqualMatcher("a", "b")) testutil.Ok(t, err) - expSamples := make([]Sample, 0, len(c.remaint)) + expSamples := make([]tsdbutil.Sample, 0, len(c.remaint)) for _, ts := range c.remaint { expSamples = append(expSamples, sample{ts, smpls[ts]}) } @@ -472,9 +473,9 @@ func TestDelete_e2e(t *testing.T) { {"job", "prom-k8s"}, }, } - seriesMap := map[string][]Sample{} + seriesMap := map[string][]tsdbutil.Sample{} for _, l := range lbls { - seriesMap[labels.New(l...).String()] = []Sample{} + seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{} } dir, _ := ioutil.TempDir("", "test") defer os.RemoveAll(dir) @@ -484,7 +485,7 @@ func TestDelete_e2e(t *testing.T) { app := hb.Appender() for _, l := range lbls { ls := labels.New(l...) - series := []Sample{} + series := []tsdbutil.Sample{} ts := rand.Int63n(300) for i := 0; i < numDatapoints; i++ { v := rand.Float64() @@ -604,8 +605,8 @@ func boundedSamples(full []sample, mint, maxt int64) []sample { return full } -func deletedSamples(full []Sample, dranges Intervals) []Sample { - ds := make([]Sample, 0, len(full)) +func deletedSamples(full []tsdbutil.Sample, dranges Intervals) []tsdbutil.Sample { + ds := make([]tsdbutil.Sample, 0, len(full)) Outer: for _, s := range full { for _, r := range dranges { diff --git a/querier_test.go b/querier_test.go index 69ca84602b..63bfda00d7 100644 --- a/querier_test.go +++ b/querier_test.go @@ -68,58 +68,6 @@ func (m *mockSeriesIterator) At() (int64, float64) { return m.at() } func (m *mockSeriesIterator) Next() bool { return m.next() } func (m *mockSeriesIterator) Err() error { return m.err() } -type mockSeries struct { - labels func() labels.Labels - iterator func() SeriesIterator -} - -type Sample = tsdbutil.Sample - -func newSeries(l map[string]string, s []Sample) Series { - return &mockSeries{ - labels: func() labels.Labels { return labels.FromMap(l) }, - iterator: func() SeriesIterator { return newListSeriesIterator(s) }, - } -} -func (m *mockSeries) Labels() labels.Labels { return m.labels() } -func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() } - -type listSeriesIterator struct { - list []Sample - idx int -} - -func newListSeriesIterator(list []Sample) *listSeriesIterator { - return &listSeriesIterator{list: list, idx: -1} -} - -func (it *listSeriesIterator) At() (int64, float64) { - s := it.list[it.idx] - return s.T(), s.V() -} - -func (it *listSeriesIterator) Next() bool { - it.idx++ - return it.idx < len(it.list) -} - -func (it *listSeriesIterator) Seek(t int64) bool { - if it.idx == -1 { - it.idx = 0 - } - // Do binary search between current position and end. - it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool { - s := it.list[i+it.idx] - return s.T() >= t - }) - - return it.idx < len(it.list) -} - -func (it *listSeriesIterator) Err() error { - return nil -} - func TestMergedSeriesSet(t *testing.T) { cases := []struct { @@ -134,32 +82,32 @@ func TestMergedSeriesSet(t *testing.T) { a: newMockSeriesSet([]Series{ newSeries(map[string]string{ "a": "a", - }, []Sample{ + }, []tsdbutil.Sample{ sample{t: 1, v: 1}, }), }), b: newMockSeriesSet([]Series{ newSeries(map[string]string{ "a": "a", - }, []Sample{ + }, []tsdbutil.Sample{ sample{t: 2, v: 2}, }), newSeries(map[string]string{ "b": "b", - }, []Sample{ + }, []tsdbutil.Sample{ sample{t: 1, v: 1}, }), }), exp: newMockSeriesSet([]Series{ newSeries(map[string]string{ "a": "a", - }, []Sample{ + }, []tsdbutil.Sample{ sample{t: 1, v: 1}, sample{t: 2, v: 2}, }), newSeries(map[string]string{ "b": "b", - }, []Sample{ + }, []tsdbutil.Sample{ sample{t: 1, v: 1}, }), }), @@ -169,13 +117,13 @@ func TestMergedSeriesSet(t *testing.T) { newSeries(map[string]string{ "handler": "prometheus", "instance": "127.0.0.1:9090", - }, []Sample{ + }, []tsdbutil.Sample{ sample{t: 1, v: 1}, }), newSeries(map[string]string{ "handler": "prometheus", "instance": "localhost:9090", - }, []Sample{ + }, []tsdbutil.Sample{ sample{t: 1, v: 2}, }), }), @@ -183,13 +131,13 @@ func TestMergedSeriesSet(t *testing.T) { newSeries(map[string]string{ "handler": "prometheus", "instance": "127.0.0.1:9090", - }, []Sample{ + }, []tsdbutil.Sample{ sample{t: 2, v: 1}, }), newSeries(map[string]string{ "handler": "query", "instance": "localhost:9090", - }, []Sample{ + }, []tsdbutil.Sample{ sample{t: 2, v: 2}, }), }), @@ -197,20 +145,20 @@ func TestMergedSeriesSet(t *testing.T) { newSeries(map[string]string{ "handler": "prometheus", "instance": "127.0.0.1:9090", - }, []Sample{ + }, []tsdbutil.Sample{ sample{t: 1, v: 1}, sample{t: 2, v: 1}, }), newSeries(map[string]string{ "handler": "prometheus", "instance": "localhost:9090", - }, []Sample{ + }, []tsdbutil.Sample{ sample{t: 1, v: 2}, }), newSeries(map[string]string{ "handler": "query", "instance": "localhost:9090", - }, []Sample{ + }, []tsdbutil.Sample{ sample{t: 2, v: 2}, }), }), @@ -316,7 +264,7 @@ func createIdxChkReaders(tc []seriesSamples) (IndexReader, ChunkReader) { } func TestBlockQuerier(t *testing.T) { - newSeries := func(l map[string]string, s []Sample) Series { + newSeries := func(l map[string]string, s []tsdbutil.Sample) Series { return &mockSeries{ labels: func() labels.Labels { return labels.FromMap(l) }, iterator: func() SeriesIterator { return newListSeriesIterator(s) }, @@ -404,13 +352,13 @@ func TestBlockQuerier(t *testing.T) { newSeries(map[string]string{ "a": "a", }, - []Sample{sample{2, 3}, sample{3, 4}, sample{5, 2}, sample{6, 3}}, + []tsdbutil.Sample{sample{2, 3}, sample{3, 4}, sample{5, 2}, sample{6, 3}}, ), newSeries(map[string]string{ "a": "a", "b": "b", }, - []Sample{sample{2, 2}, sample{3, 3}, sample{5, 3}, sample{6, 6}}, + []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{5, 3}, sample{6, 6}}, ), }), }, @@ -456,7 +404,7 @@ Outer: } func TestBlockQuerierDelete(t *testing.T) { - newSeries := func(l map[string]string, s []Sample) Series { + newSeries := func(l map[string]string, s []tsdbutil.Sample) Series { return &mockSeries{ labels: func() labels.Labels { return labels.FromMap(l) }, iterator: func() SeriesIterator { return newListSeriesIterator(s) }, @@ -531,13 +479,13 @@ func TestBlockQuerierDelete(t *testing.T) { newSeries(map[string]string{ "a": "a", }, - []Sample{sample{5, 2}, sample{6, 3}, sample{7, 4}}, + []tsdbutil.Sample{sample{5, 2}, sample{6, 3}, sample{7, 4}}, ), newSeries(map[string]string{ "a": "a", "b": "b", }, - []Sample{sample{4, 15}, sample{5, 3}}, + []tsdbutil.Sample{sample{4, 15}, sample{5, 3}}, ), }), }, @@ -550,12 +498,12 @@ func TestBlockQuerierDelete(t *testing.T) { "a": "a", "b": "b", }, - []Sample{sample{4, 15}, sample{5, 3}}, + []tsdbutil.Sample{sample{4, 15}, sample{5, 3}}, ), newSeries(map[string]string{ "b": "b", }, - []Sample{sample{2, 2}, sample{3, 6}, sample{5, 1}}, + []tsdbutil.Sample{sample{2, 2}, sample{3, 6}, sample{5, 1}}, ), }), }, @@ -568,7 +516,7 @@ func TestBlockQuerierDelete(t *testing.T) { "a": "a", "b": "b", }, - []Sample{sample{4, 15}}, + []tsdbutil.Sample{sample{4, 15}}, ), }), }, @@ -727,66 +675,66 @@ func (s itSeries) Labels() labels.Labels { return labels.Labels{} } func TestSeriesIterator(t *testing.T) { itcases := []struct { - a, b, c []Sample - exp []Sample + a, b, c []tsdbutil.Sample + exp []tsdbutil.Sample mint, maxt int64 }{ { - a: []Sample{}, - b: []Sample{}, - c: []Sample{}, + a: []tsdbutil.Sample{}, + b: []tsdbutil.Sample{}, + c: []tsdbutil.Sample{}, - exp: []Sample{}, + exp: []tsdbutil.Sample{}, mint: math.MinInt64, maxt: math.MaxInt64, }, { - a: []Sample{ + a: []tsdbutil.Sample{ sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, }, - b: []Sample{}, - c: []Sample{ + b: []tsdbutil.Sample{}, + c: []tsdbutil.Sample{ sample{7, 89}, sample{9, 8}, }, - exp: []Sample{ + exp: []tsdbutil.Sample{ sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, }, mint: math.MinInt64, maxt: math.MaxInt64, }, { - a: []Sample{}, - b: []Sample{ + a: []tsdbutil.Sample{}, + b: []tsdbutil.Sample{ sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, }, - c: []Sample{ + c: []tsdbutil.Sample{ sample{7, 89}, sample{9, 8}, }, - exp: []Sample{ + exp: []tsdbutil.Sample{ sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, }, mint: 2, maxt: 8, }, { - a: []Sample{ + a: []tsdbutil.Sample{ sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, }, - b: []Sample{ + b: []tsdbutil.Sample{ sample{7, 89}, sample{9, 8}, }, - c: []Sample{ + c: []tsdbutil.Sample{ sample{10, 22}, sample{203, 3493}, }, - exp: []Sample{ + exp: []tsdbutil.Sample{ sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, sample{10, 22}, sample{203, 3493}, }, mint: 6, @@ -795,29 +743,29 @@ func TestSeriesIterator(t *testing.T) { } seekcases := []struct { - a, b, c []Sample + a, b, c []tsdbutil.Sample seek int64 success bool - exp []Sample + exp []tsdbutil.Sample mint, maxt int64 }{ { - a: []Sample{}, - b: []Sample{}, - c: []Sample{}, + a: []tsdbutil.Sample{}, + b: []tsdbutil.Sample{}, + c: []tsdbutil.Sample{}, seek: 0, success: false, exp: nil, }, { - a: []Sample{ + a: []tsdbutil.Sample{ sample{2, 3}, }, - b: []Sample{}, - c: []Sample{ + b: []tsdbutil.Sample{}, + c: []tsdbutil.Sample{ sample{7, 89}, sample{9, 8}, }, @@ -828,55 +776,55 @@ func TestSeriesIterator(t *testing.T) { maxt: math.MaxInt64, }, { - a: []Sample{}, - b: []Sample{ + a: []tsdbutil.Sample{}, + b: []tsdbutil.Sample{ sample{1, 2}, sample{3, 5}, sample{6, 1}, }, - c: []Sample{ + c: []tsdbutil.Sample{ sample{7, 89}, sample{9, 8}, }, seek: 2, success: true, - exp: []Sample{ + exp: []tsdbutil.Sample{ sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, }, mint: 5, maxt: 8, }, { - a: []Sample{ + a: []tsdbutil.Sample{ sample{6, 1}, }, - b: []Sample{ + b: []tsdbutil.Sample{ sample{9, 8}, }, - c: []Sample{ + c: []tsdbutil.Sample{ sample{10, 22}, sample{203, 3493}, }, seek: 10, success: true, - exp: []Sample{ + exp: []tsdbutil.Sample{ sample{10, 22}, sample{203, 3493}, }, mint: 10, maxt: 203, }, { - a: []Sample{ + a: []tsdbutil.Sample{ sample{6, 1}, }, - b: []Sample{ + b: []tsdbutil.Sample{ sample{9, 8}, }, - c: []Sample{ + c: []tsdbutil.Sample{ sample{10, 22}, sample{203, 3493}, }, seek: 203, success: true, - exp: []Sample{ + exp: []tsdbutil.Sample{ sample{203, 3493}, }, mint: 7, @@ -893,10 +841,10 @@ func TestSeriesIterator(t *testing.T) { } res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt) - smplValid := make([]Sample, 0) + smplValid := make([]tsdbutil.Sample, 0) for _, s := range tc.exp { if s.T() >= tc.mint && s.T() <= tc.maxt { - smplValid = append(smplValid, Sample(s)) + smplValid = append(smplValid, tsdbutil.Sample(s)) } } exp := newListSeriesIterator(smplValid) @@ -910,22 +858,22 @@ func TestSeriesIterator(t *testing.T) { t.Run("Seek", func(t *testing.T) { extra := []struct { - a, b, c []Sample + a, b, c []tsdbutil.Sample seek int64 success bool - exp []Sample + exp []tsdbutil.Sample mint, maxt int64 }{ { - a: []Sample{ + a: []tsdbutil.Sample{ sample{6, 1}, }, - b: []Sample{ + b: []tsdbutil.Sample{ sample{9, 8}, }, - c: []Sample{ + c: []tsdbutil.Sample{ sample{10, 22}, sample{203, 3493}, }, @@ -936,19 +884,19 @@ func TestSeriesIterator(t *testing.T) { maxt: 202, }, { - a: []Sample{ + a: []tsdbutil.Sample{ sample{6, 1}, }, - b: []Sample{ + b: []tsdbutil.Sample{ sample{9, 8}, }, - c: []Sample{ + c: []tsdbutil.Sample{ sample{10, 22}, sample{203, 3493}, }, seek: 5, success: true, - exp: []Sample{sample{10, 22}}, + exp: []tsdbutil.Sample{sample{10, 22}}, mint: 10, maxt: 202, }, @@ -964,10 +912,10 @@ func TestSeriesIterator(t *testing.T) { } res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt) - smplValid := make([]Sample, 0) + smplValid := make([]tsdbutil.Sample, 0) for _, s := range tc.exp { if s.T() >= tc.mint && s.T() <= tc.maxt { - smplValid = append(smplValid, Sample(s)) + smplValid = append(smplValid, tsdbutil.Sample(s)) } } exp := newListSeriesIterator(smplValid) @@ -1000,7 +948,7 @@ func TestSeriesIterator(t *testing.T) { itSeries{newListSeriesIterator(tc.c)} res := newChainedSeriesIterator(a, b, c) - exp := newListSeriesIterator([]Sample(tc.exp)) + exp := newListSeriesIterator([]tsdbutil.Sample(tc.exp)) smplExp, errExp := expandSeriesIterator(exp) smplRes, errRes := expandSeriesIterator(res) @@ -1045,9 +993,9 @@ func TestSeriesIterator(t *testing.T) { // Regression for: https://github.com/prometheus/tsdb/pull/97 func TestChunkSeriesIterator_DoubleSeek(t *testing.T) { chkMetas := []chunks.Meta{ - tsdbutil.ChunkFromSamples([]Sample{}), - tsdbutil.ChunkFromSamples([]Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}), - tsdbutil.ChunkFromSamples([]Sample{sample{4, 4}, sample{5, 5}}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{4, 4}, sample{5, 5}}), } res := newChunkSeriesIterator(chkMetas, nil, 2, 8) @@ -1062,9 +1010,9 @@ func TestChunkSeriesIterator_DoubleSeek(t *testing.T) { // skipped to the end when seeking a value in the current chunk. func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) { metas := []chunks.Meta{ - tsdbutil.ChunkFromSamples([]Sample{}), - tsdbutil.ChunkFromSamples([]Sample{sample{1, 2}, sample{3, 4}, sample{5, 6}, sample{7, 8}}), - tsdbutil.ChunkFromSamples([]Sample{}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}, sample{3, 4}, sample{5, 6}, sample{7, 8}}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{}), } it := newChunkSeriesIterator(metas, nil, 1, 7) @@ -1084,7 +1032,7 @@ func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) { // Seek gets called and advances beyond the max time, which was just accepted as a valid sample. func TestChunkSeriesIterator_NextWithMinTime(t *testing.T) { metas := []chunks.Meta{ - tsdbutil.ChunkFromSamples([]Sample{sample{1, 6}, sample{5, 6}, sample{7, 8}}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 6}, sample{5, 6}, sample{7, 8}}), } it := newChunkSeriesIterator(metas, nil, 2, 4) @@ -1232,7 +1180,7 @@ func BenchmarkPersistedQueries(b *testing.B) { dir, err := ioutil.TempDir("", "bench_persisted") testutil.Ok(b, err) defer os.RemoveAll(dir) - block, err := OpenBlock(nil, createBlock(b, dir, nSeries, 1, int64(nSamples)), nil) + block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, 1, int64(nSamples))), nil) testutil.Ok(b, err) defer block.Close() @@ -1462,3 +1410,53 @@ func (m mockIndex) LabelNames() ([]string, error) { sort.Strings(labelNames) return labelNames, nil } + +type mockSeries struct { + labels func() labels.Labels + iterator func() SeriesIterator +} + +func newSeries(l map[string]string, s []tsdbutil.Sample) Series { + return &mockSeries{ + labels: func() labels.Labels { return labels.FromMap(l) }, + iterator: func() SeriesIterator { return newListSeriesIterator(s) }, + } +} +func (m *mockSeries) Labels() labels.Labels { return m.labels() } +func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() } + +type listSeriesIterator struct { + list []tsdbutil.Sample + idx int +} + +func newListSeriesIterator(list []tsdbutil.Sample) *listSeriesIterator { + return &listSeriesIterator{list: list, idx: -1} +} + +func (it *listSeriesIterator) At() (int64, float64) { + s := it.list[it.idx] + return s.T(), s.V() +} + +func (it *listSeriesIterator) Next() bool { + it.idx++ + return it.idx < len(it.list) +} + +func (it *listSeriesIterator) Seek(t int64) bool { + if it.idx == -1 { + it.idx = 0 + } + // Do binary search between current position and end. + it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool { + s := it.list[i+it.idx] + return s.T() >= t + }) + + return it.idx < len(it.list) +} + +func (it *listSeriesIterator) Err() error { + return nil +}