From 31235c351fc4f2e13a7811d7f32e187186037225 Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Thu, 6 Jul 2023 15:25:11 +1000
Subject: [PATCH] Simplify trickier estimation cases by not estimating at all.

---
 storage/merge.go       | 105 +++++++++--------------------------
 storage/merge_test.go  |  17 ++-----
 storage/series.go      |  32 +++----------
 storage/series_test.go |  13 +----
 4 files changed, 34 insertions(+), 133 deletions(-)

diff --git a/storage/merge.go b/storage/merge.go
index 1728f87e6a..33023b1932 100644
--- a/storage/merge.go
+++ b/storage/merge.go
@@ -659,98 +659,39 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalChunkSeriesMergeFunc {
 			return nil
 		}
 
+		chunkIteratorFn := func(chunks.Iterator) chunks.Iterator {
+			iterators := make([]chunks.Iterator, 0, len(series))
+			for _, s := range series {
+				iterators = append(iterators, s.Iterator(nil))
+			}
+			return &compactChunkIterator{
+				mergeFunc: mergeFunc,
+				iterators: iterators,
+			}
+		}
+
 		return &ChunkSeriesEntry{
-			Lset: series[0].Labels(),
-			ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
-				iterators := make([]chunks.Iterator, 0, len(series))
-				for _, s := range series {
-					iterators = append(iterators, s.Iterator(nil))
-				}
-				return &compactChunkIterator{
-					mergeFunc: mergeFunc,
-					iterators: iterators,
-				}
-			},
+			Lset:            series[0].Labels(),
+			ChunkIteratorFn: chunkIteratorFn,
 			ChunkCountFn: func() int {
-				return estimateCompactedChunkCount(series)
+				// This method is expensive, but we don't expect it to ever actually be used on the ingester query path
+				// in Mimir: ingesters return uncompacted chunks to queriers, so this method is never called there.
+				// It's only here to ensure things don't break if that assumption ever changes.
+				return countChunks(chunkIteratorFn)
 			},
 		}
 	}
 }
 
