Simplify trickier estimation cases by not estimating at all.

Charles Korn 2023-07-06 15:25:11 +10:00
parent f71a97b460
commit 31235c351f
4 changed files with 34 additions and 133 deletions

storage/merge.go

@@ -659,9 +659,7 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC
 			return nil
 		}
-		return &ChunkSeriesEntry{
-			Lset: series[0].Labels(),
-			ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
+
+		chunkIteratorFn := func(chunks.Iterator) chunks.Iterator {
 			iterators := make([]chunks.Iterator, 0, len(series))
 			for _, s := range series {
 				iterators = append(iterators, s.Iterator(nil))
@@ -670,87 +668,30 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC
 				mergeFunc: mergeFunc,
 				iterators: iterators,
 			}
-		},
+		}
+
+		return &ChunkSeriesEntry{
+			Lset:            series[0].Labels(),
+			ChunkIteratorFn: chunkIteratorFn,
 			ChunkCountFn: func() int {
-				return estimateCompactedChunkCount(series)
+				// This method is expensive, but we don't expect to ever actually use this on the ingester query path in Mimir -
+				// it's just here to ensure things don't break if this assumption ever changes.
+				// Ingesters return uncompacted chunks to queriers, so this method is never called.
+				return countChunks(chunkIteratorFn)
 			},
 		}
 	}
 }
 
-// estimateCompactedChunkCount computes an estimate of the resulting number of chunks
-// after compacting series.
-//
-// The estimate is imperfect in a few ways, due to the fact it does not examine individual samples:
-//   - it does not check for duplicate samples in overlapping chunks, and so may overestimate
-//     the resulting number of chunks if duplicate samples are present, or if an entire chunk is
-//     duplicated
-//   - it does not check if overlapping chunks have samples that swap back and forth between
-//     different encodings over time, and so may underestimate the resulting number of chunks
-//     (we don't expect this to happen often though, as switching from float samples to histograms
-//     involves changing the instrumentation)
-func estimateCompactedChunkCount(series []ChunkSeries) int {
-	h := make(chunkIteratorHeap, 0, len(series))
-
-	for _, s := range series {
-		iter := s.Iterator(nil)
-		if iter.Next() {
-			heap.Push(&h, iter)
-		}
-	}
-
-	totalChunkCount := 0
-
-	for len(h) > 0 {
-		iter := heap.Pop(&h).(chunks.Iterator)
-		prev := iter.At()
-		if iter.Next() {
-			heap.Push(&h, iter)
-		}
-
-		chunkCountForThisTimePeriod := 0
-		sampleCount := prev.Chunk.NumSamples()
-		maxTime := prev.MaxTime
-
-		// Find all overlapping chunks and estimate the number of resulting chunks.
-		for len(h) > 0 && h[0].At().MinTime <= maxTime {
-			iter := heap.Pop(&h).(chunks.Iterator)
-			next := iter.At()
-			if iter.Next() {
-				heap.Push(&h, iter)
-			}
-
-			if next.MaxTime > maxTime {
-				maxTime = next.MaxTime
-			}
-
-			if prev.Chunk.Encoding() != next.Chunk.Encoding() {
-				// If we have more than seriesToChunkEncoderSplit samples, account for the additional chunks we'll create.
-				chunkCountForThisTimePeriod += sampleCount / seriesToChunkEncoderSplit
-				if sampleCount%seriesToChunkEncoderSplit > 0 {
-					chunkCountForThisTimePeriod++
-				}
-				sampleCount = 0
-			}
-
-			sampleCount += next.Chunk.NumSamples()
-			prev = next
-		}
-
-		// If we have more than seriesToChunkEncoderSplit samples, account for the additional chunks we'll create.
-		chunkCountForThisTimePeriod += sampleCount / seriesToChunkEncoderSplit
-		if sampleCount%seriesToChunkEncoderSplit > 0 {
-			chunkCountForThisTimePeriod++
-		}
-
-		if chunkCountForThisTimePeriod == 0 {
-			chunkCountForThisTimePeriod = 1 // We'll always create at least one chunk.
-		}
-
-		totalChunkCount += chunkCountForThisTimePeriod
-	}
-
-	return totalChunkCount
-}
+func countChunks(chunkIteratorFn func(chunks.Iterator) chunks.Iterator) int {
+	chunkCount := 0
+	it := chunkIteratorFn(nil)
+
+	for it.Next() {
+		chunkCount++
+	}
+
+	return chunkCount
+}
 
 // compactChunkIterator is responsible to compact chunks from different iterators of the same time series into single chainSeries.
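The replacement trades a cheap heap-based guess for an exact count: `countChunks` simply runs the merged iterator and counts what comes out, which is accurate by construction but costs a full merge pass. Below is a minimal, self-contained sketch of that pattern, using a hypothetical `chunkIterator` interface and `fixedIterator` type as stand-ins for the real `chunks.Iterator` and the merged `compactChunkIterator`:

```go
package main

import "fmt"

// chunkIterator stands in for the chunks.Iterator interface used above.
type chunkIterator interface {
	Next() bool
}

// fixedIterator yields a fixed number of chunks, standing in for the merged
// compactChunkIterator.
type fixedIterator struct{ remaining int }

func (it *fixedIterator) Next() bool {
	if it.remaining == 0 {
		return false
	}
	it.remaining--
	return true
}

// countChunks mirrors the function added in this commit: materialise the
// iterator and count what it actually produces, so the result is exact by
// construction, at the cost of doing the full merge.
func countChunks(chunkIteratorFn func() chunkIterator) int {
	chunkCount := 0
	it := chunkIteratorFn()

	for it.Next() {
		chunkCount++
	}

	return chunkCount
}

func main() {
	chunkIteratorFn := func() chunkIterator { return &fixedIterator{remaining: 3} }
	fmt.Println(countChunks(chunkIteratorFn)) // prints 3
}
```

Because the count is obtained by doing the work, the caveats documented on the removed estimateCompactedChunkCount (duplicate samples, encodings flipping back and forth) disappear rather than needing to be enumerated and tested.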

storage/merge_test.go

@@ -397,7 +397,6 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 		name     string
 		input    []ChunkSeries
 		expected ChunkSeries
-		expectedChunksEstimate int
 	}{
 		{
 			name: "single empty series",
@@ -492,7 +491,6 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 			expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
 				tsdbutil.GenerateSamples(0, 110),
 			),
-			expectedChunksEstimate: 2, // Estimation doesn't consider duplicate series when estimating the number of chunks.
 		},
 		{
 			name: "150 overlapping samples, split chunk",
@@ -530,7 +528,6 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 				[]tsdbutil.Sample{fSample{12, 12}, fSample{14, 14}},
 				[]tsdbutil.Sample{histogramSample(15)},
 			),
-			expectedChunksEstimate: 4, // Estimation assumes overlapping chunks don't swap back and forth between different encodings.
 		},
 		{
 			name: "float histogram chunks overlapping",
@@ -557,7 +554,6 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 				[]tsdbutil.Sample{fSample{12, 12}, fSample{14, 14}},
 				[]tsdbutil.Sample{floatHistogramSample(15)},
 			),
-			expectedChunksEstimate: 4, // Estimation assumes overlapping chunks don't swap back and forth between different encodings.
 		},
 		{
 			name: "float histogram chunks overlapping with histogram chunks",
@@ -572,7 +568,6 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 				[]tsdbutil.Sample{histogramSample(12), histogramSample(14)},
 				[]tsdbutil.Sample{floatHistogramSample(15)},
 			),
-			expectedChunksEstimate: 4, // Estimation assumes overlapping chunks don't swap back and forth between different encodings.
 		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
@@ -584,11 +579,7 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 			require.Equal(t, expErr, actErr)
 			require.Equal(t, expChks, actChks)
 
-			if tc.expectedChunksEstimate == 0 {
-				tc.expectedChunksEstimate = len(actChks)
-			}
-
-			require.Equalf(t, tc.expectedChunksEstimate, merged.EstimatedChunkCount(), "expected estimate of %v chunks, actual chunks are: %v", tc.expectedChunksEstimate, actChks)
+			require.Len(t, actChks, merged.EstimatedChunkCount())
 		})
 	}
 }
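With estimation gone, the per-case `expectedChunksEstimate` values become redundant: `require.Len` asserts that the reported count equals the number of chunks the merger actually produced, uniformly across all cases. A small sketch of that assertion shape, using a hypothetical `merged` type as a stand-in for the real merged `ChunkSeries`:

```go
package example

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// merged stands in for the ChunkSeries returned by the merger in the real test.
type merged struct{ chunkCount int }

func (m merged) EstimatedChunkCount() int { return m.chunkCount }

// TestCountMatchesChunks shows the assertion used above: require.Len checks
// len(actChks) against the reported count, so no hand-maintained expected
// values are needed once the count is exact.
func TestCountMatchesChunks(t *testing.T) {
	actChks := []string{"chunk-1", "chunk-2"} // stand-in for the decoded chunks
	m := merged{chunkCount: len(actChks)}

	require.Len(t, actChks, m.EstimatedChunkCount())
}
```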

storage/series.go

@@ -403,35 +403,15 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
 }
 
 // EstimatedChunkCount returns an estimate of the number of chunks produced by Iterator.
-//
-// It is perfectly accurate except when histograms are present in series. When histograms are
-// present, EstimatedChunkCount will underestimate the number of chunks produced, as the
-// estimation does not consider individual samples and so triggers for new chunks such as
-// counter resets, changes to the bucket schema and changes to the zero threshold are not
-// taken into account.
 func (s *seriesToChunkEncoder) EstimatedChunkCount() int {
+	// This method is expensive, but we don't expect to ever actually use this on the ingester query path in Mimir -
+	// it's just here to ensure things don't break if this assumption ever changes.
 	chunkCount := 0
-	seriesIter := s.Series.Iterator(nil)
-	lastType := chunkenc.ValNone
-	samplesInChunk := 0
+	it := s.Iterator(nil)
 
-	for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() {
-		if chunkCount == 0 {
-			// We'll always have at least one chunk if there's at least one sample.
+	for it.Next() {
 		chunkCount++
-		} else if lastType != typ {
-			// If the sample type changes, then we'll cut a new chunk.
-			chunkCount++
-			samplesInChunk = 0
-		}
-
-		if samplesInChunk == seriesToChunkEncoderSplit {
-			chunkCount++
-			samplesInChunk = 0
-		}
-
-		lastType = typ
-		samplesInChunk++
 	}
 
 	return chunkCount
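For the encoder, the removed estimate was exact only for series without histograms, where a chunk is cut every `seriesToChunkEncoderSplit` samples; histogram triggers (counter resets, bucket schema changes, zero-threshold changes) were invisible to it. For a single-type series the old estimate and the new exact count therefore coincide, as in this sketch (assuming the constant's value is 120, which is what `storage/series.go` defines at the time of this commit):

```go
package main

import "fmt"

// seriesToChunkEncoderSplit mirrors the constant in storage/series.go
// (assumed to be 120 here, its value at the time of this commit).
const seriesToChunkEncoderSplit = 120

// chunksForSingleTypeSeries computes what both the removed estimate and the
// new exact count return when no histogram triggers fire: one chunk per
// seriesToChunkEncoderSplit samples, rounded up.
func chunksForSingleTypeSeries(sampleCount int) int {
	if sampleCount == 0 {
		return 0
	}
	return (sampleCount + seriesToChunkEncoderSplit - 1) / seriesToChunkEncoderSplit
}

func main() {
	fmt.Println(chunksForSingleTypeSeries(110)) // 1
	fmt.Println(chunksForSingleTypeSeries(150)) // 2, as in the "150 overlapping samples, split chunk" case
}
```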

storage/series_test.go

@@ -232,7 +232,6 @@ func TestSeriesToChunks(t *testing.T) {
 type histogramTest struct {
 	samples                     []tsdbutil.Sample
 	expectedCounterResetHeaders []chunkenc.CounterResetHeader
-	expectedChunkCountEstimate  int
 }
 
 func TestHistogramSeriesToChunks(t *testing.T) {
@@ -393,7 +392,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
 				hSample{t: 2, h: h1},
 			},
 			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
-			expectedChunkCountEstimate:  1, // Estimate doesn't consider counter resets.
 		},
 		"histogram and stale sample encoded to two chunks": {
 			samples: []tsdbutil.Sample{
@@ -401,7 +399,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
 				hSample{t: 2, h: h1},
 			},
 			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
-			expectedChunkCountEstimate:  1, // Estimate doesn't consider stale markers.
 		},
 		"histogram and reduction in bucket encoded to two chunks": {
 			samples: []tsdbutil.Sample{
@@ -409,7 +406,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
 				hSample{t: 2, h: h2down},
 			},
 			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
-			expectedChunkCountEstimate:  1, // Estimate doesn't consider counter resets.
 		},
 		// Float histograms.
 		"single float histogram to single chunk": {
@@ -431,7 +427,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
 				fhSample{t: 2, fh: fh1},
 			},
 			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
-			expectedChunkCountEstimate:  1, // Estimate doesn't consider counter resets.
 		},
 		"float histogram and stale sample encoded to two chunks": {
 			samples: []tsdbutil.Sample{
@@ -439,7 +434,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
 				fhSample{t: 2, fh: fh1},
 			},
 			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
-			expectedChunkCountEstimate:  1, // Estimate doesn't consider stale markers.
 		},
 		"float histogram and reduction in bucket encoded to two chunks": {
 			samples: []tsdbutil.Sample{
@@ -447,7 +441,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
 				fhSample{t: 2, fh: fh2down},
 			},
 			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
-			expectedChunkCountEstimate:  1, // Estimate doesn't consider counter resets.
 		},
 		// Mixed.
 		"histogram and float histogram encoded to two chunks": {
@@ -541,11 +534,7 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
 	require.NoError(t, err)
 	require.Equal(t, len(test.expectedCounterResetHeaders), len(chks))
 
-	if test.expectedChunkCountEstimate == 0 {
-		test.expectedChunkCountEstimate = len(chks)
-	}
-
-	require.Equal(t, test.expectedChunkCountEstimate, encoder.EstimatedChunkCount())
+	require.Len(t, chks, encoder.EstimatedChunkCount())
 
 	// Decode all encoded samples and assert they are equal to the original ones.
 	encodedSamples := expandChunks(chks)
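The removed `expectedChunkCountEstimate` values all documented the same blind spot: the estimate assumed one histogram chunk where the encoder actually cuts a second chunk on a counter reset, a stale marker, or a reduction in bucket counts. A toy sketch of why counting rather than estimating gets these cases right, with a hypothetical `cutsNewChunk` flag standing in for those real chunk-cut triggers:

```go
package main

import "fmt"

// sample is a stand-in for a histogram sample; cutsNewChunk marks the
// triggers the removed estimate ignored (counter resets, stale markers,
// reductions in bucket counts).
type sample struct{ cutsNewChunk bool }

// countEncodedChunks walks the samples the way the encoder behaves in the
// tests above: the first sample opens a chunk, and every trigger cuts another.
func countEncodedChunks(samples []sample) int {
	if len(samples) == 0 {
		return 0
	}
	chunkCount := 1
	for _, s := range samples[1:] {
		if s.cutsNewChunk {
			chunkCount++
		}
	}
	return chunkCount
}

func main() {
	// Mirrors "histogram and reduction in bucket encoded to two chunks":
	// the removed estimate hard-coded 1, but two chunks are produced.
	fmt.Println(countEncodedChunks([]sample{{}, {cutsNewChunk: true}})) // prints 2
}
```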