From 8b84856ba5895a9a24e2176a200342b6f2072beb Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Wed, 5 Jul 2023 15:29:00 +1000
Subject: [PATCH] Add EstimatedChunkCount() method to ChunkSeries

Signed-off-by: Charles Korn
---
 storage/interface.go   |   6 ++
 storage/merge.go       |  87 ++++++++++++++++++++++++++++
 storage/merge_test.go  |  18 +++++-
 storage/series.go      |  38 +++++++++++++
 storage/series_test.go | 126 ++++++++++++++++++++++++++++++++++++++++-
 tsdb/querier.go        |   4 ++
 6 files changed, 273 insertions(+), 6 deletions(-)

diff --git a/storage/interface.go b/storage/interface.go
index 2a440cc93c..158c2354b8 100644
--- a/storage/interface.go
+++ b/storage/interface.go
@@ -415,6 +415,12 @@ type ChunkSeriesSet interface {
 type ChunkSeries interface {
 	Labels
 	ChunkIterable
+
+	// EstimatedChunkCount returns an estimate of the number of chunks available from this ChunkSeries.
+	//
+	// This estimate is used by Mimir's ingesters to report the number of chunks expected to be returned by a query,
+	// which is used by queriers to enforce the 'max chunks per query' limit.
+	EstimatedChunkCount() int
 }
 
 // Labels represents an item that has labels e.g. time series.
diff --git a/storage/merge.go b/storage/merge.go
index c0665d720b..61f873cf36 100644
--- a/storage/merge.go
+++ b/storage/merge.go
@@ -658,6 +658,7 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC
 		if len(series) == 0 {
 			return nil
 		}
+
 		return &ChunkSeriesEntry{
 			Lset: series[0].Labels(),
 			ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
@@ -670,10 +671,86 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC
 					iterators: iterators,
 				}
 			},
+			ChunkCountFn: func() int {
+				return estimateCompactedChunkCount(series)
+			},
 		}
 	}
 }
 
+// estimateCompactedChunkCount computes an estimate of the resulting number of chunks
+// after compacting series.
+//
+// The estimate is imperfect in a few ways, because it does not examine individual samples:
+//   - it does not check for duplicate samples in overlapping chunks, and so may overestimate
+//     the resulting number of chunks if duplicate samples are present, or if an entire chunk is
+//     duplicated
+//   - it does not check if overlapping chunks have samples that swap back and forth between
+//     different encodings over time, and so may underestimate the resulting number of chunks
+func estimateCompactedChunkCount(series []ChunkSeries) int {
+	h := make(chunkIteratorHeap, 0, len(series))
+
+	for _, s := range series {
+		iter := s.Iterator(nil)
+		if iter.Next() {
+			heap.Push(&h, iter)
+		}
+	}
+
+	totalChunkCount := 0
+
+	for len(h) > 0 {
+		iter := heap.Pop(&h).(chunks.Iterator)
+		prev := iter.At()
+		if iter.Next() {
+			heap.Push(&h, iter)
+		}
+
+		chunkCountForThisTimePeriod := 0
+		sampleCount := prev.Chunk.NumSamples()
+		maxTime := prev.MaxTime
+
+		// Find all overlapping chunks and estimate the number of resulting chunks.
+		for len(h) > 0 && h[0].At().MinTime <= maxTime {
+			iter := heap.Pop(&h).(chunks.Iterator)
+			next := iter.At()
+			if iter.Next() {
+				heap.Push(&h, iter)
+			}
+
+			if next.MaxTime > maxTime {
+				maxTime = next.MaxTime
+			}
+
+			if prev.Chunk.Encoding() != next.Chunk.Encoding() {
+				// If we have more than seriesToChunkEncoderSplit samples, account for the additional chunks we'll create.
+				chunkCountForThisTimePeriod += sampleCount / seriesToChunkEncoderSplit
+				if sampleCount%seriesToChunkEncoderSplit > 0 {
+					chunkCountForThisTimePeriod++
+				}
+
+				sampleCount = 0
+			}
+
+			sampleCount += next.Chunk.NumSamples()
+			prev = next
+		}
+
+		// If we have more than seriesToChunkEncoderSplit samples, account for the additional chunks we'll create.
+		chunkCountForThisTimePeriod += sampleCount / seriesToChunkEncoderSplit
+		if sampleCount%seriesToChunkEncoderSplit > 0 {
+			chunkCountForThisTimePeriod++
+		}
+		if chunkCountForThisTimePeriod == 0 {
+			chunkCountForThisTimePeriod = 1 // We'll always create at least one chunk.
+		}
+
+		totalChunkCount += chunkCountForThisTimePeriod
+	}
+
+	return totalChunkCount
+}
+
 // compactChunkIterator is responsible to compact chunks from different iterators of the same time series into single chainSeries.
 // If time-overlapping chunks are found, they are encoded and passed to series merge and encoded again into one bigger chunk.
 // TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
@@ -801,6 +878,7 @@ func NewConcatenatingChunkSeriesMerger() VerticalChunkSeriesMergeFunc {
 		if len(series) == 0 {
 			return nil
 		}
+
 		return &ChunkSeriesEntry{
 			Lset: series[0].Labels(),
 			ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
@@ -812,6 +890,15 @@
 					iterators: iterators,
 				}
 			},
+			ChunkCountFn: func() int {
+				chunkCount := 0
+
+				for _, series := range series {
+					chunkCount += series.EstimatedChunkCount()
+				}
+
+				return chunkCount
+			},
 		}
 	}
 }
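
Reviewer note, not part of the patch: the per-time-period arithmetic in estimateCompactedChunkCount is easiest to sanity-check with a worked example. The standalone Go sketch below reproduces that arithmetic under the assumption that seriesToChunkEncoderSplit is 120 (its value in storage/series.go); the function name and the input sizes are made up for illustration.

package main

import "fmt"

const seriesToChunkEncoderSplit = 120 // assumed to match storage/series.go

// estimateForOnePeriod mirrors the accounting estimateCompactedChunkCount
// performs for one group of overlapping chunks of the same encoding:
// ceil(sampleCount / 120), with a floor of one chunk per time period.
func estimateForOnePeriod(sampleCount int) int {
	chunkCount := sampleCount / seriesToChunkEncoderSplit
	if sampleCount%seriesToChunkEncoderSplit > 0 {
		chunkCount++
	}
	if chunkCount == 0 {
		chunkCount = 1 // we'll always create at least one chunk
	}
	return chunkCount
}

func main() {
	// Two overlapping chunks holding 110 and 50 samples are pooled into a
	// single time period of 160 samples: the estimate is 2 chunks, even
	// though compaction may deduplicate the overlap and emit fewer.
	fmt.Println(estimateForOnePeriod(110 + 50)) // 2
}
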
diff --git a/storage/merge_test.go b/storage/merge_test.go
index b0544c2d81..98daed10c5 100644
--- a/storage/merge_test.go
+++ b/storage/merge_test.go
@@ -394,9 +394,10 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 	}
 
 	for _, tc := range []struct {
-		name     string
-		input    []ChunkSeries
-		expected ChunkSeries
+		name                   string
+		input                  []ChunkSeries
+		expected               ChunkSeries
+		expectedChunksEstimate int
 	}{
 		{
 			name: "single empty series",
@@ -483,6 +484,7 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 			expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
 				tsdbutil.GenerateSamples(0, 110),
 			),
+			expectedChunksEstimate: 2, // Estimation doesn't consider duplicate series when estimating the number of chunks.
 		},
 		{
 			name: "150 overlapping samples, split chunk",
@@ -520,6 +522,7 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 				[]tsdbutil.Sample{fSample{12, 12}, fSample{14, 14}},
 				[]tsdbutil.Sample{histogramSample(15)},
 			),
+			expectedChunksEstimate: 4, // Estimation assumes overlapping chunks don't swap back and forth between different encodings.
 		},
 		{
 			name: "float histogram chunks overlapping",
@@ -546,6 +549,7 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 				[]tsdbutil.Sample{fSample{12, 12}, fSample{14, 14}},
 				[]tsdbutil.Sample{floatHistogramSample(15)},
 			),
+			expectedChunksEstimate: 4, // Estimation assumes overlapping chunks don't swap back and forth between different encodings.
 		},
 		{
 			name: "float histogram chunks overlapping with histogram chunks",
@@ -560,6 +564,7 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 				[]tsdbutil.Sample{histogramSample(12), histogramSample(14)},
 				[]tsdbutil.Sample{floatHistogramSample(15)},
 			),
+			expectedChunksEstimate: 4, // Estimation assumes overlapping chunks don't swap back and forth between different encodings.
 		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
@@ -570,6 +575,12 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 
 			require.Equal(t, expErr, actErr)
 			require.Equal(t, expChks, actChks)
+
+			if tc.expectedChunksEstimate == 0 {
+				tc.expectedChunksEstimate = len(actChks)
+			}
+
+			require.Equalf(t, tc.expectedChunksEstimate, merged.EstimatedChunkCount(), "expected estimate of %v chunks, actual chunks are: %v", tc.expectedChunksEstimate, actChks)
 		})
 	}
 }
@@ -704,6 +715,7 @@ func TestConcatenatingChunkSeriesMerger(t *testing.T) {
 
 			require.Equal(t, expErr, actErr)
 			require.Equal(t, expChks, actChks)
+			require.Equal(t, len(expChks), merged.EstimatedChunkCount())
 		})
 	}
 }
diff --git a/storage/series.go b/storage/series.go
index b73f1e35ce..7d0b8a5193 100644
--- a/storage/series.go
+++ b/storage/series.go
@@ -35,11 +35,13 @@ func (s *SeriesEntry) Iterator(it chunkenc.Iterator) chunkenc.Iterator { return
 
 type ChunkSeriesEntry struct {
 	Lset            labels.Labels
+	ChunkCountFn    func() int
 	ChunkIteratorFn func(chunks.Iterator) chunks.Iterator
 }
 
 func (s *ChunkSeriesEntry) Labels() labels.Labels { return s.Lset }
 func (s *ChunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator { return s.ChunkIteratorFn(it) }
+func (s *ChunkSeriesEntry) EstimatedChunkCount() int { return s.ChunkCountFn() }
 
 // NewListSeries returns series entry with iterator that allows to iterate over provided samples.
 func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry {
@@ -78,6 +80,7 @@ func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]tsdbutil.Sam
 			}
 			return NewListChunkSeriesIterator(chks...)
 		},
+		ChunkCountFn: func() int { return len(samples) }, // We create one chunk per slice of samples.
 	}
 }
 
@@ -399,6 +402,41 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
 	return NewListChunkSeriesIterator(chks...)
 }
 
+// EstimatedChunkCount returns an estimate of the number of chunks produced by Iterator.
+//
+// It is perfectly accurate except when histograms are present in the series. When histograms
+// are present, EstimatedChunkCount may underestimate the number of chunks produced: the
+// estimation does not examine individual samples, so triggers for cutting new chunks such as
+// counter resets, changes to the bucket schema and changes to the zero threshold are not
+// taken into account.
+func (s *seriesToChunkEncoder) EstimatedChunkCount() int {
+	chunkCount := 0
+	seriesIter := s.Series.Iterator(nil)
+	lastType := chunkenc.ValNone
+	samplesInChunk := 0
+
+	for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() {
+		if chunkCount == 0 {
+			// We'll always have at least one chunk if there's at least one sample.
+			chunkCount++
+		} else if lastType != typ {
+			// If the sample type changes, then we'll cut a new chunk.
+			chunkCount++
+			samplesInChunk = 0
+		}
+
+		if samplesInChunk == seriesToChunkEncoderSplit {
+			chunkCount++
+			samplesInChunk = 0
+		}
+
+		lastType = typ
+		samplesInChunk++
+	}
+
+	return chunkCount
+}
+
 func appendChunk(chks []chunks.Meta, mint, maxt int64, chk chunkenc.Chunk) []chunks.Meta {
 	if chk != nil {
 		chks = append(chks, chunks.Meta{
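
Reviewer note, not part of the patch: seriesToChunkEncoder.EstimatedChunkCount applies two rules, cutting a new chunk when the sample type changes and splitting after every 120 samples of the same type. The standalone sketch below replays those rules over sample types only; the names and the 120 threshold (assumed to match seriesToChunkEncoderSplit) are illustrative.

package main

import "fmt"

const split = 120 // assumed to match seriesToChunkEncoderSplit

// estimate replays EstimatedChunkCount's counting rules over a list of sample
// value types, without looking at the sample values themselves.
func estimate(types []string) int {
	chunkCount, samplesInChunk := 0, 0
	lastType := ""

	for _, typ := range types {
		if chunkCount == 0 {
			chunkCount++ // the first sample always opens a chunk
		} else if lastType != typ {
			chunkCount++ // a change of sample type cuts a new chunk
			samplesInChunk = 0
		}

		if samplesInChunk == split {
			chunkCount++ // the current chunk is full, so a new one is started
			samplesInChunk = 0
		}

		lastType = typ
		samplesInChunk++
	}

	return chunkCount
}

func main() {
	// Matches the "float samples and histograms" test case below: float,
	// float, histogram, float encodes to three chunks.
	fmt.Println(estimate([]string{"float", "float", "histogram", "float"})) // 3
}
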
diff --git a/storage/series_test.go b/storage/series_test.go
index 5c74fae096..5f8ba68c03 100644
--- a/storage/series_test.go
+++ b/storage/series_test.go
@@ -73,6 +73,36 @@ func TestListSeriesIterator(t *testing.T) {
 	require.Equal(t, chunkenc.ValNone, it.Seek(2))
 }
 
+func TestNewListChunkSeriesFromSamples(t *testing.T) {
+	lbls := labels.FromStrings("__name__", "the_series")
+	series := NewListChunkSeriesFromSamples(
+		lbls,
+		samples{
+			fSample{0, 0},
+			fSample{1, 1},
+			fSample{1, 1.5},
+			fSample{2, 2},
+			fSample{3, 3},
+		},
+		samples{
+			fSample{4, 5},
+		},
+	)
+
+	require.Equal(t, lbls, series.Labels())
+
+	it := series.Iterator(nil)
+	chks := []chunks.Meta{}
+
+	for it.Next() {
+		chks = append(chks, it.At())
+	}
+
+	require.NoError(t, it.Err())
+	require.Len(t, chks, 2)
+	require.Equal(t, len(chks), series.EstimatedChunkCount(), "should have one chunk per slice of samples")
+}
+
 // TestSeriesSetToChunkSet test the property of SeriesSet that says
 // returned series should be iterable even after Next is called.
 func TestChunkSeriesSetToSeriesSet(t *testing.T) {
@@ -125,9 +155,84 @@ func TestChunkSeriesSetToSeriesSet(t *testing.T) {
 	}
 }
 
+func TestSeriesToChunks(t *testing.T) {
+	generateSamples := func(count int) []tsdbutil.Sample {
+		s := make([]tsdbutil.Sample, count)
+
+		for i := 0; i < count; i++ {
+			s[i] = fSample{t: int64(i), f: float64(i) * 10.0}
+		}
+
+		return s
+	}
+
+	h := &histogram.Histogram{
+		Count:         0,
+		ZeroThreshold: 0.001,
+		Schema:        0,
+	}
+
+	testCases := map[string]struct {
+		samples            []tsdbutil.Sample
+		expectedChunkCount int
+	}{
+		"no samples": {
+			samples:            []tsdbutil.Sample{},
+			expectedChunkCount: 0,
+		},
+		"single sample": {
+			samples:            generateSamples(1),
+			expectedChunkCount: 1,
+		},
+		"120 samples": {
+			samples:            generateSamples(120),
+			expectedChunkCount: 1,
+		},
+		"121 samples": {
+			samples:            generateSamples(121),
+			expectedChunkCount: 2,
+		},
+		"240 samples": {
+			samples:            generateSamples(240),
+			expectedChunkCount: 2,
+		},
+		"241 samples": {
+			samples:            generateSamples(241),
+			expectedChunkCount: 3,
+		},
+		"float samples and histograms": {
+			samples: []tsdbutil.Sample{
+				fSample{t: 1, f: 10},
+				fSample{t: 2, f: 20},
+				hSample{t: 3, h: h},
+				fSample{t: 4, f: 40},
+			},
+			expectedChunkCount: 3,
+		},
+	}
+
+	for name, testCase := range testCases {
+		t.Run(name, func(t *testing.T) {
+			lset := labels.FromStrings("__name__", "test_series")
+			series := NewListSeries(lset, testCase.samples)
+			encoder := NewSeriesToChunkEncoder(series)
+			require.Equal(t, lset, encoder.Labels())
+
+			chks, err := ExpandChunks(encoder.Iterator(nil))
+			require.NoError(t, err)
+			require.Len(t, chks, testCase.expectedChunkCount)
+			require.Equal(t, testCase.expectedChunkCount, encoder.EstimatedChunkCount())
+
+			encodedSamples := expandChunks(chks)
+			require.Equal(t, testCase.samples, encodedSamples)
+		})
+	}
+}
+
 type histogramTest struct {
 	samples                     []tsdbutil.Sample
 	expectedCounterResetHeaders []chunkenc.CounterResetHeader
+	expectedChunkCountEstimate  int
 }
 
 func TestHistogramSeriesToChunks(t *testing.T) {
@@ -288,6 +393,7 @@ func TestHistogramSeriesToChunks(t *testing.T) {
 				hSample{t: 2, h: h1},
 			},
 			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
+			expectedChunkCountEstimate:  1, // Estimate doesn't consider counter resets.
 		},
 		"histogram and stale sample encoded to two chunks": {
 			samples: []tsdbutil.Sample{
@@ -295,6 +401,7 @@
 				hSample{t: 2, h: h1},
 			},
 			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
+			expectedChunkCountEstimate:  1, // Estimate doesn't consider stale markers.
 		},
 		"histogram and reduction in bucket encoded to two chunks": {
 			samples: []tsdbutil.Sample{
@@ -302,6 +409,7 @@
 				hSample{t: 2, h: h2down},
 			},
 			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
+			expectedChunkCountEstimate:  1, // Estimate doesn't consider counter resets.
 		},
 		// Float histograms.
 		"single float histogram to single chunk": {
@@ -323,6 +431,7 @@
 				fhSample{t: 2, fh: fh1},
 			},
 			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
+			expectedChunkCountEstimate:  1, // Estimate doesn't consider counter resets.
 		},
 		"float histogram and stale sample encoded to two chunks": {
 			samples: []tsdbutil.Sample{
@@ -330,6 +439,7 @@
 				fhSample{t: 2, fh: fh1},
 			},
 			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
+			expectedChunkCountEstimate:  1, // Estimate doesn't consider stale markers.
 		},
 		"float histogram and reduction in bucket encoded to two chunks": {
 			samples: []tsdbutil.Sample{
@@ -337,6 +447,7 @@
 				fhSample{t: 2, fh: fh2down},
 			},
 			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
+			expectedChunkCountEstimate:  1, // Estimate doesn't consider counter resets.
 		},
 		// Mixed.
 		"histogram and float histogram encoded to two chunks": {
@@ -430,8 +541,14 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
 	require.NoError(t, err)
 	require.Equal(t, len(test.expectedCounterResetHeaders), len(chks))
 
+	if test.expectedChunkCountEstimate == 0 {
+		test.expectedChunkCountEstimate = len(chks)
+	}
+
+	require.Equal(t, test.expectedChunkCountEstimate, encoder.EstimatedChunkCount())
+
 	// Decode all encoded samples and assert they are equal to the original ones.
-	encodedSamples := expandHistogramSamples(chks)
+	encodedSamples := expandChunks(chks)
 	require.Equal(t, len(test.samples), len(encodedSamples))
 
 	for i, s := range test.samples {
@@ -470,9 +587,9 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
 	}
 }
 
-func expandHistogramSamples(chunks []chunks.Meta) (result []tsdbutil.Sample) {
+func expandChunks(chunks []chunks.Meta) (result []tsdbutil.Sample) {
 	if len(chunks) == 0 {
-		return
+		return []tsdbutil.Sample{}
 	}
 
 	for _, chunk := range chunks {
@@ -485,6 +602,9 @@ func expandHistogramSamples(chunks []chunks.Meta) (result []tsdbutil.Sample) {
 		case chunkenc.ValFloatHistogram:
 			t, fh := it.AtFloatHistogram()
 			result = append(result, fhSample{t: t, fh: fh})
+		case chunkenc.ValFloat:
+			t, f := it.At()
+			result = append(result, fSample{t: t, f: f})
 		default:
 			panic("unexpected value type")
 		}
diff --git a/tsdb/querier.go b/tsdb/querier.go
index 854687298c..cbee873baf 100644
--- a/tsdb/querier.go
+++ b/tsdb/querier.go
@@ -649,6 +649,10 @@ func (s *chunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator {
 	return pi
 }
 
+func (s *chunkSeriesEntry) EstimatedChunkCount() int {
+	return len(s.chks)
+}
+
 // populateWithDelSeriesIterator allows to iterate over samples for the single series.
 type populateWithDelSeriesIterator struct {
 	populateWithDelGenericSeriesIterator
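
Reviewer note, not part of the patch: a minimal sketch of how a querier-side check might consume the new method, assuming only the ChunkSeriesSet interface from storage/interface.go. checkChunksLimit and its error handling are hypothetical; Mimir's actual enforcement of the 'max chunks per query' limit lives outside this diff.

package example

import (
	"fmt"

	"github.com/prometheus/prometheus/storage"
)

// checkChunksLimit sums the estimated chunk count across all series in the set
// and fails as soon as the running total exceeds the given limit.
func checkChunksLimit(set storage.ChunkSeriesSet, maxChunks int) error {
	estimated := 0

	for set.Next() {
		estimated += set.At().EstimatedChunkCount()
		if estimated > maxChunks {
			return fmt.Errorf("query would fetch an estimated %d or more chunks, exceeding the limit of %d", estimated, maxChunks)
		}
	}

	return set.Err()
}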