diff --git a/cmd/promtool/backfill_test.go b/cmd/promtool/backfill_test.go index 398f96766..2c551abeb 100644 --- a/cmd/promtool/backfill_test.go +++ b/cmd/promtool/backfill_test.go @@ -49,7 +49,7 @@ func queryAllSeries(t testing.TB, q storage.Querier, expectedMinTime, expectedMa samples := []backfillSample{} for ss.Next() { series := ss.At() - it := series.Iterator() + it := series.Iterator(nil) require.NoError(t, it.Err()) for it.Next() == chunkenc.ValFloat { ts, v := it.At() diff --git a/cmd/promtool/rules_test.go b/cmd/promtool/rules_test.go index 0e60a20fb..caa930616 100644 --- a/cmd/promtool/rules_test.go +++ b/cmd/promtool/rules_test.go @@ -139,7 +139,7 @@ func TestBackfillRuleIntegration(t *testing.T) { } else { require.Equal(t, 3, len(series.Labels())) } - it := series.Iterator() + it := series.Iterator(nil) for it.Next() == chunkenc.ValFloat { samplesCount++ ts, v := it.At() diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index 6934fad49..91b97f5c5 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -644,7 +644,7 @@ func dumpSamples(path string, mint, maxt int64) (err error) { for ss.Next() { series := ss.At() lbs := series.Labels() - it := series.Iterator() + it := series.Iterator(nil) for it.Next() == chunkenc.ValFloat { ts, val := it.At() fmt.Printf("%s %g %d\n", lbs, val, ts) diff --git a/promql/engine.go b/promql/engine.go index b3ad14b3d..0225f78d2 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1393,10 +1393,12 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { enh := &EvalNodeHelper{Out: make(Vector, 0, 1)} // Process all the calls for one time series at a time. 
it := storage.NewBuffer(selRange) + var chkIter chunkenc.Iterator for i, s := range selVS.Series { ev.currentSamples -= len(points) points = points[:0] - it.Reset(s.Iterator()) + chkIter = s.Iterator(chkIter) + it.Reset(chkIter) metric := selVS.Series[i].Labels() // The last_over_time function acts like offset; thus, it // should keep the metric name. For all the other range @@ -1578,8 +1580,10 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { } mat := make(Matrix, 0, len(e.Series)) it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) + var chkIter chunkenc.Iterator for i, s := range e.Series { - it.Reset(s.Iterator()) + chkIter = s.Iterator(chkIter) + it.Reset(chkIter) ss := Series{ Metric: e.Series[i].Labels(), Points: getPointSlice(numSteps), @@ -1723,8 +1727,10 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect } vec := make(Vector, 0, len(node.Series)) it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) + var chkIter chunkenc.Iterator for i, s := range node.Series { - it.Reset(s.Iterator()) + chkIter = s.Iterator(chkIter) + it.Reset(chkIter) t, v, h, ok := ev.vectorSelectorSingle(it, node, ts) if ok { @@ -1812,12 +1818,14 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, storag ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) } + var chkIter chunkenc.Iterator series := vs.Series for i, s := range series { if err := contextDone(ev.ctx, "expression evaluation"); err != nil { ev.error(err) } - it.Reset(s.Iterator()) + chkIter = s.Iterator(chkIter) + it.Reset(chkIter) ss := Series{ Metric: series[i].Labels(), } diff --git a/promql/test_test.go b/promql/test_test.go index 5c16e57a2..c5cb41ed9 100644 --- a/promql/test_test.go +++ b/promql/test_test.go @@ -143,7 +143,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) { got := Series{ Metric: storageSeries.Labels(), } - it := storageSeries.Iterator() 
+ it := storageSeries.Iterator(nil) for it.Next() == chunkenc.ValFloat { t, v := it.At() got.Points = append(got.Points, Point{T: t, V: v}) diff --git a/promql/value.go b/promql/value.go index 507a5e6f1..4db976e97 100644 --- a/promql/value.go +++ b/promql/value.go @@ -363,7 +363,11 @@ func (ss *StorageSeries) Labels() labels.Labels { } // Iterator returns a new iterator of the data of the series. -func (ss *StorageSeries) Iterator() chunkenc.Iterator { +func (ss *StorageSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { + if ssi, ok := it.(*storageSeriesIterator); ok { + ssi.reset(ss.series) + return ssi + } return newStorageSeriesIterator(ss.series) } @@ -379,6 +383,11 @@ func newStorageSeriesIterator(series Series) *storageSeriesIterator { } } +func (ssi *storageSeriesIterator) reset(series Series) { + ssi.points = series.Points + ssi.curr = -1 +} + func (ssi *storageSeriesIterator) Seek(t int64) chunkenc.ValueType { i := ssi.curr if i < 0 { diff --git a/rules/manager.go b/rules/manager.go index 42f1b59ce..4b9c8150a 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -807,7 +807,7 @@ func (g *Group) RestoreForState(ts time.Time) { // Series found for the 'for' state. var t int64 var v float64 - it := s.Iterator() + it := s.Iterator(nil) for it.Next() == chunkenc.ValFloat { t, v = it.At() } diff --git a/rules/manager_test.go b/rules/manager_test.go index 984bb81b9..5c580caf7 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -592,12 +592,13 @@ func TestStaleness(t *testing.T) { // Convert a SeriesSet into a form usable with require.Equal. 
func readSeriesSet(ss storage.SeriesSet) (map[string][]promql.Point, error) { result := map[string][]promql.Point{} + var it chunkenc.Iterator for ss.Next() { series := ss.At() points := []promql.Point{} - it := series.Iterator() + it = series.Iterator(it) for it.Next() == chunkenc.ValFloat { t, v := it.At() points = append(points, promql.Point{T: t, V: v}) diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index b22f7f095..bb851bd9e 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -2959,7 +2959,7 @@ func TestScrapeReportSingleAppender(t *testing.T) { c := 0 for series.Next() { - i := series.At().Iterator() + i := series.At().Iterator(nil) for i.Next() != chunkenc.ValNone { c++ } @@ -3032,7 +3032,7 @@ func TestScrapeReportLimit(t *testing.T) { var found bool for series.Next() { - i := series.At().Iterator() + i := series.At().Iterator(nil) for i.Next() == chunkenc.ValFloat { _, v := i.At() require.Equal(t, 1.0, v) diff --git a/storage/fanout_test.go b/storage/fanout_test.go index ee6623397..4996e8f64 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -86,11 +86,12 @@ func TestFanout_SelectSorted(t *testing.T) { result := make(map[int64]float64) var labelsResult labels.Labels + var iterator chunkenc.Iterator for seriesSet.Next() { series := seriesSet.At() seriesLabels := series.Labels() labelsResult = seriesLabels - iterator := series.Iterator() + iterator = series.Iterator(iterator) for iterator.Next() == chunkenc.ValFloat { timestamp, value := iterator.At() result[timestamp] = value @@ -112,11 +113,12 @@ func TestFanout_SelectSorted(t *testing.T) { result := make(map[int64]float64) var labelsResult labels.Labels + var iterator chunkenc.Iterator for seriesSet.Next() { series := seriesSet.At() seriesLabels := series.Labels() labelsResult = seriesLabels - iterator := series.Iterator() + iterator = series.Iterator(iterator) for iterator.Next() == chunkenc.ValFloat { timestamp, value := iterator.At() result[timestamp] =
value diff --git a/storage/interface.go b/storage/interface.go index 22d3b4186..3e8dd1086 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -382,7 +382,7 @@ func (s mockSeries) Labels() labels.Labels { return labels.FromStrings(s.labelSet...) } -func (s mockSeries) Iterator() chunkenc.Iterator { +func (s mockSeries) Iterator(chunkenc.Iterator) chunkenc.Iterator { return chunkenc.MockSeriesIterator(s.timestamps, s.values) } @@ -421,14 +421,17 @@ type Labels interface { } type SampleIterable interface { - // Iterator returns a new, independent iterator of the data of the series. - Iterator() chunkenc.Iterator + // Iterator returns an iterator of the data of the series. + // The iterator passed as argument is for re-use, if not nil. + // Depending on implementation, the iterator can + // be re-used or a new iterator can be allocated. + Iterator(chunkenc.Iterator) chunkenc.Iterator } type ChunkIterable interface { - // Iterator returns a new, independent iterator that iterates over potentially overlapping + // Iterator returns an iterator that iterates over potentially overlapping // chunks of the series, sorted by min time. 
- Iterator() chunks.Iterator + Iterator(chunks.Iterator) chunks.Iterator } type Warnings []error diff --git a/storage/merge.go b/storage/merge.go index 258e4e312..78a0125db 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -425,12 +425,8 @@ func ChainedSeriesMerge(series ...Series) Series { } return &SeriesEntry{ Lset: series[0].Labels(), - SampleIteratorFn: func() chunkenc.Iterator { - iterators := make([]chunkenc.Iterator, 0, len(series)) - for _, s := range series { - iterators = append(iterators, s.Iterator()) - } - return NewChainSampleIterator(iterators) + SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { + return ChainSampleIteratorFromSeries(it, series) }, } } @@ -446,15 +442,42 @@ type chainSampleIterator struct { lastT int64 } -// NewChainSampleIterator returns a single iterator that iterates over the samples from the given iterators in a sorted -// fashion. If samples overlap, one sample from overlapped ones is kept (randomly) and all others with the same -// timestamp are dropped. -func NewChainSampleIterator(iterators []chunkenc.Iterator) chunkenc.Iterator { - return &chainSampleIterator{ - iterators: iterators, - h: nil, - lastT: math.MinInt64, +// Return a chainSampleIterator initialized for length entries, re-using the memory from it if possible. 
+func getChainSampleIterator(it chunkenc.Iterator, length int) *chainSampleIterator { + csi, ok := it.(*chainSampleIterator) + if !ok { + csi = &chainSampleIterator{} } + if cap(csi.iterators) < length { + csi.iterators = make([]chunkenc.Iterator, length) + } else { + csi.iterators = csi.iterators[:length] + } + csi.h = nil + csi.lastT = math.MinInt64 + return csi +} + +func ChainSampleIteratorFromSeries(it chunkenc.Iterator, series []Series) chunkenc.Iterator { + csi := getChainSampleIterator(it, len(series)) + for i, s := range series { + csi.iterators[i] = s.Iterator(csi.iterators[i]) + } + return csi +} + +func ChainSampleIteratorFromMetas(it chunkenc.Iterator, chunks []chunks.Meta) chunkenc.Iterator { + csi := getChainSampleIterator(it, len(chunks)) + for i, c := range chunks { + csi.iterators[i] = c.Chunk.Iterator(csi.iterators[i]) + } + return csi +} + +func ChainSampleIteratorFromIterators(it chunkenc.Iterator, iterators []chunkenc.Iterator) chunkenc.Iterator { + csi := getChainSampleIterator(it, 0) + csi.iterators = iterators + return csi } func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType { @@ -607,10 +630,10 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC } return &ChunkSeriesEntry{ Lset: series[0].Labels(), - ChunkIteratorFn: func() chunks.Iterator { + ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator { iterators := make([]chunks.Iterator, 0, len(series)) for _, s := range series { - iterators = append(iterators, s.Iterator()) + iterators = append(iterators, s.Iterator(nil)) } return &compactChunkIterator{ mergeFunc: mergeFunc, @@ -693,7 +716,7 @@ func (c *compactChunkIterator) Next() bool { } // Add last as it's not yet included in overlap. We operate on same series, so labels does not matter here. 
- iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(nil, c.curr))...)).Iterator() + iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(nil, c.curr))...)).Iterator(nil) if !iter.Next() { if c.err = iter.Err(); c.err != nil { return false @@ -751,10 +774,10 @@ func NewConcatenatingChunkSeriesMerger() VerticalChunkSeriesMergeFunc { } return &ChunkSeriesEntry{ Lset: series[0].Labels(), - ChunkIteratorFn: func() chunks.Iterator { + ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator { iterators := make([]chunks.Iterator, 0, len(series)) for _, s := range series { - iterators = append(iterators, s.Iterator()) + iterators = append(iterators, s.Iterator(nil)) } return &concatenatingChunkIterator{ iterators: iterators, diff --git a/storage/merge_test.go b/storage/merge_test.go index 726296dd5..ad68684c0 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -202,8 +202,8 @@ func TestMergeQuerierWithChainMerger(t *testing.T) { expectedSeries := tc.expected.At() require.Equal(t, expectedSeries.Labels(), actualSeries.Labels()) - expSmpl, expErr := ExpandSamples(expectedSeries.Iterator(), nil) - actSmpl, actErr := ExpandSamples(actualSeries.Iterator(), nil) + expSmpl, expErr := ExpandSamples(expectedSeries.Iterator(nil), nil) + actSmpl, actErr := ExpandSamples(actualSeries.Iterator(nil), nil) require.Equal(t, expErr, actErr) require.Equal(t, expSmpl, actSmpl) } @@ -370,8 +370,8 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) { expectedSeries := tc.expected.At() require.Equal(t, expectedSeries.Labels(), actualSeries.Labels()) - expChks, expErr := ExpandChunks(expectedSeries.Iterator()) - actChks, actErr := ExpandChunks(actualSeries.Iterator()) + expChks, expErr := ExpandChunks(expectedSeries.Iterator(nil)) + actChks, actErr := ExpandChunks(actualSeries.Iterator(nil)) require.Equal(t, expErr, actErr) require.Equal(t, expChks, actChks) @@ -533,8 +533,8 @@ func 
TestCompactingChunkSeriesMerger(t *testing.T) { t.Run(tc.name, func(t *testing.T) { merged := m(tc.input...) require.Equal(t, tc.expected.Labels(), merged.Labels()) - actChks, actErr := ExpandChunks(merged.Iterator()) - expChks, expErr := ExpandChunks(tc.expected.Iterator()) + actChks, actErr := ExpandChunks(merged.Iterator(nil)) + expChks, expErr := ExpandChunks(tc.expected.Iterator(nil)) require.Equal(t, expErr, actErr) require.Equal(t, expChks, actChks) @@ -667,8 +667,8 @@ func TestConcatenatingChunkSeriesMerger(t *testing.T) { t.Run(tc.name, func(t *testing.T) { merged := m(tc.input...) require.Equal(t, tc.expected.Labels(), merged.Labels()) - actChks, actErr := ExpandChunks(merged.Iterator()) - expChks, expErr := ExpandChunks(tc.expected.Iterator()) + actChks, actErr := ExpandChunks(merged.Iterator(nil)) + expChks, expErr := ExpandChunks(tc.expected.Iterator(nil)) require.Equal(t, expErr, actErr) require.Equal(t, expChks, actChks) @@ -809,7 +809,7 @@ func TestChainSampleIterator(t *testing.T) { expected: []tsdbutil.Sample{sample{0, 0, nil, nil}, sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}}, }, } { - merged := NewChainSampleIterator(tc.input) + merged := ChainSampleIteratorFromIterators(nil, tc.input) actual, err := ExpandSamples(merged, nil) require.NoError(t, err) require.Equal(t, tc.expected, actual) @@ -855,7 +855,7 @@ func TestChainSampleIteratorSeek(t *testing.T) { expected: []tsdbutil.Sample{sample{0, 0, nil, nil}, sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}}, }, } { - merged := NewChainSampleIterator(tc.input) + merged := ChainSampleIteratorFromIterators(nil, tc.input) actual := []tsdbutil.Sample{} if merged.Seek(tc.seek) == chunkenc.ValFloat { t, v := merged.At() @@ -868,9 +868,7 @@ func TestChainSampleIteratorSeek(t *testing.T) { } } -var result []tsdbutil.Sample - -func makeSeriesSet(numSeries, numSamples int) SeriesSet { +func makeSeries(numSeries, numSamples int) []Series { series := []Series{} 
for j := 0; j < numSeries; j++ { labels := labels.FromStrings("foo", fmt.Sprintf("bar%d", j)) @@ -880,30 +878,39 @@ func makeSeriesSet(numSeries, numSamples int) SeriesSet { } series = append(series, NewListSeries(labels, samples)) } - return NewMockSeriesSet(series...) + return series } -func makeMergeSeriesSet(numSeriesSets, numSeries, numSamples int) SeriesSet { - seriesSets := []genericSeriesSet{} - for i := 0; i < numSeriesSets; i++ { - seriesSets = append(seriesSets, &genericSeriesSetAdapter{makeSeriesSet(numSeries, numSamples)}) +func makeMergeSeriesSet(serieses [][]Series) SeriesSet { + seriesSets := make([]genericSeriesSet, len(serieses)) + for i, s := range serieses { + seriesSets[i] = &genericSeriesSetAdapter{NewMockSeriesSet(s...)} } return &seriesSetAdapter{newGenericMergeSeriesSet(seriesSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: ChainedSeriesMerge}).Merge)} } -func benchmarkDrain(seriesSet SeriesSet, b *testing.B) { +func benchmarkDrain(b *testing.B, makeSeriesSet func() SeriesSet) { var err error + var t int64 + var v float64 + var iter chunkenc.Iterator for n := 0; n < b.N; n++ { + seriesSet := makeSeriesSet() for seriesSet.Next() { - result, err = ExpandSamples(seriesSet.At().Iterator(), nil) - require.NoError(b, err) + iter = seriesSet.At().Iterator(iter) + for iter.Next() == chunkenc.ValFloat { + t, v = iter.At() + } + err = iter.Err() } + require.NoError(b, err) + require.NotEqual(b, t, v) // To ensure the inner loop doesn't get optimised away. } } func BenchmarkNoMergeSeriesSet_100_100(b *testing.B) { - seriesSet := makeSeriesSet(100, 100) - benchmarkDrain(seriesSet, b) + series := makeSeries(100, 100) + benchmarkDrain(b, func() SeriesSet { return NewMockSeriesSet(series...) 
}) } func BenchmarkMergeSeriesSet(b *testing.B) { @@ -914,9 +921,12 @@ func BenchmarkMergeSeriesSet(b *testing.B) { {10, 100, 100}, {100, 100, 100}, } { - seriesSet := makeMergeSeriesSet(bm.numSeriesSets, bm.numSeries, bm.numSamples) + serieses := [][]Series{} + for i := 0; i < bm.numSeriesSets; i++ { + serieses = append(serieses, makeSeries(bm.numSeries, bm.numSamples)) + } b.Run(fmt.Sprintf("%d_%d_%d", bm.numSeriesSets, bm.numSeries, bm.numSamples), func(b *testing.B) { - benchmarkDrain(seriesSet, b) + benchmarkDrain(b, func() SeriesSet { return makeMergeSeriesSet(serieses) }) }) } } diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 48c2d8615..a74ad2b7b 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" ) // decodeReadLimit is the maximum size of a read request body in bytes. @@ -115,9 +116,10 @@ func ToQuery(from, to int64, matchers []*labels.Matcher, hints *storage.SelectHi func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, storage.Warnings, error) { numSamples := 0 resp := &prompb.QueryResult{} + var iter chunkenc.Iterator for ss.Next() { series := ss.At() - iter := series.Iterator() + iter = series.Iterator(iter) samples := []prompb.Sample{} for iter.Next() == chunkenc.ValFloat { @@ -199,11 +201,12 @@ func StreamChunkedReadResponses( var ( chks []prompb.Chunk lbls []prompb.Label + iter chunks.Iterator ) for ss.Next() { series := ss.At() - iter := series.Iterator() + iter = series.Iterator(iter) lbls = MergeLabels(labelsToLabelsProto(series.Labels(), lbls), sortedExternalLabels) frameBytesLeft := maxBytesInFrame @@ -346,7 +349,11 @@ func (c *concreteSeries) Labels() labels.Labels { return labels.New(c.labels...) 
} -func (c *concreteSeries) Iterator() chunkenc.Iterator { +func (c *concreteSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { + if csi, ok := it.(*concreteSeriesIterator); ok { + csi.reset(c) + return csi + } return newConcreteSeriersIterator(c) } @@ -363,6 +370,11 @@ func newConcreteSeriersIterator(series *concreteSeries) chunkenc.Iterator { } } +func (c *concreteSeriesIterator) reset(series *concreteSeries) { + c.cur = -1 + c.series = series +} + // Seek implements storage.SeriesIterator. func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType { if c.cur == -1 { diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index c806097c6..596eb0861 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -215,7 +215,7 @@ func TestConcreteSeriesIterator(t *testing.T) { {Value: 4, Timestamp: 4}, }, } - it := series.Iterator() + it := series.Iterator(nil) // Seek to the first sample with ts=1. require.Equal(t, chunkenc.ValFloat, it.Seek(1)) diff --git a/storage/series.go b/storage/series.go index 3259dd4d0..377c060f7 100644 --- a/storage/series.go +++ b/storage/series.go @@ -27,26 +27,31 @@ import ( type SeriesEntry struct { Lset labels.Labels - SampleIteratorFn func() chunkenc.Iterator + SampleIteratorFn func(chunkenc.Iterator) chunkenc.Iterator } -func (s *SeriesEntry) Labels() labels.Labels { return s.Lset } -func (s *SeriesEntry) Iterator() chunkenc.Iterator { return s.SampleIteratorFn() } +func (s *SeriesEntry) Labels() labels.Labels { return s.Lset } +func (s *SeriesEntry) Iterator(it chunkenc.Iterator) chunkenc.Iterator { return s.SampleIteratorFn(it) } type ChunkSeriesEntry struct { Lset labels.Labels - ChunkIteratorFn func() chunks.Iterator + ChunkIteratorFn func(chunks.Iterator) chunks.Iterator } -func (s *ChunkSeriesEntry) Labels() labels.Labels { return s.Lset } -func (s *ChunkSeriesEntry) Iterator() chunks.Iterator { return s.ChunkIteratorFn() } +func (s *ChunkSeriesEntry) Labels() labels.Labels { 
return s.Lset } +func (s *ChunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator { return s.ChunkIteratorFn(it) } // NewListSeries returns series entry with iterator that allows to iterate over provided samples. func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry { + samplesS := Samples(samples(s)) return &SeriesEntry{ Lset: lset, - SampleIteratorFn: func() chunkenc.Iterator { - return NewListSeriesIterator(samples(s)) + SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { + if lsi, ok := it.(*listSeriesIterator); ok { + lsi.Reset(samplesS) + return lsi + } + return NewListSeriesIterator(samplesS) }, } } @@ -56,11 +61,21 @@ func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry { func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]tsdbutil.Sample) *ChunkSeriesEntry { return &ChunkSeriesEntry{ Lset: lset, - ChunkIteratorFn: func() chunks.Iterator { - chks := make([]chunks.Meta, 0, len(samples)) + ChunkIteratorFn: func(it chunks.Iterator) chunks.Iterator { + lcsi, existing := it.(*listChunkSeriesIterator) + var chks []chunks.Meta + if existing { + chks = lcsi.chks[:0] + } else { + chks = make([]chunks.Meta, 0, len(samples)) + } for _, s := range samples { chks = append(chks, tsdbutil.ChunkFromSamples(s)) } + if existing { + lcsi.Reset(chks...) + return lcsi + } return NewListChunkSeriesIterator(chks...) 
}, } @@ -87,6 +102,11 @@ func NewListSeriesIterator(samples Samples) chunkenc.Iterator { return &listSeriesIterator{samples: samples, idx: -1} } +func (it *listSeriesIterator) Reset(samples Samples) { + it.samples = samples + it.idx = -1 +} + func (it *listSeriesIterator) At() (int64, float64) { s := it.samples.Get(it.idx) return s.T(), s.V() @@ -150,6 +170,11 @@ func NewListChunkSeriesIterator(chks ...chunks.Meta) chunks.Iterator { return &listChunkSeriesIterator{chks: chks, idx: -1} } +func (it *listChunkSeriesIterator) Reset(chks ...chunks.Meta) { + it.chks = chks + it.idx = -1 +} + func (it *listChunkSeriesIterator) At() chunks.Meta { return it.chks[it.idx] } @@ -164,6 +189,7 @@ func (it *listChunkSeriesIterator) Err() error { return nil } type chunkSetToSeriesSet struct { ChunkSeriesSet + iter chunks.Iterator chkIterErr error sameSeriesChunks []Series } @@ -178,18 +204,18 @@ func (c *chunkSetToSeriesSet) Next() bool { return false } - iter := c.ChunkSeriesSet.At().Iterator() + c.iter = c.ChunkSeriesSet.At().Iterator(c.iter) c.sameSeriesChunks = c.sameSeriesChunks[:0] - for iter.Next() { + for c.iter.Next() { c.sameSeriesChunks = append( c.sameSeriesChunks, - newChunkToSeriesDecoder(c.ChunkSeriesSet.At().Labels(), iter.At()), + newChunkToSeriesDecoder(c.ChunkSeriesSet.At().Labels(), c.iter.At()), ) } - if iter.Err() != nil { - c.chkIterErr = iter.Err() + if c.iter.Err() != nil { + c.chkIterErr = c.iter.Err() return false } return true @@ -210,9 +236,9 @@ func (c *chunkSetToSeriesSet) Err() error { func newChunkToSeriesDecoder(labels labels.Labels, chk chunks.Meta) Series { return &SeriesEntry{ Lset: labels, - SampleIteratorFn: func() chunkenc.Iterator { + SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { // TODO(bwplotka): Can we provide any chunkenc buffer? 
- return chk.Chunk.Iterator(nil) + return chk.Chunk.Iterator(it) }, } } @@ -252,7 +278,7 @@ func NewSeriesToChunkEncoder(series Series) ChunkSeries { return &seriesToChunkEncoder{series} } -func (s *seriesToChunkEncoder) Iterator() chunks.Iterator { +func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { var ( chk chunkenc.Chunk app chunkenc.Appender @@ -261,9 +287,14 @@ func (s *seriesToChunkEncoder) Iterator() chunks.Iterator { mint := int64(math.MaxInt64) maxt := int64(math.MinInt64) - chks := []chunks.Meta{} + var chks []chunks.Meta + lcsi, existing := it.(*listChunkSeriesIterator) + if existing { + chks = lcsi.chks[:0] + } + i := 0 - seriesIter := s.Series.Iterator() + seriesIter := s.Series.Iterator(nil) lastType := chunkenc.ValNone for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() { if typ != lastType || i >= seriesToChunkEncoderSplit { @@ -323,6 +354,10 @@ func (s *seriesToChunkEncoder) Iterator() chunks.Iterator { }) } + if existing { + lcsi.Reset(chks...) + return lcsi + } return NewListChunkSeriesIterator(chks...) } diff --git a/tsdb/block_test.go b/tsdb/block_test.go index 6cb00b348..c3a6ff576 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -203,7 +203,7 @@ func TestCorruptedChunk(t *testing.T) { // Check chunk errors during iter time. 
require.True(t, set.Next()) - it := set.At().Iterator() + it := set.At().Iterator(nil) require.Equal(t, chunkenc.ValNone, it.Next()) require.Equal(t, tc.iterErr.Error(), it.Err().Error()) }) @@ -505,11 +505,12 @@ func createHead(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir str head, err := NewHead(nil, nil, w, nil, opts, nil) require.NoError(tb, err) + var it chunkenc.Iterator ctx := context.Background() app := head.Appender(ctx) for _, s := range series { ref := storage.SeriesRef(0) - it := s.Iterator() + it = s.Iterator(it) lset := s.Labels() typ := it.Next() lastTyp := typ @@ -550,11 +551,12 @@ func createHeadWithOOOSamples(tb testing.TB, w *wlog.WL, series []storage.Series oooSampleLabels := make([]labels.Labels, 0, len(series)) oooSamples := make([]tsdbutil.SampleSlice, 0, len(series)) + var it chunkenc.Iterator totalSamples := 0 app := head.Appender(context.Background()) for _, s := range series { ref := storage.SeriesRef(0) - it := s.Iterator() + it = s.Iterator(it) lset := s.Labels() os := tsdbutil.SampleSlice{} count := 0 diff --git a/tsdb/compact.go b/tsdb/compact.go index 9fe50fda1..f216ad46a 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -746,8 +746,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } var ( - ref = storage.SeriesRef(0) - chks []chunks.Meta + ref = storage.SeriesRef(0) + chks []chunks.Meta + chksIter chunks.Iterator ) set := sets[0] @@ -765,7 +766,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, default: } s := set.At() - chksIter := s.Iterator() + chksIter = s.Iterator(chksIter) chks = chks[:0] for chksIter.Next() { // We are not iterating in streaming way over chunk as diff --git a/tsdb/db_test.go b/tsdb/db_test.go index d4c2840c2..cea4b6e36 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -93,12 +93,13 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str require.NoError(t, q.Close()) }() + var it 
chunkenc.Iterator result := map[string][]tsdbutil.Sample{} for ss.Next() { series := ss.At() samples := []tsdbutil.Sample{} - it := series.Iterator() + it = series.Iterator(it) for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() { switch typ { case chunkenc.ValFloat: @@ -133,12 +134,13 @@ func queryChunks(t testing.TB, q storage.ChunkQuerier, matchers ...*labels.Match require.NoError(t, q.Close()) }() + var it chunks.Iterator result := map[string][]chunks.Meta{} for ss.Next() { series := ss.At() chks := []chunks.Meta{} - it := series.Iterator() + it = series.Iterator(it) for it.Next() { chks = append(chks, it.At()) } @@ -454,8 +456,8 @@ Outer: require.Equal(t, sexp.Labels(), sres.Labels()) - smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil) - smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil) + smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) require.Equal(t, errExp, errRes) require.Equal(t, smplExp, smplRes) @@ -628,9 +630,10 @@ func TestDB_Snapshot(t *testing.T) { // sum values seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + var series chunkenc.Iterator sum := 0.0 for seriesSet.Next() { - series := seriesSet.At().Iterator() + series = seriesSet.At().Iterator(series) for series.Next() == chunkenc.ValFloat { _, v := series.At() sum += v @@ -676,9 +679,10 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) { // Sum values. 
seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + var series chunkenc.Iterator sum := 0.0 for seriesSet.Next() { - series := seriesSet.At().Iterator() + series = seriesSet.At().Iterator(series) for series.Next() == chunkenc.ValFloat { _, v := series.At() sum += v @@ -770,8 +774,8 @@ Outer: require.Equal(t, sexp.Labels(), sres.Labels()) - smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil) - smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil) + smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) require.Equal(t, errExp, errRes) require.Equal(t, smplExp, smplRes) @@ -921,7 +925,7 @@ func TestDB_e2e(t *testing.T) { for ss.Next() { x := ss.At() - smpls, err := storage.ExpandSamples(x.Iterator(), newSample) + smpls, err := storage.ExpandSamples(x.Iterator(nil), newSample) require.NoError(t, err) if len(smpls) > 0 { @@ -1108,12 +1112,13 @@ func testWALReplayRaceOnSamplesLoggedBeforeSeries(t *testing.T, numSamplesBefore set := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "series_id", ".+")) actualSeries := 0 + var chunksIt chunks.Iterator for set.Next() { actualSeries++ actualChunks := 0 - chunksIt := set.At().Iterator() + chunksIt = set.At().Iterator(chunksIt) for chunksIt.Next() { actualChunks++ } @@ -1205,8 +1210,8 @@ func TestTombstoneClean(t *testing.T) { require.Equal(t, sexp.Labels(), sres.Labels()) - smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil) - smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil) + smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) require.Equal(t, errExp, errRes) require.Equal(t, smplExp, smplRes) @@ -1479,11 +1484,12 @@ func TestSizeRetention(t *testing.T) { // Add some data to the WAL. 
headApp := db.Head().Appender(context.Background()) var aSeries labels.Labels + var it chunkenc.Iterator for _, m := range headBlocks { series := genSeries(100, 10, m.MinTime, m.MaxTime+1) for _, s := range series { aSeries = s.Labels() - it := s.Iterator() + it = s.Iterator(it) for it.Next() == chunkenc.ValFloat { tim, v := it.At() _, err := headApp.Append(0, s.Labels(), tim, v) @@ -1691,10 +1697,11 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { func expandSeriesSet(ss storage.SeriesSet) ([]labels.Labels, map[string][]sample, storage.Warnings, error) { resultLabels := []labels.Labels{} resultSamples := map[string][]sample{} + var it chunkenc.Iterator for ss.Next() { series := ss.At() samples := []sample{} - it := series.Iterator() + it = series.Iterator(it) for it.Next() == chunkenc.ValFloat { t, v := it.At() samples = append(samples, sample{t: t, v: v}) @@ -2500,10 +2507,11 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { // Sum the values. seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush")) + var series chunkenc.Iterator sum := 0.0 for seriesSet.Next() { - series := seriesSet.At().Iterator() + series = seriesSet.At().Iterator(series) for series.Next() == chunkenc.ValFloat { _, v := series.At() sum += v @@ -2946,10 +2954,11 @@ func TestCompactHead(t *testing.T) { defer func() { require.NoError(t, querier.Close()) }() seriesSet := querier.Select(false, nil, &labels.Matcher{Type: labels.MatchEqual, Name: "a", Value: "b"}) + var series chunkenc.Iterator var actSamples []sample for seriesSet.Next() { - series := seriesSet.At().Iterator() + series = seriesSet.At().Iterator(series) for series.Next() == chunkenc.ValFloat { time, val := series.At() actSamples = append(actSamples, sample{int64(time), val, nil, nil}) @@ -3347,7 +3356,7 @@ func testQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t actualSeries++ // Get the iterator and call Next() so that we're sure the chunk is 
loaded. - it := seriesSet.At().Iterator() + it := seriesSet.At().Iterator(nil) it.Next() it.At() @@ -3477,11 +3486,13 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun seriesSet := querier.Select(true, hints, labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, ".+")) // Iterate all series and get their chunks. + var it chunks.Iterator var chunks []chunkenc.Chunk actualSeries := 0 for seriesSet.Next() { actualSeries++ - for it := seriesSet.At().Iterator(); it.Next(); { + it = seriesSet.At().Iterator(it) + for it.Next() { chunks = append(chunks, it.At().Chunk) } } @@ -6025,13 +6036,14 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) { ctx := context.Background() + var it chunkenc.Iterator exp := make(map[string][]tsdbutil.Sample) for _, series := range blockSeries { createBlock(t, db.Dir(), series) for _, s := range series { key := s.Labels().String() - it := s.Iterator() + it = s.Iterator(it) slice := exp[key] for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() { switch typ { diff --git a/tsdb/example_test.go b/tsdb/example_test.go index c33bf6dc0..da0e37923 100644 --- a/tsdb/example_test.go +++ b/tsdb/example_test.go @@ -67,7 +67,7 @@ func Example() { series := ss.At() fmt.Println("series:", series.Labels().String()) - it := series.Iterator() + it := series.Iterator(nil) for it.Next() == chunkenc.ValFloat { _, v := it.At() // We ignore the timestamp here, only to have a predictable output we can test against (below) fmt.Println("sample", v) diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 6a273a0fd..985a15792 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -503,11 +503,7 @@ func (o mergedOOOChunks) Appender() (chunkenc.Appender, error) { } func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator { - iterators := make([]chunkenc.Iterator, 0, len(o.chunks)) - for _, c := range o.chunks { - iterators = append(iterators, c.Chunk.Iterator(nil)) - } - return 
storage.NewChainSampleIterator(iterators) + return storage.ChainSampleIteratorFromMetas(iterator, o.chunks) } func (o mergedOOOChunks) NumSamples() int { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 9b8eb0278..59824ae08 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -924,8 +924,8 @@ func TestHeadDeleteSimple(t *testing.T) { require.Equal(t, expSeries.Labels(), actSeries.Labels()) - smplExp, errExp := storage.ExpandSamples(expSeries.Iterator(), nil) - smplRes, errRes := storage.ExpandSamples(actSeries.Iterator(), nil) + smplExp, errExp := storage.ExpandSamples(expSeries.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(actSeries.Iterator(nil), nil) require.Equal(t, errExp, errRes) require.Equal(t, smplExp, smplRes) @@ -959,7 +959,7 @@ func TestDeleteUntilCurMax(t *testing.T) { res := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) require.True(t, res.Next(), "series is not present") s := res.At() - it := s.Iterator() + it := s.Iterator(nil) require.Equal(t, chunkenc.ValNone, it.Next(), "expected no samples") for res.Next() { } @@ -976,7 +976,7 @@ func TestDeleteUntilCurMax(t *testing.T) { res = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) require.True(t, res.Next(), "series don't exist") exps := res.At() - it = exps.Iterator() + it = exps.Iterator(nil) resSamples, err := storage.ExpandSamples(it, newSample) require.NoError(t, err) require.Equal(t, []tsdbutil.Sample{sample{11, 1, nil, nil}}, resSamples) @@ -1163,7 +1163,7 @@ func TestDelete_e2e(t *testing.T) { eok, rok := expSs.Next(), ss.Next() // Skip a series if iterator is empty. 
if rok { - for ss.At().Iterator().Next() == chunkenc.ValNone { + for ss.At().Iterator(nil).Next() == chunkenc.ValNone { rok = ss.Next() if !rok { break @@ -1177,8 +1177,8 @@ func TestDelete_e2e(t *testing.T) { sexp := expSs.At() sres := ss.At() require.Equal(t, sexp.Labels(), sres.Labels()) - smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil) - smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil) + smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) require.Equal(t, errExp, errRes) require.Equal(t, smplExp, smplRes) } @@ -2635,7 +2635,7 @@ func TestChunkNotFoundHeadGCRace(t *testing.T) { <-time.After(3 * time.Second) // Now consume after compaction when it's gone. - it := s.Iterator() + it := s.Iterator(nil) for it.Next() == chunkenc.ValFloat { _, _ = it.At() } @@ -2643,7 +2643,7 @@ func TestChunkNotFoundHeadGCRace(t *testing.T) { require.NoError(t, it.Err()) for ss.Next() { s = ss.At() - it := s.Iterator() + it = s.Iterator(it) for it.Next() == chunkenc.ValFloat { _, _ = it.At() } @@ -2841,7 +2841,7 @@ func TestAppendHistogram(t *testing.T) { s := ss.At() require.False(t, ss.Next()) - it := s.Iterator() + it := s.Iterator(nil) actHistograms := make([]timedHistogram, 0, len(expHistograms)) for it.Next() == chunkenc.ValHistogram { t, h := it.AtHistogram() @@ -3304,7 +3304,7 @@ func TestHistogramStaleSample(t *testing.T) { s := ss.At() require.False(t, ss.Next()) - it := s.Iterator() + it := s.Iterator(nil) actHistograms := make([]timedHistogram, 0, len(expHistograms)) for it.Next() == chunkenc.ValHistogram { t, h := it.AtHistogram() @@ -3581,7 +3581,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) require.True(t, ss.Next()) s := ss.At() - it := s.Iterator() + it := s.Iterator(nil) expIdx := 0 loop: for { diff --git a/tsdb/querier.go b/tsdb/querier.go index 
cc765903c..89e3d5719 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -426,6 +426,16 @@ func labelNamesWithMatchers(r IndexReader, matchers ...*labels.Matcher) ([]strin return r.LabelNamesFor(postings...) } +// seriesData, used inside other iterators, are updated when we move from one series to another. +type seriesData struct { + chks []chunks.Meta + intervals tombstones.Intervals + labels labels.Labels +} + +// Labels implements part of storage.Series and storage.ChunkSeries. +func (s *seriesData) Labels() labels.Labels { return s.labels } + // blockBaseSeriesSet allows to iterate over all series in the single block. // Iterated series are trimmed with given min and max time as well as tombstones. // See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating. @@ -438,8 +448,7 @@ type blockBaseSeriesSet struct { mint, maxt int64 disableTrimming bool - currIterFn func() *populateWithDelGenericSeriesIterator - currLabels labels.Labels + curr seriesData bufChks []chunks.Meta bufLbls labels.Labels @@ -519,12 +528,11 @@ func (b *blockBaseSeriesSet) Next() bool { intervals = intervals.Add(tombstones.Interval{Mint: b.maxt + 1, Maxt: math.MaxInt64}) } - b.currLabels = make(labels.Labels, len(b.bufLbls)) - copy(b.currLabels, b.bufLbls) + b.curr.labels = make(labels.Labels, len(b.bufLbls)) + copy(b.curr.labels, b.bufLbls) + b.curr.chks = chks + b.curr.intervals = intervals - b.currIterFn = func() *populateWithDelGenericSeriesIterator { - return newPopulateWithDelGenericSeriesIterator(b.blockID, b.chunks, chks, intervals) - } return true } return false @@ -556,29 +564,26 @@ type populateWithDelGenericSeriesIterator struct { // the same, single series. chks []chunks.Meta - i int + i int // Index into chks; -1 if not started yet. err error - bufIter *DeletedIterator + bufIter DeletedIterator // Retained for memory re-use. currDelIter may point here. 
intervals tombstones.Intervals currDelIter chunkenc.Iterator currChkMeta chunks.Meta } -func newPopulateWithDelGenericSeriesIterator( - blockID ulid.ULID, - chunks ChunkReader, - chks []chunks.Meta, - intervals tombstones.Intervals, -) *populateWithDelGenericSeriesIterator { - return &populateWithDelGenericSeriesIterator{ - blockID: blockID, - chunks: chunks, - chks: chks, - i: -1, - bufIter: &DeletedIterator{}, - intervals: intervals, - } +func (p *populateWithDelGenericSeriesIterator) reset(blockID ulid.ULID, cr ChunkReader, chks []chunks.Meta, intervals tombstones.Intervals) { + p.blockID = blockID + p.chunks = cr + p.chks = chks + p.i = -1 + p.err = nil + p.bufIter.Iter = nil + p.bufIter.Intervals = p.bufIter.Intervals[:0] + p.intervals = intervals + p.currDelIter = nil + p.currChkMeta = chunks.Meta{} } func (p *populateWithDelGenericSeriesIterator) next() bool { @@ -618,28 +623,55 @@ func (p *populateWithDelGenericSeriesIterator) next() bool { // We don't want the full chunk, or it's potentially still opened, take // just a part of it. 
- p.bufIter.Iter = p.currChkMeta.Chunk.Iterator(nil) - p.currDelIter = p.bufIter + p.bufIter.Iter = p.currChkMeta.Chunk.Iterator(p.bufIter.Iter) + p.currDelIter = &p.bufIter return true } func (p *populateWithDelGenericSeriesIterator) Err() error { return p.err } -func (p *populateWithDelGenericSeriesIterator) toSeriesIterator() chunkenc.Iterator { - return &populateWithDelSeriesIterator{populateWithDelGenericSeriesIterator: p} +type blockSeriesEntry struct { + chunks ChunkReader + blockID ulid.ULID + seriesData } -func (p *populateWithDelGenericSeriesIterator) toChunkSeriesIterator() chunks.Iterator { - return &populateWithDelChunkSeriesIterator{populateWithDelGenericSeriesIterator: p} +func (s *blockSeriesEntry) Iterator(it chunkenc.Iterator) chunkenc.Iterator { + pi, ok := it.(*populateWithDelSeriesIterator) + if !ok { + pi = &populateWithDelSeriesIterator{} + } + pi.reset(s.blockID, s.chunks, s.chks, s.intervals) + return pi +} + +type chunkSeriesEntry struct { + chunks ChunkReader + blockID ulid.ULID + seriesData +} + +func (s *chunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator { + pi, ok := it.(*populateWithDelChunkSeriesIterator) + if !ok { + pi = &populateWithDelChunkSeriesIterator{} + } + pi.reset(s.blockID, s.chunks, s.chks, s.intervals) + return pi } // populateWithDelSeriesIterator allows to iterate over samples for the single series. 
type populateWithDelSeriesIterator struct { - *populateWithDelGenericSeriesIterator + populateWithDelGenericSeriesIterator curr chunkenc.Iterator } +func (p *populateWithDelSeriesIterator) reset(blockID ulid.ULID, cr ChunkReader, chks []chunks.Meta, intervals tombstones.Intervals) { + p.populateWithDelGenericSeriesIterator.reset(blockID, cr, chks, intervals) + p.curr = nil +} + func (p *populateWithDelSeriesIterator) Next() chunkenc.ValueType { if p.curr != nil { if valueType := p.curr.Next(); valueType != chunkenc.ValNone { @@ -701,11 +733,16 @@ func (p *populateWithDelSeriesIterator) Err() error { } type populateWithDelChunkSeriesIterator struct { - *populateWithDelGenericSeriesIterator + populateWithDelGenericSeriesIterator curr chunks.Meta } +func (p *populateWithDelChunkSeriesIterator) reset(blockID ulid.ULID, cr ChunkReader, chks []chunks.Meta, intervals tombstones.Intervals) { + p.populateWithDelGenericSeriesIterator.reset(blockID, cr, chks, intervals) + p.curr = chunks.Meta{} +} + func (p *populateWithDelChunkSeriesIterator) Next() bool { if !p.next() { return false @@ -834,13 +871,11 @@ func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p inde } func (b *blockSeriesSet) At() storage.Series { - // At can be looped over before iterating, so save the current value locally. - currIterFn := b.currIterFn - return &storage.SeriesEntry{ - Lset: b.currLabels, - SampleIteratorFn: func() chunkenc.Iterator { - return currIterFn().toSeriesIterator() - }, + // At can be looped over before iterating, so save the current values locally. + return &blockSeriesEntry{ + chunks: b.chunks, + blockID: b.blockID, + seriesData: b.curr, } } @@ -868,13 +903,11 @@ func newBlockChunkSeriesSet(id ulid.ULID, i IndexReader, c ChunkReader, t tombst } func (b *blockChunkSeriesSet) At() storage.ChunkSeries { - // At can be looped over before iterating, so save the current value locally. 
- currIterFn := b.currIterFn - return &storage.ChunkSeriesEntry{ - Lset: b.currLabels, - ChunkIteratorFn: func() chunks.Iterator { - return currIterFn().toChunkSeriesIterator() - }, + // At can be looped over before iterating, so save the current values locally. + return &chunkSeriesEntry{ + chunks: b.chunks, + blockID: b.blockID, + seriesData: b.curr, } } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index ffb24b17b..3b44cef51 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -194,8 +194,8 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C sres := res.At() require.Equal(t, sexp.Labels(), sres.Labels()) - smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil) - smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil) + smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) require.Equal(t, errExp, errRes) require.Equal(t, smplExp, smplRes) @@ -230,9 +230,9 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C require.Equal(t, sexpChks.Labels(), sres.Labels()) - chksExp, errExp := storage.ExpandChunks(sexpChks.Iterator()) + chksExp, errExp := storage.ExpandChunks(sexpChks.Iterator(nil)) rmChunkRefs(chksExp) - chksRes, errRes := storage.ExpandChunks(sres.Iterator()) + chksRes, errRes := storage.ExpandChunks(sres.Iterator(nil)) rmChunkRefs(chksRes) require.Equal(t, errExp, errRes) require.Equal(t, chksExp, chksRes) @@ -859,7 +859,8 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Run("sample", func(t *testing.T) { f, chkMetas := createFakeReaderAndNotPopulatedChunks(tc.chks...) 
- it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, tc.intervals).toSeriesIterator() + it := &populateWithDelSeriesIterator{} + it.reset(ulid.ULID{}, f, chkMetas, tc.intervals) var r []tsdbutil.Sample if tc.seek != 0 { @@ -879,7 +880,8 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }) t.Run("chunk", func(t *testing.T) { f, chkMetas := createFakeReaderAndNotPopulatedChunks(tc.chks...) - it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, tc.intervals).toChunkSeriesIterator() + it := &populateWithDelChunkSeriesIterator{} + it.reset(ulid.ULID{}, f, chkMetas, tc.intervals) if tc.seek != 0 { // Chunk iterator does not have Seek method. @@ -911,7 +913,8 @@ func TestPopulateWithDelSeriesIterator_DoubleSeek(t *testing.T) { []tsdbutil.Sample{sample{4, 4, nil, nil}, sample{5, 5, nil, nil}}, ) - it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, nil).toSeriesIterator() + it := &populateWithDelSeriesIterator{} + it.reset(ulid.ULID{}, f, chkMetas, nil) require.Equal(t, chunkenc.ValFloat, it.Seek(1)) require.Equal(t, chunkenc.ValFloat, it.Seek(2)) require.Equal(t, chunkenc.ValFloat, it.Seek(2)) @@ -929,7 +932,8 @@ func TestPopulateWithDelSeriesIterator_SeekInCurrentChunk(t *testing.T) { []tsdbutil.Sample{}, ) - it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, nil).toSeriesIterator() + it := &populateWithDelSeriesIterator{} + it.reset(ulid.ULID{}, f, chkMetas, nil) require.Equal(t, chunkenc.ValFloat, it.Next()) ts, v := it.At() require.Equal(t, int64(1), ts) @@ -946,7 +950,8 @@ func TestPopulateWithDelSeriesIterator_SeekWithMinTime(t *testing.T) { []tsdbutil.Sample{sample{1, 6, nil, nil}, sample{5, 6, nil, nil}, sample{6, 8, nil, nil}}, ) - it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, nil).toSeriesIterator() + it := &populateWithDelSeriesIterator{} + it.reset(ulid.ULID{}, f, chkMetas, nil) require.Equal(t, chunkenc.ValNone, it.Seek(7)) require.Equal(t, 
chunkenc.ValFloat, it.Seek(3)) } @@ -958,9 +963,8 @@ func TestPopulateWithDelSeriesIterator_NextWithMinTime(t *testing.T) { []tsdbutil.Sample{sample{1, 6, nil, nil}, sample{5, 6, nil, nil}, sample{7, 8, nil, nil}}, ) - it := newPopulateWithDelGenericSeriesIterator( - ulid.ULID{}, f, chkMetas, tombstones.Intervals{{Mint: math.MinInt64, Maxt: 2}}.Add(tombstones.Interval{Mint: 4, Maxt: math.MaxInt64}), - ).toSeriesIterator() + it := &populateWithDelSeriesIterator{} + it.reset(ulid.ULID{}, f, chkMetas, tombstones.Intervals{{Mint: math.MinInt64, Maxt: 2}}.Add(tombstones.Interval{Mint: 4, Maxt: math.MaxInt64})) require.Equal(t, chunkenc.ValNone, it.Next()) } @@ -1433,9 +1437,10 @@ func BenchmarkQuerySeek(b *testing.B) { b.ResetTimer() b.ReportAllocs() + var it chunkenc.Iterator ss := sq.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) for ss.Next() { - it := ss.At().Iterator() + it = ss.At().Iterator(it) for t := mint; t <= maxt; t++ { it.Seek(t) } @@ -2042,11 +2047,13 @@ func benchQuery(b *testing.B, expExpansions int, q storage.Querier, selectors la for i := 0; i < b.N; i++ { ss := q.Select(false, nil, selectors...) 
var actualExpansions int + var it chunkenc.Iterator for ss.Next() { s := ss.At() s.Labels() - it := s.Iterator() + it = s.Iterator(it) for it.Next() != chunkenc.ValNone { + _, _ = it.At() } actualExpansions++ } @@ -2222,11 +2229,12 @@ func TestBlockBaseSeriesSet(t *testing.T) { i := 0 for bcs.Next() { - chks := bcs.currIterFn().chks + si := populateWithDelGenericSeriesIterator{} + si.reset(bcs.blockID, bcs.chunks, bcs.curr.chks, bcs.curr.intervals) idx := tc.expIdxs[i] - require.Equal(t, tc.series[idx].lset, bcs.currLabels) - require.Equal(t, tc.series[idx].chunks, chks) + require.Equal(t, tc.series[idx].lset, bcs.curr.labels) + require.Equal(t, tc.series[idx].chunks, si.chks) i++ } diff --git a/tsdb/tsdbblockutil.go b/tsdb/tsdbblockutil.go index 777db5e90..8117f431c 100644 --- a/tsdb/tsdbblockutil.go +++ b/tsdb/tsdbblockutil.go @@ -49,10 +49,11 @@ func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger l const commitAfter = 10000 ctx := context.Background() app := w.Appender(ctx) + var it chunkenc.Iterator for _, s := range series { ref := storage.SeriesRef(0) - it := s.Iterator() + it = s.Iterator(it) lset := s.Labels() typ := it.Next() lastTyp := typ diff --git a/web/federate.go b/web/federate.go index 5ba68fa28..baa3b5866 100644 --- a/web/federate.go +++ b/web/federate.go @@ -102,12 +102,14 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge) it := storage.NewBuffer(int64(h.lookbackDelta / 1e6)) + var chkIter chunkenc.Iterator for set.Next() { s := set.At() // TODO(fabxc): allow fast path for most recent sample either // in the storage itself or caching layer in Prometheus. - it.Reset(s.Iterator()) + chkIter = s.Iterator(chkIter) + it.Reset(chkIter) var t int64 var v float64