From 7ce09b4e3986b4ae07a3f48c7c337448e86fdff0 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Wed, 21 Sep 2022 11:56:44 +0100 Subject: [PATCH 1/5] storage: fix BenchmarkMergeSeriesSet The SeriesSets to be merged must be created each time round the loop, otherwise the benchmark is not doing any real work. Don't call ExpandSeries, because it spends most of its time allocating a memory buffer to hold the result, which we don't look at. Signed-off-by: Bryan Boreham Fix up merge test again --- storage/merge_test.go | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/storage/merge_test.go b/storage/merge_test.go index 726296dd5..a6576da13 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -868,9 +868,7 @@ func TestChainSampleIteratorSeek(t *testing.T) { } } -var result []tsdbutil.Sample - -func makeSeriesSet(numSeries, numSamples int) SeriesSet { +func makeSeries(numSeries, numSamples int) []Series { series := []Series{} for j := 0; j < numSeries; j++ { labels := labels.FromStrings("foo", fmt.Sprintf("bar%d", j)) @@ -880,30 +878,38 @@ func makeSeriesSet(numSeries, numSamples int) SeriesSet { } series = append(series, NewListSeries(labels, samples)) } - return NewMockSeriesSet(series...) + return series } -func makeMergeSeriesSet(numSeriesSets, numSeries, numSamples int) SeriesSet { - seriesSets := []genericSeriesSet{} - for i := 0; i < numSeriesSets; i++ { - seriesSets = append(seriesSets, &genericSeriesSetAdapter{makeSeriesSet(numSeries, numSamples)}) +func makeMergeSeriesSet(serieses [][]Series) SeriesSet { + seriesSets := make([]genericSeriesSet, len(serieses)) + for i, s := range serieses { + seriesSets[i] = &genericSeriesSetAdapter{NewMockSeriesSet(s...)} } return &seriesSetAdapter{newGenericMergeSeriesSet(seriesSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: ChainedSeriesMerge}).Merge)} } -func benchmarkDrain(seriesSet SeriesSet, b *testing.B) { +func benchmarkDrain(b *testing.B, makeSeriesSet func() SeriesSet) { var err error + var t int64 + var v float64 for n := 0; n < b.N; n++ { + seriesSet := makeSeriesSet() for seriesSet.Next() { - result, err = ExpandSamples(seriesSet.At().Iterator(), nil) - require.NoError(b, err) + iter := seriesSet.At().Iterator() + for iter.Next() == chunkenc.ValFloat { + t, v = iter.At() + } + err = iter.Err() } + require.NoError(b, err) + require.NotEqual(b, t, v) // To ensure the inner loop doesn't get optimised away. } } func BenchmarkNoMergeSeriesSet_100_100(b *testing.B) { - seriesSet := makeSeriesSet(100, 100) - benchmarkDrain(seriesSet, b) + series := makeSeries(100, 100) + benchmarkDrain(b, func() SeriesSet { return NewMockSeriesSet(series...) }) } func BenchmarkMergeSeriesSet(b *testing.B) { @@ -914,9 +920,12 @@ func BenchmarkMergeSeriesSet(b *testing.B) { {10, 100, 100}, {100, 100, 100}, } { - seriesSet := makeMergeSeriesSet(bm.numSeriesSets, bm.numSeries, bm.numSamples) + serieses := [][]Series{} + for i := 0; i < bm.numSeriesSets; i++ { + serieses = append(serieses, makeSeries(bm.numSeries, bm.numSamples)) + } b.Run(fmt.Sprintf("%d_%d_%d", bm.numSeriesSets, bm.numSeries, bm.numSamples), func(b *testing.B) { - benchmarkDrain(seriesSet, b) + benchmarkDrain(b, func() SeriesSet { return makeMergeSeriesSet(serieses) }) }) } } From 3c7de690598159b535ab1a4adf73aca44969a378 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 20 Sep 2022 18:16:45 +0100 Subject: [PATCH 2/5] storage: allow re-use of iterators Patterned after `Chunk.Iterator()`: pass the old iterator in so it can be re-used to avoid allocating a new object. (This commit does not do any re-use; it is just changing all the method signatures so re-use is possible in later commits.) Signed-off-by: Bryan Boreham --- cmd/promtool/backfill_test.go | 2 +- cmd/promtool/rules_test.go | 2 +- cmd/promtool/tsdb.go | 2 +- promql/engine.go | 16 ++++++++--- promql/test_test.go | 2 +- promql/value.go | 2 +- rules/manager.go | 2 +- rules/manager_test.go | 3 ++- scrape/scrape_test.go | 4 +-- storage/fanout_test.go | 6 +++-- storage/interface.go | 13 +++++---- storage/merge.go | 14 +++++----- storage/merge_test.go | 19 ++++++------- storage/remote/codec.go | 9 ++++--- storage/remote/codec_test.go | 2 +- storage/series.go | 26 +++++++++--------- tsdb/block_test.go | 8 +++--- tsdb/compact.go | 7 ++--- tsdb/db_test.go | 50 ++++++++++++++++++++++------------- tsdb/example_test.go | 2 +- tsdb/head_test.go | 24 ++++++++--------- tsdb/querier.go | 4 +-- tsdb/querier_test.go | 15 ++++++----- tsdb/tsdbblockutil.go | 3 ++- web/federate.go | 4 ++- 25 files changed, 140 insertions(+), 101 deletions(-) diff --git a/cmd/promtool/backfill_test.go b/cmd/promtool/backfill_test.go index 398f96766..2c551abeb 100644 --- a/cmd/promtool/backfill_test.go +++ b/cmd/promtool/backfill_test.go @@ -49,7 +49,7 @@ func queryAllSeries(t testing.TB, q storage.Querier, expectedMinTime, expectedMa samples := []backfillSample{} for ss.Next() { series := ss.At() - it := series.Iterator() + it := series.Iterator(nil) require.NoError(t, it.Err()) for it.Next() == chunkenc.ValFloat { ts, v := it.At() diff --git a/cmd/promtool/rules_test.go b/cmd/promtool/rules_test.go index 0e60a20fb..caa930616 100644 --- a/cmd/promtool/rules_test.go +++ b/cmd/promtool/rules_test.go @@ -139,7 +139,7 @@ func TestBackfillRuleIntegration(t *testing.T) { } else { require.Equal(t, 3, len(series.Labels())) } - it := series.Iterator() + it := series.Iterator(nil) for it.Next() == chunkenc.ValFloat { samplesCount++ ts, v := it.At() diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index 6934fad49..91b97f5c5 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -644,7 +644,7 @@ func dumpSamples(path string, mint, maxt int64) (err error) { for ss.Next() { series := ss.At() lbs := series.Labels() - it := series.Iterator() + it := series.Iterator(nil) for it.Next() == chunkenc.ValFloat { ts, val := it.At() fmt.Printf("%s %g %d\n", lbs, val, ts) diff --git a/promql/engine.go b/promql/engine.go index b3ad14b3d..0225f78d2 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1393,10 +1393,12 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { enh := &EvalNodeHelper{Out: make(Vector, 0, 1)} // Process all the calls for one time series at a time. it := storage.NewBuffer(selRange) + var chkIter chunkenc.Iterator for i, s := range selVS.Series { ev.currentSamples -= len(points) points = points[:0] - it.Reset(s.Iterator()) + chkIter = s.Iterator(chkIter) + it.Reset(chkIter) metric := selVS.Series[i].Labels() // The last_over_time function acts like offset; thus, it // should keep the metric name. For all the other range @@ -1578,8 +1580,10 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { } mat := make(Matrix, 0, len(e.Series)) it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) + var chkIter chunkenc.Iterator for i, s := range e.Series { - it.Reset(s.Iterator()) + chkIter = s.Iterator(chkIter) + it.Reset(chkIter) ss := Series{ Metric: e.Series[i].Labels(), Points: getPointSlice(numSteps), @@ -1723,8 +1727,10 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect } vec := make(Vector, 0, len(node.Series)) it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) + var chkIter chunkenc.Iterator for i, s := range node.Series { - it.Reset(s.Iterator()) + chkIter = s.Iterator(chkIter) + it.Reset(chkIter) t, v, h, ok := ev.vectorSelectorSingle(it, node, ts) if ok { @@ -1812,12 +1818,14 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, storag ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) } + var chkIter chunkenc.Iterator series := vs.Series for i, s := range series { if err := contextDone(ev.ctx, "expression evaluation"); err != nil { ev.error(err) } - it.Reset(s.Iterator()) + chkIter = s.Iterator(chkIter) + it.Reset(chkIter) ss := Series{ Metric: series[i].Labels(), } diff --git a/promql/test_test.go b/promql/test_test.go index 5c16e57a2..c5cb41ed9 100644 --- a/promql/test_test.go +++ b/promql/test_test.go @@ -143,7 +143,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) { got := Series{ Metric: storageSeries.Labels(), } - it := storageSeries.Iterator() + it := storageSeries.Iterator(nil) for it.Next() == chunkenc.ValFloat { t, v := it.At() got.Points = append(got.Points, Point{T: t, V: v}) diff --git a/promql/value.go b/promql/value.go index 507a5e6f1..78342859e 100644 --- a/promql/value.go +++ b/promql/value.go @@ -363,7 +363,7 @@ func (ss *StorageSeries) Labels() labels.Labels { } // Iterator returns a new iterator of the data of the series. -func (ss *StorageSeries) Iterator() chunkenc.Iterator { +func (ss *StorageSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { return newStorageSeriesIterator(ss.series) } diff --git a/rules/manager.go b/rules/manager.go index 42f1b59ce..4b9c8150a 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -807,7 +807,7 @@ func (g *Group) RestoreForState(ts time.Time) { // Series found for the 'for' state. var t int64 var v float64 - it := s.Iterator() + it := s.Iterator(nil) for it.Next() == chunkenc.ValFloat { t, v = it.At() } diff --git a/rules/manager_test.go b/rules/manager_test.go index 984bb81b9..5c580caf7 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -592,12 +592,13 @@ func TestStaleness(t *testing.T) { // Convert a SeriesSet into a form usable with require.Equal. func readSeriesSet(ss storage.SeriesSet) (map[string][]promql.Point, error) { result := map[string][]promql.Point{} + var it chunkenc.Iterator for ss.Next() { series := ss.At() points := []promql.Point{} - it := series.Iterator() + it := series.Iterator(it) for it.Next() == chunkenc.ValFloat { t, v := it.At() points = append(points, promql.Point{T: t, V: v}) diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index b22f7f095..bb851bd9e 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -2959,7 +2959,7 @@ func TestScrapeReportSingleAppender(t *testing.T) { c := 0 for series.Next() { - i := series.At().Iterator() + i := series.At().Iterator(nil) for i.Next() != chunkenc.ValNone { c++ } @@ -3032,7 +3032,7 @@ func TestScrapeReportLimit(t *testing.T) { var found bool for series.Next() { - i := series.At().Iterator() + i := series.At().Iterator(nil) for i.Next() == chunkenc.ValFloat { _, v := i.At() require.Equal(t, 1.0, v) diff --git a/storage/fanout_test.go b/storage/fanout_test.go index ee6623397..4996e8f64 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -86,11 +86,12 @@ func TestFanout_SelectSorted(t *testing.T) { result := make(map[int64]float64) var labelsResult labels.Labels + var iterator chunkenc.Iterator for seriesSet.Next() { series := seriesSet.At() seriesLabels := series.Labels() labelsResult = seriesLabels - iterator := series.Iterator() + iterator := series.Iterator(iterator) for iterator.Next() == chunkenc.ValFloat { timestamp, value := iterator.At() result[timestamp] = value @@ -112,11 +113,12 @@ func TestFanout_SelectSorted(t *testing.T) { result := make(map[int64]float64) var labelsResult labels.Labels + var iterator chunkenc.Iterator for seriesSet.Next() { series := seriesSet.At() seriesLabels := series.Labels() labelsResult = seriesLabels - iterator := series.Iterator() + iterator := series.Iterator(iterator) for iterator.Next() == chunkenc.ValFloat { timestamp, value := iterator.At() result[timestamp] = value diff --git a/storage/interface.go b/storage/interface.go index 22d3b4186..5f0be9db9 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -382,7 +382,7 @@ func (s mockSeries) Labels() labels.Labels { return labels.FromStrings(s.labelSet...) } -func (s mockSeries) Iterator() chunkenc.Iterator { +func (s mockSeries) Iterator(chunkenc.Iterator) chunkenc.Iterator { return chunkenc.MockSeriesIterator(s.timestamps, s.values) } @@ -421,14 +421,17 @@ type Labels interface { } type SampleIterable interface { - // Iterator returns a new, independent iterator of the data of the series. - Iterator() chunkenc.Iterator + // Iterator returns an iterator of the data of the series. + // The iterator passed as argument is for re-use. + // Depending on implementation, the iterator can + // be re-used or a new iterator can be allocated. + Iterator(chunkenc.Iterator) chunkenc.Iterator } type ChunkIterable interface { - // Iterator returns a new, independent iterator that iterates over potentially overlapping + // Iterator returns an iterator that iterates over potentially overlapping // chunks of the series, sorted by min time. - Iterator() chunks.Iterator + Iterator(chunks.Iterator) chunks.Iterator } type Warnings []error diff --git a/storage/merge.go b/storage/merge.go index 258e4e312..336d82c6f 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -425,10 +425,10 @@ func ChainedSeriesMerge(series ...Series) Series { } return &SeriesEntry{ Lset: series[0].Labels(), - SampleIteratorFn: func() chunkenc.Iterator { + SampleIteratorFn: func(chunkenc.Iterator) chunkenc.Iterator { iterators := make([]chunkenc.Iterator, 0, len(series)) for _, s := range series { - iterators = append(iterators, s.Iterator()) + iterators = append(iterators, s.Iterator(nil)) } return NewChainSampleIterator(iterators) }, @@ -607,10 +607,10 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC } return &ChunkSeriesEntry{ Lset: series[0].Labels(), - ChunkIteratorFn: func() chunks.Iterator { + ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator { iterators := make([]chunks.Iterator, 0, len(series)) for _, s := range series { - iterators = append(iterators, s.Iterator()) + iterators = append(iterators, s.Iterator(nil)) } return &compactChunkIterator{ mergeFunc: mergeFunc, @@ -693,7 +693,7 @@ func (c *compactChunkIterator) Next() bool { } // Add last as it's not yet included in overlap. We operate on same series, so labels does not matter here. - iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(nil, c.curr))...)).Iterator() + iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(nil, c.curr))...)).Iterator(nil) if !iter.Next() { if c.err = iter.Err(); c.err != nil { return false @@ -751,10 +751,10 @@ func NewConcatenatingChunkSeriesMerger() VerticalChunkSeriesMergeFunc { } return &ChunkSeriesEntry{ Lset: series[0].Labels(), - ChunkIteratorFn: func() chunks.Iterator { + ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator { iterators := make([]chunks.Iterator, 0, len(series)) for _, s := range series { - iterators = append(iterators, s.Iterator()) + iterators = append(iterators, s.Iterator(nil)) } return &concatenatingChunkIterator{ iterators: iterators, diff --git a/storage/merge_test.go b/storage/merge_test.go index a6576da13..407fc4ea5 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -202,8 +202,8 @@ func TestMergeQuerierWithChainMerger(t *testing.T) { expectedSeries := tc.expected.At() require.Equal(t, expectedSeries.Labels(), actualSeries.Labels()) - expSmpl, expErr := ExpandSamples(expectedSeries.Iterator(), nil) - actSmpl, actErr := ExpandSamples(actualSeries.Iterator(), nil) + expSmpl, expErr := ExpandSamples(expectedSeries.Iterator(nil), nil) + actSmpl, actErr := ExpandSamples(actualSeries.Iterator(nil), nil) require.Equal(t, expErr, actErr) require.Equal(t, expSmpl, actSmpl) } @@ -370,8 +370,8 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) { expectedSeries := tc.expected.At() require.Equal(t, expectedSeries.Labels(), actualSeries.Labels()) - expChks, expErr := ExpandChunks(expectedSeries.Iterator()) - actChks, actErr := ExpandChunks(actualSeries.Iterator()) + expChks, expErr := ExpandChunks(expectedSeries.Iterator(nil)) + actChks, actErr := ExpandChunks(actualSeries.Iterator(nil)) require.Equal(t, expErr, actErr) require.Equal(t, expChks, actChks) @@ -533,8 +533,8 @@ func TestCompactingChunkSeriesMerger(t *testing.T) { t.Run(tc.name, func(t *testing.T) { merged := m(tc.input...) require.Equal(t, tc.expected.Labels(), merged.Labels()) - actChks, actErr := ExpandChunks(merged.Iterator()) - expChks, expErr := ExpandChunks(tc.expected.Iterator()) + actChks, actErr := ExpandChunks(merged.Iterator(nil)) + expChks, expErr := ExpandChunks(tc.expected.Iterator(nil)) require.Equal(t, expErr, actErr) require.Equal(t, expChks, actChks) @@ -667,8 +667,8 @@ func TestConcatenatingChunkSeriesMerger(t *testing.T) { t.Run(tc.name, func(t *testing.T) { merged := m(tc.input...) require.Equal(t, tc.expected.Labels(), merged.Labels()) - actChks, actErr := ExpandChunks(merged.Iterator()) - expChks, expErr := ExpandChunks(tc.expected.Iterator()) + actChks, actErr := ExpandChunks(merged.Iterator(nil)) + expChks, expErr := ExpandChunks(tc.expected.Iterator(nil)) require.Equal(t, expErr, actErr) require.Equal(t, expChks, actChks) @@ -893,10 +893,11 @@ func benchmarkDrain(b *testing.B, makeSeriesSet func() SeriesSet) { var err error var t int64 var v float64 + var iter chunkenc.Iterator for n := 0; n < b.N; n++ { seriesSet := makeSeriesSet() for seriesSet.Next() { - iter := seriesSet.At().Iterator() + iter = seriesSet.At().Iterator(iter) for iter.Next() == chunkenc.ValFloat { t, v = iter.At() } diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 48c2d8615..9b7516b87 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" ) // decodeReadLimit is the maximum size of a read request body in bytes. @@ -115,9 +116,10 @@ func ToQuery(from, to int64, matchers []*labels.Matcher, hints *storage.SelectHi func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, storage.Warnings, error) { numSamples := 0 resp := &prompb.QueryResult{} + var iter chunkenc.Iterator for ss.Next() { series := ss.At() - iter := series.Iterator() + iter = series.Iterator(iter) samples := []prompb.Sample{} for iter.Next() == chunkenc.ValFloat { @@ -199,11 +201,12 @@ func StreamChunkedReadResponses( var ( chks []prompb.Chunk lbls []prompb.Label + iter chunks.Iterator ) for ss.Next() { series := ss.At() - iter := series.Iterator() + iter = series.Iterator(iter) lbls = MergeLabels(labelsToLabelsProto(series.Labels(), lbls), sortedExternalLabels) frameBytesLeft := maxBytesInFrame @@ -346,7 +349,7 @@ func (c *concreteSeries) Labels() labels.Labels { return labels.New(c.labels...) } -func (c *concreteSeries) Iterator() chunkenc.Iterator { +func (c *concreteSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { return newConcreteSeriersIterator(c) } diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index c806097c6..596eb0861 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -215,7 +215,7 @@ func TestConcreteSeriesIterator(t *testing.T) { {Value: 4, Timestamp: 4}, }, } - it := series.Iterator() + it := series.Iterator(nil) // Seek to the first sample with ts=1. require.Equal(t, chunkenc.ValFloat, it.Seek(1)) diff --git a/storage/series.go b/storage/series.go index 3259dd4d0..87b1256f6 100644 --- a/storage/series.go +++ b/storage/series.go @@ -27,25 +27,25 @@ import ( type SeriesEntry struct { Lset labels.Labels - SampleIteratorFn func() chunkenc.Iterator + SampleIteratorFn func(chunkenc.Iterator) chunkenc.Iterator } -func (s *SeriesEntry) Labels() labels.Labels { return s.Lset } -func (s *SeriesEntry) Iterator() chunkenc.Iterator { return s.SampleIteratorFn() } +func (s *SeriesEntry) Labels() labels.Labels { return s.Lset } +func (s *SeriesEntry) Iterator(it chunkenc.Iterator) chunkenc.Iterator { return s.SampleIteratorFn(it) } type ChunkSeriesEntry struct { Lset labels.Labels - ChunkIteratorFn func() chunks.Iterator + ChunkIteratorFn func(chunks.Iterator) chunks.Iterator } -func (s *ChunkSeriesEntry) Labels() labels.Labels { return s.Lset } -func (s *ChunkSeriesEntry) Iterator() chunks.Iterator { return s.ChunkIteratorFn() } +func (s *ChunkSeriesEntry) Labels() labels.Labels { return s.Lset } +func (s *ChunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator { return s.ChunkIteratorFn(it) } // NewListSeries returns series entry with iterator that allows to iterate over provided samples. func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry { return &SeriesEntry{ Lset: lset, - SampleIteratorFn: func() chunkenc.Iterator { + SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { return NewListSeriesIterator(samples(s)) }, } @@ -56,7 +56,7 @@ func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry { func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]tsdbutil.Sample) *ChunkSeriesEntry { return &ChunkSeriesEntry{ Lset: lset, - ChunkIteratorFn: func() chunks.Iterator { + ChunkIteratorFn: func(it chunks.Iterator) chunks.Iterator { chks := make([]chunks.Meta, 0, len(samples)) for _, s := range samples { chks = append(chks, tsdbutil.ChunkFromSamples(s)) @@ -178,7 +178,7 @@ func (c *chunkSetToSeriesSet) Next() bool { return false } - iter := c.ChunkSeriesSet.At().Iterator() + iter := c.ChunkSeriesSet.At().Iterator(nil) c.sameSeriesChunks = c.sameSeriesChunks[:0] for iter.Next() { @@ -210,9 +210,9 @@ func (c *chunkSetToSeriesSet) Err() error { func newChunkToSeriesDecoder(labels labels.Labels, chk chunks.Meta) Series { return &SeriesEntry{ Lset: labels, - SampleIteratorFn: func() chunkenc.Iterator { + SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { // TODO(bwplotka): Can we provide any chunkenc buffer? - return chk.Chunk.Iterator(nil) + return chk.Chunk.Iterator(it) }, } } @@ -252,7 +252,7 @@ func NewSeriesToChunkEncoder(series Series) ChunkSeries { return &seriesToChunkEncoder{series} } -func (s *seriesToChunkEncoder) Iterator() chunks.Iterator { +func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { var ( chk chunkenc.Chunk app chunkenc.Appender @@ -263,7 +263,7 @@ func (s *seriesToChunkEncoder) Iterator() chunks.Iterator { chks := []chunks.Meta{} i := 0 - seriesIter := s.Series.Iterator() + seriesIter := s.Series.Iterator(nil) lastType := chunkenc.ValNone for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() { if typ != lastType || i >= seriesToChunkEncoderSplit { diff --git a/tsdb/block_test.go b/tsdb/block_test.go index 6cb00b348..c3a6ff576 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -203,7 +203,7 @@ func TestCorruptedChunk(t *testing.T) { // Check chunk errors during iter time. require.True(t, set.Next()) - it := set.At().Iterator() + it := set.At().Iterator(nil) require.Equal(t, chunkenc.ValNone, it.Next()) require.Equal(t, tc.iterErr.Error(), it.Err().Error()) }) @@ -505,11 +505,12 @@ func createHead(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir str head, err := NewHead(nil, nil, w, nil, opts, nil) require.NoError(tb, err) + var it chunkenc.Iterator ctx := context.Background() app := head.Appender(ctx) for _, s := range series { ref := storage.SeriesRef(0) - it := s.Iterator() + it = s.Iterator(it) lset := s.Labels() typ := it.Next() lastTyp := typ @@ -550,11 +551,12 @@ func createHeadWithOOOSamples(tb testing.TB, w *wlog.WL, series []storage.Series oooSampleLabels := make([]labels.Labels, 0, len(series)) oooSamples := make([]tsdbutil.SampleSlice, 0, len(series)) + var it chunkenc.Iterator totalSamples := 0 app := head.Appender(context.Background()) for _, s := range series { ref := storage.SeriesRef(0) - it := s.Iterator() + it = s.Iterator(it) lset := s.Labels() os := tsdbutil.SampleSlice{} count := 0 diff --git a/tsdb/compact.go b/tsdb/compact.go index 9fe50fda1..f216ad46a 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -746,8 +746,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } var ( - ref = storage.SeriesRef(0) - chks []chunks.Meta + ref = storage.SeriesRef(0) + chks []chunks.Meta + chksIter chunks.Iterator ) set := sets[0] @@ -765,7 +766,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, default: } s := set.At() - chksIter := s.Iterator() + chksIter = s.Iterator(chksIter) chks = chks[:0] for chksIter.Next() { // We are not iterating in streaming way over chunk as diff --git a/tsdb/db_test.go b/tsdb/db_test.go index d4c2840c2..cea4b6e36 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -93,12 +93,13 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str require.NoError(t, q.Close()) }() + var it chunkenc.Iterator result := map[string][]tsdbutil.Sample{} for ss.Next() { series := ss.At() samples := []tsdbutil.Sample{} - it := series.Iterator() + it = series.Iterator(it) for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() { switch typ { case chunkenc.ValFloat: @@ -133,12 +134,13 @@ func queryChunks(t testing.TB, q storage.ChunkQuerier, matchers ...*labels.Match require.NoError(t, q.Close()) }() + var it chunks.Iterator result := map[string][]chunks.Meta{} for ss.Next() { series := ss.At() chks := []chunks.Meta{} - it := series.Iterator() + it = series.Iterator(it) for it.Next() { chks = append(chks, it.At()) } @@ -454,8 +456,8 @@ Outer: require.Equal(t, sexp.Labels(), sres.Labels()) - smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil) - smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil) + smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) require.Equal(t, errExp, errRes) require.Equal(t, smplExp, smplRes) @@ -628,9 +630,10 @@ func TestDB_Snapshot(t *testing.T) { // sum values seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + var series chunkenc.Iterator sum := 0.0 for seriesSet.Next() { - series := seriesSet.At().Iterator() + series = seriesSet.At().Iterator(series) for series.Next() == chunkenc.ValFloat { _, v := series.At() sum += v @@ -676,9 +679,10 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) { // Sum values. seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + var series chunkenc.Iterator sum := 0.0 for seriesSet.Next() { - series := seriesSet.At().Iterator() + series = seriesSet.At().Iterator(series) for series.Next() == chunkenc.ValFloat { _, v := series.At() sum += v @@ -770,8 +774,8 @@ Outer: require.Equal(t, sexp.Labels(), sres.Labels()) - smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil) - smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil) + smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) require.Equal(t, errExp, errRes) require.Equal(t, smplExp, smplRes) @@ -921,7 +925,7 @@ func TestDB_e2e(t *testing.T) { for ss.Next() { x := ss.At() - smpls, err := storage.ExpandSamples(x.Iterator(), newSample) + smpls, err := storage.ExpandSamples(x.Iterator(nil), newSample) require.NoError(t, err) if len(smpls) > 0 { @@ -1108,12 +1112,13 @@ func testWALReplayRaceOnSamplesLoggedBeforeSeries(t *testing.T, numSamplesBefore set := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "series_id", ".+")) actualSeries := 0 + var chunksIt chunks.Iterator for set.Next() { actualSeries++ actualChunks := 0 - chunksIt := set.At().Iterator() + chunksIt = set.At().Iterator(chunksIt) for chunksIt.Next() { actualChunks++ } @@ -1205,8 +1210,8 @@ func TestTombstoneClean(t *testing.T) { require.Equal(t, sexp.Labels(), sres.Labels()) - smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil) - smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil) + smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) require.Equal(t, errExp, errRes) require.Equal(t, smplExp, smplRes) @@ -1479,11 +1484,12 @@ func TestSizeRetention(t *testing.T) { // Add some data to the WAL. headApp := db.Head().Appender(context.Background()) var aSeries labels.Labels + var it chunkenc.Iterator for _, m := range headBlocks { series := genSeries(100, 10, m.MinTime, m.MaxTime+1) for _, s := range series { aSeries = s.Labels() - it := s.Iterator() + it = s.Iterator(it) for it.Next() == chunkenc.ValFloat { tim, v := it.At() _, err := headApp.Append(0, s.Labels(), tim, v) @@ -1691,10 +1697,11 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { func expandSeriesSet(ss storage.SeriesSet) ([]labels.Labels, map[string][]sample, storage.Warnings, error) { resultLabels := []labels.Labels{} resultSamples := map[string][]sample{} + var it chunkenc.Iterator for ss.Next() { series := ss.At() samples := []sample{} - it := series.Iterator() + it = series.Iterator(it) for it.Next() == chunkenc.ValFloat { t, v := it.At() samples = append(samples, sample{t: t, v: v}) @@ -2500,10 +2507,11 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { // Sum the values. seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush")) + var series chunkenc.Iterator sum := 0.0 for seriesSet.Next() { - series := seriesSet.At().Iterator() + series = seriesSet.At().Iterator(series) for series.Next() == chunkenc.ValFloat { _, v := series.At() sum += v @@ -2946,10 +2954,11 @@ func TestCompactHead(t *testing.T) { defer func() { require.NoError(t, querier.Close()) }() seriesSet := querier.Select(false, nil, &labels.Matcher{Type: labels.MatchEqual, Name: "a", Value: "b"}) + var series chunkenc.Iterator var actSamples []sample for seriesSet.Next() { - series := seriesSet.At().Iterator() + series = seriesSet.At().Iterator(series) for series.Next() == chunkenc.ValFloat { time, val := series.At() actSamples = append(actSamples, sample{int64(time), val, nil, nil}) @@ -3347,7 +3356,7 @@ func testQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t actualSeries++ // Get the iterator and call Next() so that we're sure the chunk is loaded. - it := seriesSet.At().Iterator() + it := seriesSet.At().Iterator(nil) it.Next() it.At() @@ -3477,11 +3486,13 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun seriesSet := querier.Select(true, hints, labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, ".+")) // Iterate all series and get their chunks. + var it chunks.Iterator var chunks []chunkenc.Chunk actualSeries := 0 for seriesSet.Next() { actualSeries++ - for it := seriesSet.At().Iterator(); it.Next(); { + it = seriesSet.At().Iterator(it) + for it.Next() { chunks = append(chunks, it.At().Chunk) } } @@ -6025,13 +6036,14 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) { ctx := context.Background() + var it chunkenc.Iterator exp := make(map[string][]tsdbutil.Sample) for _, series := range blockSeries { createBlock(t, db.Dir(), series) for _, s := range series { key := s.Labels().String() - it := s.Iterator() + it = s.Iterator(it) slice := exp[key] for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() { switch typ { diff --git a/tsdb/example_test.go b/tsdb/example_test.go index c33bf6dc0..da0e37923 100644 --- a/tsdb/example_test.go +++ b/tsdb/example_test.go @@ -67,7 +67,7 @@ func Example() { series := ss.At() fmt.Println("series:", series.Labels().String()) - it := series.Iterator() + it := series.Iterator(nil) for it.Next() == chunkenc.ValFloat { _, v := it.At() // We ignore the timestamp here, only to have a predictable output we can test against (below) fmt.Println("sample", v) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 9b8eb0278..59824ae08 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -924,8 +924,8 @@ func TestHeadDeleteSimple(t *testing.T) { require.Equal(t, expSeries.Labels(), actSeries.Labels()) - smplExp, errExp := storage.ExpandSamples(expSeries.Iterator(), nil) - smplRes, errRes := storage.ExpandSamples(actSeries.Iterator(), nil) + smplExp, errExp := storage.ExpandSamples(expSeries.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(actSeries.Iterator(nil), nil) require.Equal(t, errExp, errRes) require.Equal(t, smplExp, smplRes) @@ -959,7 +959,7 @@ func TestDeleteUntilCurMax(t *testing.T) { res := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) require.True(t, res.Next(), "series is not present") s := res.At() - it := s.Iterator() + it := s.Iterator(nil) require.Equal(t, chunkenc.ValNone, it.Next(), "expected no samples") for res.Next() { } @@ -976,7 +976,7 @@ func TestDeleteUntilCurMax(t *testing.T) { res = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) require.True(t, res.Next(), "series don't exist") exps := res.At() - it = exps.Iterator() + it = exps.Iterator(nil) resSamples, err := storage.ExpandSamples(it, newSample) require.NoError(t, err) require.Equal(t, []tsdbutil.Sample{sample{11, 1, nil, nil}}, resSamples) @@ -1163,7 +1163,7 @@ func TestDelete_e2e(t *testing.T) { eok, rok := expSs.Next(), ss.Next() // Skip a series if iterator is empty. if rok { - for ss.At().Iterator().Next() == chunkenc.ValNone { + for ss.At().Iterator(nil).Next() == chunkenc.ValNone { rok = ss.Next() if !rok { break @@ -1177,8 +1177,8 @@ func TestDelete_e2e(t *testing.T) { sexp := expSs.At() sres := ss.At() require.Equal(t, sexp.Labels(), sres.Labels()) - smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil) - smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil) + smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) require.Equal(t, errExp, errRes) require.Equal(t, smplExp, smplRes) } @@ -2635,7 +2635,7 @@ func TestChunkNotFoundHeadGCRace(t *testing.T) { <-time.After(3 * time.Second) // Now consume after compaction when it's gone. - it := s.Iterator() + it := s.Iterator(nil) for it.Next() == chunkenc.ValFloat { _, _ = it.At() } @@ -2643,7 +2643,7 @@ func TestChunkNotFoundHeadGCRace(t *testing.T) { require.NoError(t, it.Err()) for ss.Next() { s = ss.At() - it := s.Iterator() + it = s.Iterator(it) for it.Next() == chunkenc.ValFloat { _, _ = it.At() } @@ -2841,7 +2841,7 @@ func TestAppendHistogram(t *testing.T) { s := ss.At() require.False(t, ss.Next()) - it := s.Iterator() + it := s.Iterator(nil) actHistograms := make([]timedHistogram, 0, len(expHistograms)) for it.Next() == chunkenc.ValHistogram { t, h := it.AtHistogram() @@ -3304,7 +3304,7 @@ func TestHistogramStaleSample(t *testing.T) { s := ss.At() require.False(t, ss.Next()) - it := s.Iterator() + it := s.Iterator(nil) actHistograms := make([]timedHistogram, 0, len(expHistograms)) for it.Next() == chunkenc.ValHistogram { t, h := it.AtHistogram() @@ -3581,7 +3581,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) require.True(t, ss.Next()) s := ss.At() - it := s.Iterator() + it := s.Iterator(nil) expIdx := 0 loop: for { diff --git a/tsdb/querier.go b/tsdb/querier.go index cc765903c..3ae1c4f1e 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -838,7 +838,7 @@ func (b *blockSeriesSet) At() storage.Series { currIterFn := b.currIterFn return &storage.SeriesEntry{ Lset: b.currLabels, - SampleIteratorFn: func() chunkenc.Iterator { + SampleIteratorFn: func(chunkenc.Iterator) chunkenc.Iterator { return currIterFn().toSeriesIterator() }, } @@ -872,7 +872,7 @@ func (b *blockChunkSeriesSet) At() storage.ChunkSeries { currIterFn := b.currIterFn return &storage.ChunkSeriesEntry{ Lset: b.currLabels, - ChunkIteratorFn: func() chunks.Iterator { + ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator { return currIterFn().toChunkSeriesIterator() }, } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index ffb24b17b..20e4c2f8f 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -194,8 +194,8 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C sres := res.At() require.Equal(t, sexp.Labels(), sres.Labels()) - smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil) - smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil) + smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) require.Equal(t, errExp, errRes) require.Equal(t, smplExp, smplRes) @@ -230,9 +230,9 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C require.Equal(t, sexpChks.Labels(), sres.Labels()) - chksExp, errExp := storage.ExpandChunks(sexpChks.Iterator()) + chksExp, errExp := storage.ExpandChunks(sexpChks.Iterator(nil)) rmChunkRefs(chksExp) - chksRes, errRes := storage.ExpandChunks(sres.Iterator()) + chksRes, errRes := storage.ExpandChunks(sres.Iterator(nil)) rmChunkRefs(chksRes) require.Equal(t, errExp, errRes) require.Equal(t, chksExp, chksRes) @@ -1433,9 +1433,10 @@ func BenchmarkQuerySeek(b *testing.B) { b.ResetTimer() b.ReportAllocs() + var it chunkenc.Iterator ss := sq.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) for ss.Next() { - it := ss.At().Iterator() + it = ss.At().Iterator(it) for t := mint; t <= maxt; t++ { it.Seek(t) } @@ -2042,11 +2043,13 @@ func benchQuery(b *testing.B, expExpansions int, q storage.Querier, selectors la for i := 0; i < b.N; i++ { ss := q.Select(false, nil, selectors...) var actualExpansions int + var it chunkenc.Iterator for ss.Next() { s := ss.At() s.Labels() - it := s.Iterator() + it = s.Iterator(it) for it.Next() != chunkenc.ValNone { + _, _ = it.At() } actualExpansions++ } diff --git a/tsdb/tsdbblockutil.go b/tsdb/tsdbblockutil.go index 777db5e90..8117f431c 100644 --- a/tsdb/tsdbblockutil.go +++ b/tsdb/tsdbblockutil.go @@ -49,10 +49,11 @@ func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger l const commitAfter = 10000 ctx := context.Background() app := w.Appender(ctx) + var it chunkenc.Iterator for _, s := range series { ref := storage.SeriesRef(0) - it := s.Iterator() + it = s.Iterator(it) lset := s.Labels() typ := it.Next() lastTyp := typ diff --git a/web/federate.go b/web/federate.go index 5ba68fa28..baa3b5866 100644 --- a/web/federate.go +++ b/web/federate.go @@ -102,12 +102,14 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge) it := storage.NewBuffer(int64(h.lookbackDelta / 1e6)) + var chkIter chunkenc.Iterator for set.Next() { s := set.At() // TODO(fabxc): allow fast path for most recent sample either // in the storage itself or caching layer in Prometheus. - it.Reset(s.Iterator()) + chkIter = s.Iterator(chkIter) + it.Reset(chkIter) var t int64 var v float64 From f0866c0774a926a57868b8e38493131993c7f977 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 20 Sep 2022 19:27:44 +0100 Subject: [PATCH 3/5] tsdb: optimise block series iterators Re-use previous memory if it is already of the correct type. Also turn two levels of function closure into a single object that holds the required data. Signed-off-by: Bryan Boreham --- tsdb/querier.go | 123 +++++++++++++++++++++++++++---------------- tsdb/querier_test.go | 27 ++++++---- 2 files changed, 94 insertions(+), 56 deletions(-) diff --git a/tsdb/querier.go b/tsdb/querier.go index 3ae1c4f1e..642e089aa 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -426,6 +426,16 @@ func labelNamesWithMatchers(r IndexReader, matchers ...*labels.Matcher) ([]strin return r.LabelNamesFor(postings...) } +// These are the things fetched when we move from one series to another. +type seriesData struct { + chks []chunks.Meta + intervals tombstones.Intervals + labels labels.Labels +} + +// Labels implements part of storage.Series and storage.ChunkSeries. +func (s *seriesData) Labels() labels.Labels { return s.labels } + // blockBaseSeriesSet allows to iterate over all series in the single block. // Iterated series are trimmed with given min and max time as well as tombstones. // See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating. @@ -438,8 +448,7 @@ type blockBaseSeriesSet struct { mint, maxt int64 disableTrimming bool - currIterFn func() *populateWithDelGenericSeriesIterator - currLabels labels.Labels + curr seriesData bufChks []chunks.Meta bufLbls labels.Labels @@ -519,12 +528,11 @@ func (b *blockBaseSeriesSet) Next() bool { intervals = intervals.Add(tombstones.Interval{Mint: b.maxt + 1, Maxt: math.MaxInt64}) } - b.currLabels = make(labels.Labels, len(b.bufLbls)) - copy(b.currLabels, b.bufLbls) + b.curr.labels = make(labels.Labels, len(b.bufLbls)) + copy(b.curr.labels, b.bufLbls) + b.curr.chks = chks + b.curr.intervals = intervals - b.currIterFn = func() *populateWithDelGenericSeriesIterator { - return newPopulateWithDelGenericSeriesIterator(b.blockID, b.chunks, chks, intervals) - } return true } return false @@ -556,29 +564,26 @@ type populateWithDelGenericSeriesIterator struct { // the same, single series. chks []chunks.Meta - i int + i int // Index into chks; -1 if not started yet. err error - bufIter *DeletedIterator + bufIter DeletedIterator // Retained for memory re-use. currDelIter may point here. intervals tombstones.Intervals currDelIter chunkenc.Iterator currChkMeta chunks.Meta } -func newPopulateWithDelGenericSeriesIterator( - blockID ulid.ULID, - chunks ChunkReader, - chks []chunks.Meta, - intervals tombstones.Intervals, -) *populateWithDelGenericSeriesIterator { - return &populateWithDelGenericSeriesIterator{ - blockID: blockID, - chunks: chunks, - chks: chks, - i: -1, - bufIter: &DeletedIterator{}, - intervals: intervals, - } +func (p *populateWithDelGenericSeriesIterator) reset(blockID ulid.ULID, cr ChunkReader, chks []chunks.Meta, intervals tombstones.Intervals) { + p.blockID = blockID + p.chunks = cr + p.chks = chks + p.i = -1 + p.err = nil + p.bufIter.Iter = nil + p.bufIter.Intervals = p.bufIter.Intervals[:0] + p.intervals = intervals + p.currDelIter = nil + p.currChkMeta = chunks.Meta{} } func (p *populateWithDelGenericSeriesIterator) next() bool { @@ -618,28 +623,55 @@ func (p *populateWithDelGenericSeriesIterator) next() bool { // We don't want the full chunk, or it's potentially still opened, take // just a part of it. - p.bufIter.Iter = p.currChkMeta.Chunk.Iterator(nil) - p.currDelIter = p.bufIter + p.bufIter.Iter = p.currChkMeta.Chunk.Iterator(p.bufIter.Iter) + p.currDelIter = &p.bufIter return true } func (p *populateWithDelGenericSeriesIterator) Err() error { return p.err } -func (p *populateWithDelGenericSeriesIterator) toSeriesIterator() chunkenc.Iterator { - return &populateWithDelSeriesIterator{populateWithDelGenericSeriesIterator: p} +type blockSeriesEntry struct { + chunks ChunkReader + blockID ulid.ULID + seriesData } -func (p *populateWithDelGenericSeriesIterator) toChunkSeriesIterator() chunks.Iterator { - return &populateWithDelChunkSeriesIterator{populateWithDelGenericSeriesIterator: p} +func (s *blockSeriesEntry) Iterator(it chunkenc.Iterator) chunkenc.Iterator { + pi, ok := it.(*populateWithDelSeriesIterator) + if !ok { + pi = &populateWithDelSeriesIterator{} + } + pi.reset(s.blockID, s.chunks, s.chks, s.intervals) + return pi +} + +type chunkSeriesEntry struct { + chunks ChunkReader + blockID ulid.ULID + seriesData +} + +func (s *chunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator { + pi, ok := it.(*populateWithDelChunkSeriesIterator) + if !ok { + pi = &populateWithDelChunkSeriesIterator{} + } + pi.reset(s.blockID, s.chunks, s.chks, s.intervals) + return pi } // populateWithDelSeriesIterator allows to iterate over samples for the single series. type populateWithDelSeriesIterator struct { - *populateWithDelGenericSeriesIterator + populateWithDelGenericSeriesIterator curr chunkenc.Iterator } +func (p *populateWithDelSeriesIterator) reset(blockID ulid.ULID, cr ChunkReader, chks []chunks.Meta, intervals tombstones.Intervals) { + p.populateWithDelGenericSeriesIterator.reset(blockID, cr, chks, intervals) + p.curr = nil +} + func (p *populateWithDelSeriesIterator) Next() chunkenc.ValueType { if p.curr != nil { if valueType := p.curr.Next(); valueType != chunkenc.ValNone { @@ -701,11 +733,16 @@ func (p *populateWithDelSeriesIterator) Err() error { } type populateWithDelChunkSeriesIterator struct { - *populateWithDelGenericSeriesIterator + populateWithDelGenericSeriesIterator curr chunks.Meta } +func (p *populateWithDelChunkSeriesIterator) reset(blockID ulid.ULID, cr ChunkReader, chks []chunks.Meta, intervals tombstones.Intervals) { + p.populateWithDelGenericSeriesIterator.reset(blockID, cr, chks, intervals) + p.curr = chunks.Meta{} +} + func (p *populateWithDelChunkSeriesIterator) Next() bool { if !p.next() { return false @@ -834,13 +871,11 @@ func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p inde } func (b *blockSeriesSet) At() storage.Series { - // At can be looped over before iterating, so save the current value locally. - currIterFn := b.currIterFn - return &storage.SeriesEntry{ - Lset: b.currLabels, - SampleIteratorFn: func(chunkenc.Iterator) chunkenc.Iterator { - return currIterFn().toSeriesIterator() - }, + // At can be looped over before iterating, so save the current values locally. + return &blockSeriesEntry{ + chunks: b.chunks, + blockID: b.blockID, + seriesData: b.curr, } } @@ -868,13 +903,11 @@ func newBlockChunkSeriesSet(id ulid.ULID, i IndexReader, c ChunkReader, t tombst } func (b *blockChunkSeriesSet) At() storage.ChunkSeries { - // At can be looped over before iterating, so save the current value locally. - currIterFn := b.currIterFn - return &storage.ChunkSeriesEntry{ - Lset: b.currLabels, - ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator { - return currIterFn().toChunkSeriesIterator() - }, + // At can be looped over before iterating, so save the current values locally. + return &chunkSeriesEntry{ + chunks: b.chunks, + blockID: b.blockID, + seriesData: b.curr, } } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 20e4c2f8f..3b44cef51 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -859,7 +859,8 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Run("sample", func(t *testing.T) { f, chkMetas := createFakeReaderAndNotPopulatedChunks(tc.chks...) - it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, tc.intervals).toSeriesIterator() + it := &populateWithDelSeriesIterator{} + it.reset(ulid.ULID{}, f, chkMetas, tc.intervals) var r []tsdbutil.Sample if tc.seek != 0 { @@ -879,7 +880,8 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }) t.Run("chunk", func(t *testing.T) { f, chkMetas := createFakeReaderAndNotPopulatedChunks(tc.chks...) - it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, tc.intervals).toChunkSeriesIterator() + it := &populateWithDelChunkSeriesIterator{} + it.reset(ulid.ULID{}, f, chkMetas, tc.intervals) if tc.seek != 0 { // Chunk iterator does not have Seek method. @@ -911,7 +913,8 @@ func TestPopulateWithDelSeriesIterator_DoubleSeek(t *testing.T) { []tsdbutil.Sample{sample{4, 4, nil, nil}, sample{5, 5, nil, nil}}, ) - it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, nil).toSeriesIterator() + it := &populateWithDelSeriesIterator{} + it.reset(ulid.ULID{}, f, chkMetas, nil) require.Equal(t, chunkenc.ValFloat, it.Seek(1)) require.Equal(t, chunkenc.ValFloat, it.Seek(2)) require.Equal(t, chunkenc.ValFloat, it.Seek(2)) @@ -929,7 +932,8 @@ func TestPopulateWithDelSeriesIterator_SeekInCurrentChunk(t *testing.T) { []tsdbutil.Sample{}, ) - it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, nil).toSeriesIterator() + it := &populateWithDelSeriesIterator{} + it.reset(ulid.ULID{}, f, chkMetas, nil) require.Equal(t, chunkenc.ValFloat, it.Next()) ts, v := it.At() require.Equal(t, int64(1), ts) @@ -946,7 +950,8 @@ func TestPopulateWithDelSeriesIterator_SeekWithMinTime(t *testing.T) { []tsdbutil.Sample{sample{1, 6, nil, nil}, sample{5, 6, nil, nil}, sample{6, 8, nil, nil}}, ) - it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, nil).toSeriesIterator() + it := &populateWithDelSeriesIterator{} + it.reset(ulid.ULID{}, f, chkMetas, nil) require.Equal(t, chunkenc.ValNone, it.Seek(7)) require.Equal(t, chunkenc.ValFloat, it.Seek(3)) } @@ -958,9 +963,8 @@ func TestPopulateWithDelSeriesIterator_NextWithMinTime(t *testing.T) { []tsdbutil.Sample{sample{1, 6, nil, nil}, sample{5, 6, nil, nil}, sample{7, 8, nil, nil}}, ) - it := newPopulateWithDelGenericSeriesIterator( - ulid.ULID{}, f, chkMetas, tombstones.Intervals{{Mint: math.MinInt64, Maxt: 2}}.Add(tombstones.Interval{Mint: 4, Maxt: math.MaxInt64}), - ).toSeriesIterator() + it := &populateWithDelSeriesIterator{} + it.reset(ulid.ULID{}, f, chkMetas, tombstones.Intervals{{Mint: math.MinInt64, Maxt: 2}}.Add(tombstones.Interval{Mint: 4, Maxt: math.MaxInt64})) require.Equal(t, chunkenc.ValNone, it.Next()) } @@ -2225,11 +2229,12 @@ func TestBlockBaseSeriesSet(t *testing.T) { i := 0 for bcs.Next() { - chks := bcs.currIterFn().chks + si := populateWithDelGenericSeriesIterator{} + si.reset(bcs.blockID, bcs.chunks, bcs.curr.chks, bcs.curr.intervals) idx := tc.expIdxs[i] - require.Equal(t, tc.series[idx].lset, bcs.currLabels) - require.Equal(t, tc.series[idx].chunks, chks) + require.Equal(t, tc.series[idx].lset, bcs.curr.labels) + require.Equal(t, tc.series[idx].chunks, si.chks) i++ } From 463f5cafdd243183c12ddb787d5bd8ef5fb42f5d Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 20 Sep 2022 19:31:28 +0100 Subject: [PATCH 4/5] storage: re-use iterators to save garbage Re-use previous memory if it is already of the correct type. In `NewListSeries` we hoist the conversion to an interface value out so it only allocates once. Signed-off-by: Bryan Boreham --- promql/value.go | 9 ++++++++ storage/merge.go | 51 ++++++++++++++++++++++++++++++----------- storage/merge_test.go | 4 ++-- storage/remote/codec.go | 9 ++++++++ storage/series.go | 49 +++++++++++++++++++++++++++++++++------ tsdb/head_read.go | 6 +---- 6 files changed, 100 insertions(+), 28 deletions(-) diff --git a/promql/value.go b/promql/value.go index 78342859e..4db976e97 100644 --- a/promql/value.go +++ b/promql/value.go @@ -364,6 +364,10 @@ func (ss *StorageSeries) Labels() labels.Labels { // Iterator returns a new iterator of the data of the series. func (ss *StorageSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { + if ssi, ok := it.(*storageSeriesIterator); ok { + ssi.reset(ss.series) + return ssi + } return newStorageSeriesIterator(ss.series) } @@ -379,6 +383,11 @@ func newStorageSeriesIterator(series Series) *storageSeriesIterator { } } +func (ssi *storageSeriesIterator) reset(series Series) { + ssi.points = series.Points + ssi.curr = -1 +} + func (ssi *storageSeriesIterator) Seek(t int64) chunkenc.ValueType { i := ssi.curr if i < 0 { diff --git a/storage/merge.go b/storage/merge.go index 336d82c6f..78a0125db 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -425,12 +425,8 @@ func ChainedSeriesMerge(series ...Series) Series { } return &SeriesEntry{ Lset: series[0].Labels(), - SampleIteratorFn: func(chunkenc.Iterator) chunkenc.Iterator { - iterators := make([]chunkenc.Iterator, 0, len(series)) - for _, s := range series { - iterators = append(iterators, s.Iterator(nil)) - } - return NewChainSampleIterator(iterators) + SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { + return ChainSampleIteratorFromSeries(it, series) }, } } @@ -446,15 +442,42 @@ type chainSampleIterator struct { lastT int64 } -// NewChainSampleIterator returns a single iterator that iterates over the samples from the given iterators in a sorted -// fashion. If samples overlap, one sample from overlapped ones is kept (randomly) and all others with the same -// timestamp are dropped. -func NewChainSampleIterator(iterators []chunkenc.Iterator) chunkenc.Iterator { - return &chainSampleIterator{ - iterators: iterators, - h: nil, - lastT: math.MinInt64, +// Return a chainSampleIterator initialized for length entries, re-using the memory from it if possible. +func getChainSampleIterator(it chunkenc.Iterator, length int) *chainSampleIterator { + csi, ok := it.(*chainSampleIterator) + if !ok { + csi = &chainSampleIterator{} } + if cap(csi.iterators) < length { + csi.iterators = make([]chunkenc.Iterator, length) + } else { + csi.iterators = csi.iterators[:length] + } + csi.h = nil + csi.lastT = math.MinInt64 + return csi +} + +func ChainSampleIteratorFromSeries(it chunkenc.Iterator, series []Series) chunkenc.Iterator { + csi := getChainSampleIterator(it, len(series)) + for i, s := range series { + csi.iterators[i] = s.Iterator(csi.iterators[i]) + } + return csi +} + +func ChainSampleIteratorFromMetas(it chunkenc.Iterator, chunks []chunks.Meta) chunkenc.Iterator { + csi := getChainSampleIterator(it, len(chunks)) + for i, c := range chunks { + csi.iterators[i] = c.Chunk.Iterator(csi.iterators[i]) + } + return csi +} + +func ChainSampleIteratorFromIterators(it chunkenc.Iterator, iterators []chunkenc.Iterator) chunkenc.Iterator { + csi := getChainSampleIterator(it, 0) + csi.iterators = iterators + return csi } func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType { diff --git a/storage/merge_test.go b/storage/merge_test.go index 407fc4ea5..ad68684c0 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -809,7 +809,7 @@ func TestChainSampleIterator(t *testing.T) { expected: []tsdbutil.Sample{sample{0, 0, nil, nil}, sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}}, }, } { - merged := NewChainSampleIterator(tc.input) + merged := ChainSampleIteratorFromIterators(nil, tc.input) actual, err := ExpandSamples(merged, nil) require.NoError(t, err) require.Equal(t, tc.expected, actual) @@ -855,7 +855,7 @@ func TestChainSampleIteratorSeek(t *testing.T) { expected: []tsdbutil.Sample{sample{0, 0, nil, nil}, sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}}, }, } { - merged := NewChainSampleIterator(tc.input) + merged := ChainSampleIteratorFromIterators(nil, tc.input) actual := []tsdbutil.Sample{} if merged.Seek(tc.seek) == chunkenc.ValFloat { t, v := merged.At() diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 9b7516b87..a74ad2b7b 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -350,6 +350,10 @@ func (c *concreteSeries) Labels() labels.Labels { } func (c *concreteSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { + if csi, ok := it.(*concreteSeriesIterator); ok { + csi.reset(c) + return csi + } return newConcreteSeriersIterator(c) } @@ -366,6 +370,11 @@ func newConcreteSeriersIterator(series *concreteSeries) chunkenc.Iterator { } } +func (c *concreteSeriesIterator) reset(series *concreteSeries) { + c.cur = -1 + c.series = series +} + // Seek implements storage.SeriesIterator. func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType { if c.cur == -1 { diff --git a/storage/series.go b/storage/series.go index 87b1256f6..339beb2c9 100644 --- a/storage/series.go +++ b/storage/series.go @@ -43,10 +43,15 @@ func (s *ChunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator { return // NewListSeries returns series entry with iterator that allows to iterate over provided samples. func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry { + samplesS := Samples(samples(s)) return &SeriesEntry{ Lset: lset, SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { - return NewListSeriesIterator(samples(s)) + if lsi, ok := it.(*listSeriesIterator); ok { + lsi.Reset(samplesS) + return lsi + } + return NewListSeriesIterator(samplesS) }, } } @@ -57,10 +62,20 @@ func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]tsdbutil.Sam return &ChunkSeriesEntry{ Lset: lset, ChunkIteratorFn: func(it chunks.Iterator) chunks.Iterator { - chks := make([]chunks.Meta, 0, len(samples)) + lcsi, existing := it.(*listChunkSeriesIterator) + var chks []chunks.Meta + if existing { + chks = lcsi.chks[:0] + } else { + chks = make([]chunks.Meta, 0, len(samples)) + } for _, s := range samples { chks = append(chks, tsdbutil.ChunkFromSamples(s)) } + if existing { + lcsi.Reset(chks...) + return lcsi + } return NewListChunkSeriesIterator(chks...) }, } @@ -87,6 +102,11 @@ func NewListSeriesIterator(samples Samples) chunkenc.Iterator { return &listSeriesIterator{samples: samples, idx: -1} } +func (it *listSeriesIterator) Reset(samples Samples) { + it.samples = samples + it.idx = -1 +} + func (it *listSeriesIterator) At() (int64, float64) { s := it.samples.Get(it.idx) return s.T(), s.V() @@ -150,6 +170,11 @@ func NewListChunkSeriesIterator(chks ...chunks.Meta) chunks.Iterator { return &listChunkSeriesIterator{chks: chks, idx: -1} } +func (it *listChunkSeriesIterator) Reset(chks ...chunks.Meta) { + it.chks = chks + it.idx = -1 +} + func (it *listChunkSeriesIterator) At() chunks.Meta { return it.chks[it.idx] } @@ -164,6 +189,7 @@ func (it *listChunkSeriesIterator) Err() error { return nil } type chunkSetToSeriesSet struct { ChunkSeriesSet + iter chunks.Iterator chkIterErr error sameSeriesChunks []Series } @@ -178,18 +204,18 @@ func (c *chunkSetToSeriesSet) Next() bool { return false } - iter := c.ChunkSeriesSet.At().Iterator(nil) + c.iter = c.ChunkSeriesSet.At().Iterator(c.iter) c.sameSeriesChunks = c.sameSeriesChunks[:0] - for iter.Next() { + for c.iter.Next() { c.sameSeriesChunks = append( c.sameSeriesChunks, - newChunkToSeriesDecoder(c.ChunkSeriesSet.At().Labels(), iter.At()), + newChunkToSeriesDecoder(c.ChunkSeriesSet.At().Labels(), c.iter.At()), ) } - if iter.Err() != nil { - c.chkIterErr = iter.Err() + if c.iter.Err() != nil { + c.chkIterErr = c.iter.Err() return false } return true @@ -262,6 +288,11 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { maxt := int64(math.MinInt64) chks := []chunks.Meta{} + lcsi, existing := it.(*listChunkSeriesIterator) + if existing { + chks = lcsi.chks[:0] + } + i := 0 seriesIter := s.Series.Iterator(nil) lastType := chunkenc.ValNone @@ -323,6 +354,10 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { }) } + if existing { + lcsi.Reset(chks...) + return lcsi + } return NewListChunkSeriesIterator(chks...) } diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 6a273a0fd..985a15792 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -503,11 +503,7 @@ func (o mergedOOOChunks) Appender() (chunkenc.Appender, error) { } func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator { - iterators := make([]chunkenc.Iterator, 0, len(o.chunks)) - for _, c := range o.chunks { - iterators = append(iterators, c.Chunk.Iterator(nil)) - } - return storage.NewChainSampleIterator(iterators) + return storage.ChainSampleIteratorFromMetas(iterator, o.chunks) } func (o mergedOOOChunks) NumSamples() int { From 085325069558be37b86318162f9dcaafcc83d227 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 15 Dec 2022 18:29:44 +0000 Subject: [PATCH 5/5] Review feedback Signed-off-by: Bryan Boreham --- storage/interface.go | 2 +- storage/series.go | 2 +- tsdb/querier.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/storage/interface.go b/storage/interface.go index 5f0be9db9..3e8dd1086 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -422,7 +422,7 @@ type Labels interface { type SampleIterable interface { // Iterator returns an iterator of the data of the series. - // The iterator passed as argument is for re-use. + // The iterator passed as argument is for re-use, if not nil. // Depending on implementation, the iterator can // be re-used or a new iterator can be allocated. Iterator(chunkenc.Iterator) chunkenc.Iterator diff --git a/storage/series.go b/storage/series.go index 339beb2c9..377c060f7 100644 --- a/storage/series.go +++ b/storage/series.go @@ -287,7 +287,7 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { mint := int64(math.MaxInt64) maxt := int64(math.MinInt64) - chks := []chunks.Meta{} + var chks []chunks.Meta lcsi, existing := it.(*listChunkSeriesIterator) if existing { chks = lcsi.chks[:0] diff --git a/tsdb/querier.go b/tsdb/querier.go index 642e089aa..89e3d5719 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -426,7 +426,7 @@ func labelNamesWithMatchers(r IndexReader, matchers ...*labels.Matcher) ([]strin return r.LabelNamesFor(postings...) } -// These are the things fetched when we move from one series to another. +// seriesData, used inside other iterators, are updated when we move from one series to another. type seriesData struct { chks []chunks.Meta intervals tombstones.Intervals