Permit returning an error from EstimatedChunkCount()

Charles Korn 2023-07-06 15:41:05 +10:00
parent 31235c351f
commit a9445622ad
6 changed files with 37 additions and 19 deletions


@@ -420,7 +420,7 @@ type ChunkSeries interface {
 	//
 	// This estimate is used by Mimir's ingesters to report the number of chunks expected to be returned by a query,
 	// which is used by queriers to enforce the 'max chunks per query' limit.
-	EstimatedChunkCount() int
+	EstimatedChunkCount() (int, error)
 }

 // Labels represents an item that has labels e.g. time series.

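For context, the sketch below shows how a caller of the updated interface might consume the two-value return. It is illustrative only: the stand-in ChunkSeries interface, the staticSeries type, and the maxChunks limit are assumptions rather than part of this commit, though the fail-fast summation mirrors the merger change further down.

package main

import "fmt"

// ChunkSeries is a stand-in for the storage.ChunkSeries interface after this
// commit: EstimatedChunkCount now returns an error alongside the estimate.
type ChunkSeries interface {
	EstimatedChunkCount() (int, error)
}

// staticSeries is a hypothetical implementation whose chunk count is known up front.
type staticSeries struct{ chunks int }

func (s staticSeries) EstimatedChunkCount() (int, error) { return s.chunks, nil }

// totalEstimatedChunks sums per-series estimates, returning early on the first
// error or once the running total exceeds the limit.
func totalEstimatedChunks(series []ChunkSeries, maxChunks int) (int, error) {
	total := 0
	for _, s := range series {
		c, err := s.EstimatedChunkCount()
		if err != nil {
			return 0, err
		}
		total += c
		if total > maxChunks {
			return 0, fmt.Errorf("query would fetch at least %d chunks, limit is %d", total, maxChunks)
		}
	}
	return total, nil
}

func main() {
	series := []ChunkSeries{staticSeries{chunks: 3}, staticSeries{chunks: 5}}
	total, err := totalEstimatedChunks(series, 100)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("estimated chunks:", total) // estimated chunks: 8
}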

@@ -673,7 +673,7 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC
 	return &ChunkSeriesEntry{
 		Lset:            series[0].Labels(),
 		ChunkIteratorFn: chunkIteratorFn,
-		ChunkCountFn: func() int {
+		ChunkCountFn: func() (int, error) {
 			// This method is expensive, but we don't expect to ever actually use this on the ingester query path in Mimir -
 			// it's just here to ensure things don't break if this assumption ever changes.
 			// Ingesters return uncompacted chunks to queriers, so this method is never called.
@@ -683,7 +683,7 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC
 	}
 }

-func countChunks(chunkIteratorFn func(chunks.Iterator) chunks.Iterator) int {
+func countChunks(chunkIteratorFn func(chunks.Iterator) chunks.Iterator) (int, error) {
 	chunkCount := 0
 	it := chunkIteratorFn(nil)
@@ -691,7 +691,7 @@ func countChunks(chunkIteratorFn func(chunks.Iterator) chunks.Iterator) int {
 		chunkCount++
 	}

-	return chunkCount
+	return chunkCount, it.Err()
 }

 // compactChunkIterator is responsible to compact chunks from different iterators of the same time series into single chainSeries.
@@ -833,14 +833,20 @@ func NewConcatenatingChunkSeriesMerger() VerticalChunkSeriesMergeFunc {
 				iterators: iterators,
 			}
 		},
-		ChunkCountFn: func() int {
+		ChunkCountFn: func() (int, error) {
 			chunkCount := 0

 			for _, series := range series {
-				chunkCount += series.EstimatedChunkCount()
+				c, err := series.EstimatedChunkCount()
+				if err != nil {
+					return 0, err
+				}
+
+				chunkCount += c
 			}

-			return chunkCount
+			return chunkCount, nil
 		},
 	}
 }

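The countChunks change above follows the usual Go iterator contract: exhaust Next(), then surface any deferred failure through Err(). Below is a self-contained sketch of that idiom with a toy iterator standing in for chunks.Iterator; the failAt field is an assumption used purely to simulate a mid-iteration failure.

package main

import (
	"errors"
	"fmt"
)

// iterator mimics the Next/Err contract of chunks.Iterator: Next reports
// whether another element exists, and Err exposes any error encountered.
type iterator struct {
	remaining int
	failAt    int
	err       error
}

func (it *iterator) Next() bool {
	if it.remaining == it.failAt {
		it.err = errors.New("chunk data corrupted")
		return false
	}
	if it.remaining == 0 {
		return false
	}
	it.remaining--
	return true
}

func (it *iterator) Err() error { return it.err }

// countChunks mirrors the shape of the function in this commit: count every
// element, then return the iterator's deferred error alongside the count.
func countChunks(it *iterator) (int, error) {
	count := 0
	for it.Next() {
		count++
	}
	return count, it.Err()
}

func main() {
	n, err := countChunks(&iterator{remaining: 4, failAt: -1})
	fmt.Println(n, err) // 4 <nil>

	n, err = countChunks(&iterator{remaining: 4, failAt: 2})
	fmt.Println(n, err) // 2 chunk data corrupted
}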

@@ -579,7 +579,9 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 			require.Equal(t, expErr, actErr)
 			require.Equal(t, expChks, actChks)
-			require.Len(t, actChks, merged.EstimatedChunkCount())
+			count, err := merged.EstimatedChunkCount()
+			require.NoError(t, err)
+			require.Len(t, actChks, count)
 		})
 	}
 }
@@ -714,7 +716,10 @@ func TestConcatenatingChunkSeriesMerger(t *testing.T) {
 			require.Equal(t, expErr, actErr)
 			require.Equal(t, expChks, actChks)
-			require.Equal(t, len(expChks), merged.EstimatedChunkCount())
+
+			count, err := merged.EstimatedChunkCount()
+			require.NoError(t, err)
+			require.Equal(t, len(expChks), count)
 		})
 	}
 }


@@ -35,13 +35,13 @@ func (s *SeriesEntry) Iterator(it chunkenc.Iterator) chunkenc.Iterator { return

 type ChunkSeriesEntry struct {
 	Lset            labels.Labels
-	ChunkCountFn    func() int
+	ChunkCountFn    func() (int, error)
 	ChunkIteratorFn func(chunks.Iterator) chunks.Iterator
 }

 func (s *ChunkSeriesEntry) Labels() labels.Labels                       { return s.Lset }
 func (s *ChunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator { return s.ChunkIteratorFn(it) }
-func (s *ChunkSeriesEntry) EstimatedChunkCount() int { return s.ChunkCountFn() }
+func (s *ChunkSeriesEntry) EstimatedChunkCount() (int, error) { return s.ChunkCountFn() }

 // NewListSeries returns series entry with iterator that allows to iterate over provided samples.
 func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry {
@@ -80,7 +80,7 @@ func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]tsdbutil.Sam
 			}
 			return NewListChunkSeriesIterator(chks...)
 		},
-		ChunkCountFn: func() int { return len(samples) }, // We create one chunk per slice of samples.
+		ChunkCountFn: func() (int, error) { return len(samples), nil }, // We create one chunk per slice of samples.
 	}
 }
@@ -403,7 +403,7 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
 }

 // EstimatedChunkCount returns an estimate of the number of chunks produced by Iterator.
-func (s *seriesToChunkEncoder) EstimatedChunkCount() int {
+func (s *seriesToChunkEncoder) EstimatedChunkCount() (int, error) {
 	// This method is expensive, but we don't expect to ever actually use this on the ingester query path in Mimir -
 	// it's just here to ensure things don't break if this assumption ever changes.
@@ -414,7 +414,7 @@ func (s *seriesToChunkEncoder) EstimatedChunkCount() int {
 		chunkCount++
 	}

-	return chunkCount
+	return chunkCount, it.Err()
 }

 func appendChunk(chks []chunks.Meta, mint, maxt int64, chk chunkenc.Chunk) []chunks.Meta {

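ChunkSeriesEntry implements the interface by delegating to function fields, which lets each construction site pick the cheapest correct way to count chunks, precomputed where possible, exhaustive iteration otherwise. A minimal sketch of that delegation pattern, with simplified stand-in types that are assumptions rather than the real storage package:

package main

import "fmt"

// chunkSeriesEntry mirrors the shape of ChunkSeriesEntry: behaviour is
// injected through a function field rather than fixed by the type.
type chunkSeriesEntry struct {
	chunkCountFn func() (int, error)
}

func (e *chunkSeriesEntry) EstimatedChunkCount() (int, error) { return e.chunkCountFn() }

func main() {
	// Cheap path, as in NewListChunkSeriesFromSamples: one chunk is created
	// per slice of samples, so the count is known without any iteration.
	sampleGroups := [][]float64{{1, 2}, {3}, {4, 5, 6}}
	entry := &chunkSeriesEntry{
		chunkCountFn: func() (int, error) { return len(sampleGroups), nil },
	}

	count, err := entry.EstimatedChunkCount()
	fmt.Println(count, err) // 3 <nil>
}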

@@ -100,7 +100,10 @@ func TestNewListChunkSeriesFromSamples(t *testing.T) {
 	require.NoError(t, it.Err())
 	require.Len(t, chks, 2)
-	require.Equal(t, len(chks), series.EstimatedChunkCount(), "should have one chunk per sample")
+
+	count, err := series.EstimatedChunkCount()
+	require.NoError(t, err)
+	require.Equal(t, len(chks), count, "should have one chunk per group of samples")
 }

 // TestSeriesSetToChunkSet test the property of SeriesSet that says
@@ -221,7 +224,9 @@ func TestSeriesToChunks(t *testing.T) {
 			chks, err := ExpandChunks(encoder.Iterator(nil))
 			require.NoError(t, err)
 			require.Len(t, chks, testCase.expectedChunkCount)
-			require.Equal(t, testCase.expectedChunkCount, encoder.EstimatedChunkCount())
+			count, err := encoder.EstimatedChunkCount()
+			require.NoError(t, err)
+			require.Equal(t, testCase.expectedChunkCount, count)

 			encodedSamples := expandChunks(chks)
 			require.Equal(t, testCase.samples, encodedSamples)
@@ -534,7 +539,9 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
 	require.NoError(t, err)
 	require.Equal(t, len(test.expectedCounterResetHeaders), len(chks))
-	require.Len(t, chks, encoder.EstimatedChunkCount())
+	count, err := encoder.EstimatedChunkCount()
+	require.NoError(t, err)
+	require.Len(t, chks, count)

 	// Decode all encoded samples and assert they are equal to the original ones.
 	encodedSamples := expandChunks(chks)


@@ -649,8 +649,8 @@ func (s *chunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator {
 	return pi
 }

-func (s *chunkSeriesEntry) EstimatedChunkCount() int {
-	return len(s.chks)
+func (s *chunkSeriesEntry) EstimatedChunkCount() (int, error) {
+	return len(s.chks), nil
 }

 // populateWithDelSeriesIterator allows to iterate over samples for the single series.