Add EstimatedChunkCount() method to ChunkSeries

Signed-off-by: Charles Korn <charles.korn@grafana.com>
Charles Korn 2023-07-05 15:29:00 +10:00
parent 5c270e23ea
commit 8b84856ba5
6 changed files with 273 additions and 6 deletions


@@ -415,6 +415,12 @@ type ChunkSeriesSet interface {
type ChunkSeries interface {
Labels
ChunkIterable
// EstimatedChunkCount returns an estimate of the number of chunks available from this ChunkSeries.
//
// This estimate is used by Mimir's ingesters to report the number of chunks expected to be returned by a query,
// which is used by queriers to enforce the 'max chunks per query' limit.
EstimatedChunkCount() int
}
// Labels represents an item that has labels e.g. time series.
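As an illustration of how a consumer might use this method: a querier enforcing a 'max chunks per query' limit could sum the per-series estimates before fetching any chunk data. This is a minimal sketch, not Mimir's actual implementation; checkChunkLimit and maxChunks are hypothetical names, and the fmt package is assumed imported:

func checkChunkLimit(series []ChunkSeries, maxChunks int) error {
    estimated := 0
    for _, s := range series {
        estimated += s.EstimatedChunkCount()
        if estimated > maxChunks {
            // Fail before fetching any chunk data: the estimate already exceeds the limit.
            return fmt.Errorf("query would touch an estimated %d or more chunks, limit is %d", estimated, maxChunks)
        }
    }
    return nil
}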


@@ -658,6 +658,7 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC
if len(series) == 0 {
return nil
}
return &ChunkSeriesEntry{
Lset: series[0].Labels(),
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
@@ -670,10 +671,86 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC
iterators: iterators,
}
},
ChunkCountFn: func() int {
return estimateCompactedChunkCount(series)
},
}
}
}
// estimateCompactedChunkCount computes an estimate of the resulting number of chunks
// after compacting series.
//
// The estimate is imperfect in a few ways, because it does not examine individual samples:
// - it does not check for duplicate samples in overlapping chunks, and so may overestimate
// the resulting number of chunks if duplicate samples are present, or if an entire chunk is
// duplicated
// - it does not check if overlapping chunks have samples that swap back and forth between
// different encodings over time, and so may underestimate the resulting number of chunks
func estimateCompactedChunkCount(series []ChunkSeries) int {
h := make(chunkIteratorHeap, 0, len(series))
for _, s := range series {
iter := s.Iterator(nil)
if iter.Next() {
heap.Push(&h, iter) // heap.Push maintains the heap invariant; a raw h.Push would only append, requiring heap.Init before popping.
}
}
totalChunkCount := 0
for len(h) > 0 {
iter := heap.Pop(&h).(chunks.Iterator)
prev := iter.At()
if iter.Next() {
heap.Push(&h, iter)
}
chunkCountForThisTimePeriod := 0
sampleCount := prev.Chunk.NumSamples()
maxTime := prev.MaxTime
// Find all overlapping chunks and estimate the number of resulting chunks.
for len(h) > 0 && h[0].At().MinTime <= maxTime {
iter := heap.Pop(&h).(chunks.Iterator)
next := iter.At()
if iter.Next() {
heap.Push(&h, iter)
}
if next.MaxTime > maxTime {
maxTime = next.MaxTime
}
if prev.Chunk.Encoding() != next.Chunk.Encoding() {
// If we have more than seriesToChunkEncoderSplit samples, account for the additional chunks we'll create.
chunkCountForThisTimePeriod += sampleCount / seriesToChunkEncoderSplit
if sampleCount%seriesToChunkEncoderSplit > 0 {
chunkCountForThisTimePeriod++
}
sampleCount = 0
}
sampleCount += next.Chunk.NumSamples()
prev = next
}
// If we have more than seriesToChunkEncoderSplit samples, account for the additional chunks we'll create.
chunkCountForThisTimePeriod += sampleCount / seriesToChunkEncoderSplit
if sampleCount%seriesToChunkEncoderSplit > 0 {
chunkCountForThisTimePeriod++
}
if chunkCountForThisTimePeriod == 0 {
chunkCountForThisTimePeriod = 1 // We'll always create at least one chunk.
}
totalChunkCount += chunkCountForThisTimePeriod
}
return totalChunkCount
}
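For context, the chunkIteratorHeap used above is defined elsewhere in this file and is not part of this diff; it is presumably a min-heap of chunk iterators ordered by the MinTime of each iterator's current chunk, roughly along these lines (an assumed sketch, not the verbatim definition):

type chunkIteratorHeap []chunks.Iterator

func (h chunkIteratorHeap) Len() int      { return len(h) }
func (h chunkIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h chunkIteratorHeap) Less(i, j int) bool {
    // Order by the start time of each iterator's current chunk, so the
    // earliest-starting chunk is always at the root.
    at, bt := h[i].At(), h[j].At()
    if at.MinTime == bt.MinTime {
        return at.MaxTime < bt.MaxTime
    }
    return at.MinTime < bt.MinTime
}

func (h *chunkIteratorHeap) Push(x interface{}) { *h = append(*h, x.(chunks.Iterator)) }

func (h *chunkIteratorHeap) Pop() interface{} {
    old := *h
    n := len(old)
    x := old[n-1]
    *h = old[:n-1]
    return x
}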
// compactChunkIterator is responsible for compacting chunks from different iterators of the same time series into a single chained sequence.
// If time-overlapping chunks are found, they are expanded, passed to the series merge function, and encoded again into one bigger chunk.
// TODO(bwplotka): Currently merge will compact overlapping chunks into a bigger chunk without limit. Split it: https://github.com/prometheus/tsdb/issues/670
@@ -801,6 +878,7 @@ func NewConcatenatingChunkSeriesMerger() VerticalChunkSeriesMergeFunc {
if len(series) == 0 {
return nil
}
return &ChunkSeriesEntry{
Lset: series[0].Labels(),
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
@@ -812,6 +890,15 @@ func NewConcatenatingChunkSeriesMerger() VerticalChunkSeriesMergeFunc {
iterators: iterators,
}
},
ChunkCountFn: func() int {
chunkCount := 0
for _, series := range series {
chunkCount += series.EstimatedChunkCount()
}
return chunkCount
},
}
}
}
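For contrast with the compacting merger: concatenation never merges overlapping chunks, so its estimate is purely additive across the input series. A minimal sketch using helpers from this diff (NewListChunkSeriesFromSamples and the test-only fSample type) plus ChainedSeriesMerge as the vertical series merge function; the function wrapper is illustrative only:

func exampleEstimates() {
    lbls := labels.FromStrings("__name__", "example")
    a := NewListChunkSeriesFromSamples(lbls, []tsdbutil.Sample{fSample{1, 1}, fSample{2, 2}})
    b := NewListChunkSeriesFromSamples(lbls, []tsdbutil.Sample{fSample{1, 1}, fSample{3, 3}})

    concat := NewConcatenatingChunkSeriesMerger()(a, b)
    compact := NewCompactingChunkSeriesMerger(ChainedSeriesMerge)(a, b)

    fmt.Println(concat.EstimatedChunkCount())  // 2: one chunk per input series; the overlap is ignored.
    fmt.Println(compact.EstimatedChunkCount()) // 1: the overlapping same-encoding chunks are expected to compact into one.
}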


@@ -394,9 +394,10 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
}
for _, tc := range []struct {
name string
input []ChunkSeries
expected ChunkSeries
expectedChunksEstimate int
}{
{
name: "single empty series",
@@ -483,6 +484,7 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
tsdbutil.GenerateSamples(0, 110),
),
expectedChunksEstimate: 2, // The estimate doesn't detect duplicate series, so it overestimates the resulting number of chunks.
},
{
name: "150 overlapping samples, split chunk",
@@ -520,6 +522,7 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
[]tsdbutil.Sample{fSample{12, 12}, fSample{14, 14}},
[]tsdbutil.Sample{histogramSample(15)},
),
expectedChunksEstimate: 4, // Estimation assumes overlapping chunks don't swap back and forth between different encodings.
},
{
name: "float histogram chunks overlapping",
@@ -546,6 +549,7 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
[]tsdbutil.Sample{fSample{12, 12}, fSample{14, 14}},
[]tsdbutil.Sample{floatHistogramSample(15)},
),
expectedChunksEstimate: 4, // Estimation assumes overlapping chunks don't swap back and forth between different encodings.
},
{
name: "float histogram chunks overlapping with histogram chunks",
@@ -560,6 +564,7 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
[]tsdbutil.Sample{histogramSample(12), histogramSample(14)},
[]tsdbutil.Sample{floatHistogramSample(15)},
),
expectedChunksEstimate: 4, // Estimation assumes overlapping chunks don't swap back and forth between different encodings.
},
} {
t.Run(tc.name, func(t *testing.T) {
@@ -570,6 +575,12 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
require.Equal(t, expErr, actErr)
require.Equal(t, expChks, actChks)
if tc.expectedChunksEstimate == 0 {
tc.expectedChunksEstimate = len(actChks)
}
require.Equalf(t, tc.expectedChunksEstimate, merged.EstimatedChunkCount(), "expected estimate of %v chunks, actual chunks are: %v", tc.expectedChunksEstimate, actChks)
})
}
}
@@ -704,6 +715,7 @@ func TestConcatenatingChunkSeriesMerger(t *testing.T) {
require.Equal(t, expErr, actErr)
require.Equal(t, expChks, actChks)
require.Equal(t, len(expChks), merged.EstimatedChunkCount())
})
}
}


@@ -35,11 +35,13 @@ func (s *SeriesEntry) Iterator(it chunkenc.Iterator) chunkenc.Iterator { return
type ChunkSeriesEntry struct {
Lset labels.Labels
ChunkCountFn func() int
ChunkIteratorFn func(chunks.Iterator) chunks.Iterator
}
func (s *ChunkSeriesEntry) Labels() labels.Labels { return s.Lset }
func (s *ChunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator { return s.ChunkIteratorFn(it) }
func (s *ChunkSeriesEntry) EstimatedChunkCount() int { return s.ChunkCountFn() }
// NewListSeries returns a series entry with an iterator over the provided samples.
func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry {
@@ -78,6 +80,7 @@ func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]tsdbutil.Sam
}
return NewListChunkSeriesIterator(chks...)
},
ChunkCountFn: func() int { return len(samples) }, // We create one chunk per slice of samples.
}
}
@@ -399,6 +402,41 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
return NewListChunkSeriesIterator(chks...)
}
// EstimatedChunkCount returns an estimate of the number of chunks produced by Iterator.
//
// It is exact except when histograms are present in the series. When histograms are
// present, EstimatedChunkCount may underestimate the number of chunks produced, because
// the estimation does not examine individual samples, so chunk-cutting triggers such as
// counter resets, changes to the bucket schema, and changes to the zero threshold are
// not taken into account.
func (s *seriesToChunkEncoder) EstimatedChunkCount() int {
chunkCount := 0
seriesIter := s.Series.Iterator(nil)
lastType := chunkenc.ValNone
samplesInChunk := 0
for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() {
if chunkCount == 0 {
// We'll always have at least one chunk if there's at least one sample.
chunkCount++
} else if lastType != typ {
// If the sample type changes, then we'll cut a new chunk.
chunkCount++
samplesInChunk = 0
}
if samplesInChunk == seriesToChunkEncoderSplit {
chunkCount++
samplesInChunk = 0
}
lastType = typ
samplesInChunk++
}
return chunkCount
}
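As a usage sketch: the test cases below imply seriesToChunkEncoderSplit is 120 samples per chunk, so for a run of same-typed samples the estimate reduces to a ceiling division. Here generateSamples stands in for the helper defined in TestSeriesToChunks below:

series := NewListSeries(labels.FromStrings("__name__", "test_series"), generateSamples(241))
encoder := NewSeriesToChunkEncoder(series)
_ = encoder.EstimatedChunkCount() // 3: ceil(241 / 120), matching the "241 samples" test case below.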
func appendChunk(chks []chunks.Meta, mint, maxt int64, chk chunkenc.Chunk) []chunks.Meta {
if chk != nil {
chks = append(chks, chunks.Meta{


@@ -73,6 +73,36 @@ func TestListSeriesIterator(t *testing.T) {
require.Equal(t, chunkenc.ValNone, it.Seek(2))
}
func TestNewListChunkSeriesFromSamples(t *testing.T) {
lbls := labels.FromStrings("__name__", "the_series")
series := NewListChunkSeriesFromSamples(
lbls,
samples{
fSample{0, 0},
fSample{1, 1},
fSample{1, 1.5},
fSample{2, 2},
fSample{3, 3},
},
samples{
fSample{4, 5},
},
)
require.Equal(t, lbls, series.Labels())
it := series.Iterator(nil)
chks := []chunks.Meta{}
for it.Next() {
chks = append(chks, it.At())
}
require.NoError(t, it.Err())
require.Len(t, chks, 2)
require.Equal(t, len(chks), series.EstimatedChunkCount(), "should have one chunk per group of samples")
}
// TestChunkSeriesSetToSeriesSet tests the property of SeriesSet that says
// returned series should be iterable even after Next is called.
func TestChunkSeriesSetToSeriesSet(t *testing.T) {
@@ -125,9 +155,84 @@ func TestChunkSeriesSetToSeriesSet(t *testing.T) {
}
}
func TestSeriesToChunks(t *testing.T) {
generateSamples := func(count int) []tsdbutil.Sample {
s := make([]tsdbutil.Sample, count)
for i := 0; i < count; i++ {
s[i] = fSample{t: int64(i), f: float64(i) * 10.0}
}
return s
}
h := &histogram.Histogram{
Count: 0,
ZeroThreshold: 0.001,
Schema: 0,
}
testCases := map[string]struct {
samples []tsdbutil.Sample
expectedChunkCount int
}{
"no samples": {
samples: []tsdbutil.Sample{},
expectedChunkCount: 0,
},
"single sample": {
samples: generateSamples(1),
expectedChunkCount: 1,
},
"120 samples": {
samples: generateSamples(120),
expectedChunkCount: 1,
},
"121 samples": {
samples: generateSamples(121),
expectedChunkCount: 2,
},
"240 samples": {
samples: generateSamples(240),
expectedChunkCount: 2,
},
"241 samples": {
samples: generateSamples(241),
expectedChunkCount: 3,
},
"float samples and histograms": {
samples: []tsdbutil.Sample{
fSample{t: 1, f: 10},
fSample{t: 2, f: 20},
hSample{t: 3, h: h},
fSample{t: 4, f: 40},
},
expectedChunkCount: 3,
},
}
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
lset := labels.FromStrings("__name__", "test_series")
series := NewListSeries(lset, testCase.samples)
encoder := NewSeriesToChunkEncoder(series)
require.Equal(t, lset, encoder.Labels())
chks, err := ExpandChunks(encoder.Iterator(nil))
require.NoError(t, err)
require.Len(t, chks, testCase.expectedChunkCount)
require.Equal(t, testCase.expectedChunkCount, encoder.EstimatedChunkCount())
encodedSamples := expandChunks(chks)
require.Equal(t, testCase.samples, encodedSamples)
})
}
}
type histogramTest struct {
samples []tsdbutil.Sample
expectedCounterResetHeaders []chunkenc.CounterResetHeader
expectedChunkCountEstimate int
}
func TestHistogramSeriesToChunks(t *testing.T) {
@@ -288,6 +393,7 @@ func TestHistogramSeriesToChunks(t *testing.T) {
hSample{t: 2, h: h1},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
expectedChunkCountEstimate: 1, // Estimate doesn't consider counter resets.
},
"histogram and stale sample encoded to two chunks": {
samples: []tsdbutil.Sample{
@@ -295,6 +401,7 @@ func TestHistogramSeriesToChunks(t *testing.T) {
hSample{t: 2, h: h1},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
expectedChunkCountEstimate: 1, // Estimate doesn't consider stale markers.
},
"histogram and reduction in bucket encoded to two chunks": {
samples: []tsdbutil.Sample{
@@ -302,6 +409,7 @@ func TestHistogramSeriesToChunks(t *testing.T) {
hSample{t: 2, h: h2down},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
expectedChunkCountEstimate: 1, // Estimate doesn't consider counter resets.
},
// Float histograms.
"single float histogram to single chunk": {
@@ -323,6 +431,7 @@ func TestHistogramSeriesToChunks(t *testing.T) {
fhSample{t: 2, fh: fh1},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
expectedChunkCountEstimate: 1, // Estimate doesn't consider counter resets.
},
"float histogram and stale sample encoded to two chunks": {
samples: []tsdbutil.Sample{
@@ -330,6 +439,7 @@ func TestHistogramSeriesToChunks(t *testing.T) {
fhSample{t: 2, fh: fh1},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
expectedChunkCountEstimate: 1, // Estimate doesn't consider stale markers.
},
"float histogram and reduction in bucket encoded to two chunks": {
samples: []tsdbutil.Sample{
@@ -337,6 +447,7 @@ func TestHistogramSeriesToChunks(t *testing.T) {
fhSample{t: 2, fh: fh2down},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
expectedChunkCountEstimate: 1, // Estimate doesn't consider counter resets.
},
// Mixed.
"histogram and float histogram encoded to two chunks": {
@@ -430,8 +541,14 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
require.NoError(t, err)
require.Equal(t, len(test.expectedCounterResetHeaders), len(chks))
if test.expectedChunkCountEstimate == 0 {
test.expectedChunkCountEstimate = len(chks)
}
require.Equal(t, test.expectedChunkCountEstimate, encoder.EstimatedChunkCount())
// Decode all encoded samples and assert they are equal to the original ones.
encodedSamples := expandChunks(chks)
require.Equal(t, len(test.samples), len(encodedSamples))
for i, s := range test.samples {
@@ -470,9 +587,9 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
}
}
func expandChunks(chunks []chunks.Meta) (result []tsdbutil.Sample) {
if len(chunks) == 0 {
return []tsdbutil.Sample{}
}
for _, chunk := range chunks {
@@ -485,6 +602,9 @@ func expandHistogramSamples(chunks []chunks.Meta) (result []tsdbutil.Sample) {
case chunkenc.ValFloatHistogram:
t, fh := it.AtFloatHistogram()
result = append(result, fhSample{t: t, fh: fh})
case chunkenc.ValFloat:
t, f := it.At()
result = append(result, fSample{t: t, f: f})
default:
panic("unexpected value type")
}


@@ -649,6 +649,10 @@ func (s *chunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator {
return pi
}
func (s *chunkSeriesEntry) EstimatedChunkCount() int {
return len(s.chks)
}
// populateWithDelSeriesIterator allows iterating over samples for a single series.
type populateWithDelSeriesIterator struct {
populateWithDelGenericSeriesIterator