-// estimateCompactedChunkCount computes an estimate of the resulting number of chunks
-// after compacting series.
-//
-// The estimate is imperfect in a few ways, due to the fact it does not examine individual samples:
-// - it does not check for duplicate samples in overlapping chunks, and so may overestimate
-//   the resulting number of chunks if duplicate samples are present, or if an entire chunk is
-//   duplicated
-// - it does not check if overlapping chunks have samples that swap back and forth between
-//   different encodings over time, and so may underestimate the resulting number of chunks
-//   (we don't expect this to happen often though, as switching from float samples to histograms
-//   involves changing the instrumentation)
-func estimateCompactedChunkCount(series []ChunkSeries) int {
-	h := make(chunkIteratorHeap, 0, len(series))
+func countChunks(chunkIteratorFn func(chunks.Iterator) chunks.Iterator) int {
+	chunkCount := 0
+	it := chunkIteratorFn(nil)
 
-	for _, s := range series {
-		iter := s.Iterator(nil)
-		if iter.Next() {
-			heap.Push(&h, iter)
-		}
+	for it.Next() {
+		chunkCount++
 	}
 
-	totalChunkCount := 0
-
-	for len(h) > 0 {
-		iter := heap.Pop(&h).(chunks.Iterator)
-		prev := iter.At()
-		if iter.Next() {
-			heap.Push(&h, iter)
-		}
-
-		chunkCountForThisTimePeriod := 0
-		sampleCount := prev.Chunk.NumSamples()
-		maxTime := prev.MaxTime
-
-		// Find all overlapping chunks and estimate the number of resulting chunks.
-		for len(h) > 0 && h[0].At().MinTime <= maxTime {
-			iter := heap.Pop(&h).(chunks.Iterator)
-			next := iter.At()
-			if iter.Next() {
-				heap.Push(&h, iter)
-			}
-
-			if next.MaxTime > maxTime {
-				maxTime = next.MaxTime
-			}
-
-			if prev.Chunk.Encoding() != next.Chunk.Encoding() {
-				// If we have more than seriesToChunkEncoderSplit samples, account for the additional chunks we'll create.
-				chunkCountForThisTimePeriod += sampleCount / seriesToChunkEncoderSplit
-				if sampleCount%seriesToChunkEncoderSplit > 0 {
-					chunkCountForThisTimePeriod++
-				}
-
-				sampleCount = 0
-			}
-
-			sampleCount += next.Chunk.NumSamples()
-			prev = next
-		}
-
-		// If we have more than seriesToChunkEncoderSplit samples, account for the additional chunks we'll create.
-		chunkCountForThisTimePeriod += sampleCount / seriesToChunkEncoderSplit
-		if sampleCount%seriesToChunkEncoderSplit > 0 {
-			chunkCountForThisTimePeriod++
-		}
-		if chunkCountForThisTimePeriod == 0 {
-			chunkCountForThisTimePeriod = 1 // We'll always create at least one chunk.
-		}
-
-		totalChunkCount += chunkCountForThisTimePeriod
-	}
-
-	return totalChunkCount
+	return chunkCount
 }
 
 // compactChunkIterator is responsible to compact chunks from different iterators of the same time series into single chainSeries.
diff --git a/storage/merge_test.go b/storage/merge_test.go
index e5eb296d1b..79772dd4ba 100644
--- a/storage/merge_test.go
+++ b/storage/merge_test.go
@@ -394,10 +394,9 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 	}
 
 	for _, tc := range []struct {
-		name                   string
-		input                  []ChunkSeries
-		expected               ChunkSeries
-		expectedChunksEstimate int
+		name     string
+		input    []ChunkSeries
+		expected ChunkSeries
 	}{
 		{
 			name: "single empty series",
@@ -492,7 +491,6 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 			expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
 				tsdbutil.GenerateSamples(0, 110),
 			),
-			expectedChunksEstimate: 2, // Estimation doesn't consider duplicate series when estimating the number of chunks.
 		},
 		{
 			name: "150 overlapping samples, split chunk",
@@ -530,7 +528,6 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 				[]tsdbutil.Sample{fSample{12, 12}, fSample{14, 14}},
 				[]tsdbutil.Sample{histogramSample(15)},
 			),
-			expectedChunksEstimate: 4, // Estimation assumes overlapping chunks don't swap back and forth between different encodings.
 		},
 		{
 			name: "float histogram chunks overlapping",
@@ -557,7 +554,6 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 				[]tsdbutil.Sample{fSample{12, 12}, fSample{14, 14}},
 				[]tsdbutil.Sample{floatHistogramSample(15)},
 			),
-			expectedChunksEstimate: 4, // Estimation assumes overlapping chunks don't swap back and forth between different encodings.
 		},
 		{
 			name: "float histogram chunks overlapping with histogram chunks",
@@ -572,7 +568,6 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 				[]tsdbutil.Sample{histogramSample(12), histogramSample(14)},
 				[]tsdbutil.Sample{floatHistogramSample(15)},
 			),
-			expectedChunksEstimate: 4, // Estimation assumes overlapping chunks don't swap back and forth between different encodings.
 		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
@@ -584,11 +579,7 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 			require.Equal(t, expErr, actErr)
 			require.Equal(t, expChks, actChks)
 
-			if tc.expectedChunksEstimate == 0 {
-				tc.expectedChunksEstimate = len(actChks)
-			}
-
-			require.Equalf(t, tc.expectedChunksEstimate, merged.EstimatedChunkCount(), "expected estimate of %v chunks, actual chunks are: %v", tc.expectedChunksEstimate, actChks)
+			require.Len(t, actChks, merged.EstimatedChunkCount())
 		})
 	}
 }
diff --git a/storage/series.go b/storage/series.go
index 7d0b8a5193..3bc91abec7 100644
--- a/storage/series.go
+++ b/storage/series.go
@@ -403,35 +403,15 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
 }
 
 // EstimatedChunkCount returns an estimate of the number of chunks produced by Iterator.
-//
-// It is perfectly accurate except when histograms are present in series. When histograms are
-// present, EstimatedChunkCount will underestimate the number of chunks produced, as the
-// estimation does not consider individual samples and so triggers for new chunks such as
-// counter resets, changes to the bucket schema and changes to the zero threshold are not
-// taken into account.
 func (s *seriesToChunkEncoder) EstimatedChunkCount() int {
+	// This method is expensive, but we don't expect it to ever actually be used on the ingester query path in Mimir -
+	// it's only here to ensure things don't break if that assumption ever changes.
+
 	chunkCount := 0
-	seriesIter := s.Series.Iterator(nil)
-	lastType := chunkenc.ValNone
-	samplesInChunk := 0
+	it := s.Iterator(nil)
 
-	for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() {
-		if chunkCount == 0 {
-			// We'll always have at least one chunk if there's at least one sample.
-			chunkCount++
-		} else if lastType != typ {
-			// If the sample type changes, then we'll cut a new chunk.
-			chunkCount++
-			samplesInChunk = 0
-		}
-
-		if samplesInChunk == seriesToChunkEncoderSplit {
-			chunkCount++
-			samplesInChunk = 0
-		}
-
-		lastType = typ
-		samplesInChunk++
+	for it.Next() {
+		chunkCount++
 	}
 
 	return chunkCount
diff --git a/storage/series_test.go b/storage/series_test.go
index 5f8ba68c03..3f6625aeb9 100644
--- a/storage/series_test.go
+++ b/storage/series_test.go
@@ -232,7 +232,6 @@ func TestSeriesToChunks(t *testing.T) {
 type histogramTest struct {
 	samples                     []tsdbutil.Sample
 	expectedCounterResetHeaders []chunkenc.CounterResetHeader
-	expectedChunkCountEstimate  int
 }
 
 func TestHistogramSeriesToChunks(t *testing.T) {
@@ -393,7 +392,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
 			hSample{t: 2, h: h1},
 		},
 		expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
-		expectedChunkCountEstimate:  1, // Estimate doesn't consider counter resets.
 	},
 	"histogram and stale sample encoded to two chunks": {
 		samples: []tsdbutil.Sample{
 			hSample{t: 1, h: staleHistogram},
 			hSample{t: 2, h: h1},
 		},
 		expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
-		expectedChunkCountEstimate:  1, // Estimate doesn't consider stale markers.
}, "histogram and reduction in bucket encoded to two chunks": { samples: []tsdbutil.Sample{ @@ -409,7 +406,6 @@ func TestHistogramSeriesToChunks(t *testing.T) { hSample{t: 2, h: h2down}, }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset}, - expectedChunkCountEstimate: 1, // Estimate doesn't consider counter resets. }, // Float histograms. "single float histogram to single chunk": { @@ -431,7 +427,6 @@ func TestHistogramSeriesToChunks(t *testing.T) { fhSample{t: 2, fh: fh1}, }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset}, - expectedChunkCountEstimate: 1, // Estimate doesn't consider counter resets. }, "float histogram and stale sample encoded to two chunks": { samples: []tsdbutil.Sample{ @@ -439,7 +434,6 @@ func TestHistogramSeriesToChunks(t *testing.T) { fhSample{t: 2, fh: fh1}, }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset}, - expectedChunkCountEstimate: 1, // Estimate doesn't consider stale markers. }, "float histogram and reduction in bucket encoded to two chunks": { samples: []tsdbutil.Sample{ @@ -447,7 +441,6 @@ func TestHistogramSeriesToChunks(t *testing.T) { fhSample{t: 2, fh: fh2down}, }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset}, - expectedChunkCountEstimate: 1, // Estimate doesn't consider counter resets. }, // Mixed. "histogram and float histogram encoded to two chunks": { @@ -541,11 +534,7 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) { require.NoError(t, err) require.Equal(t, len(test.expectedCounterResetHeaders), len(chks)) - if test.expectedChunkCountEstimate == 0 { - test.expectedChunkCountEstimate = len(chks) - } - - require.Equal(t, test.expectedChunkCountEstimate, encoder.EstimatedChunkCount()) + require.Len(t, chks, encoder.EstimatedChunkCount()) // Decode all encoded samples and assert they are equal to the original ones. encodedSamples := expandChunks(chks)