diff --git a/storage/interface.go b/storage/interface.go index 2a440cc93c..74ddc5acad 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -415,6 +415,12 @@ type ChunkSeriesSet interface { type ChunkSeries interface { Labels ChunkIterable + + // ChunkCount returns the number of chunks available from this ChunkSeries. + // + // This value is used by Mimir's ingesters to report the number of chunks expected to be returned by a query, + // which is used by queriers to enforce the 'max chunks per query' limit. + ChunkCount() (int, error) } // Labels represents an item that has labels e.g. time series. diff --git a/storage/merge.go b/storage/merge.go index c0665d720b..a196b0bc0d 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -658,22 +658,42 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC if len(series) == 0 { return nil } + + chunkIteratorFn := func(chunks.Iterator) chunks.Iterator { + iterators := make([]chunks.Iterator, 0, len(series)) + for _, s := range series { + iterators = append(iterators, s.Iterator(nil)) + } + return &compactChunkIterator{ + mergeFunc: mergeFunc, + iterators: iterators, + } + } + return &ChunkSeriesEntry{ - Lset: series[0].Labels(), - ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator { - iterators := make([]chunks.Iterator, 0, len(series)) - for _, s := range series { - iterators = append(iterators, s.Iterator(nil)) - } - return &compactChunkIterator{ - mergeFunc: mergeFunc, - iterators: iterators, - } + Lset: series[0].Labels(), + ChunkIteratorFn: chunkIteratorFn, + ChunkCountFn: func() (int, error) { + // This method is expensive, but we don't expect to ever actually use this on the ingester query path in Mimir - + // it's just here to ensure things don't break if this assumption ever changes. + // Ingesters return uncompacted chunks to queriers, so this method is never called. + return countChunks(chunkIteratorFn) }, } } } +func countChunks(chunkIteratorFn func(chunks.Iterator) chunks.Iterator) (int, error) { + chunkCount := 0 + it := chunkIteratorFn(nil) + + for it.Next() { + chunkCount++ + } + + return chunkCount, it.Err() +} + // compactChunkIterator is responsible to compact chunks from different iterators of the same time series into single chainSeries. // If time-overlapping chunks are found, they are encoded and passed to series merge and encoded again into one bigger chunk. // TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670 @@ -801,6 +821,7 @@ func NewConcatenatingChunkSeriesMerger() VerticalChunkSeriesMergeFunc { if len(series) == 0 { return nil } + return &ChunkSeriesEntry{ Lset: series[0].Labels(), ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator { @@ -812,6 +833,20 @@ func NewConcatenatingChunkSeriesMerger() VerticalChunkSeriesMergeFunc { iterators: iterators, } }, + ChunkCountFn: func() (int, error) { + chunkCount := 0 + + for _, series := range series { + c, err := series.ChunkCount() + if err != nil { + return 0, err + } + + chunkCount += c + } + + return chunkCount, nil + }, } } } diff --git a/storage/merge_test.go b/storage/merge_test.go index b0544c2d81..82627d9871 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -428,6 +428,14 @@ func TestCompactingChunkSeriesMerger(t *testing.T) { }, expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{fSample{1, 1}, fSample{2, 2}}, []tsdbutil.Sample{fSample{3, 3}, fSample{5, 5}}, []tsdbutil.Sample{fSample{7, 7}, fSample{9, 9}}, []tsdbutil.Sample{fSample{10, 10}}), }, + { + name: "two non overlapping in reverse order", + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{fSample{7, 7}, fSample{9, 9}}, []tsdbutil.Sample{fSample{10, 10}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{fSample{1, 1}, fSample{2, 2}}, []tsdbutil.Sample{fSample{3, 3}, fSample{5, 5}}), + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{fSample{1, 1}, fSample{2, 2}}, []tsdbutil.Sample{fSample{3, 3}, fSample{5, 5}}, []tsdbutil.Sample{fSample{7, 7}, fSample{9, 9}}, []tsdbutil.Sample{fSample{10, 10}}), + }, { name: "two overlapping", input: []ChunkSeries{ @@ -570,6 +578,10 @@ func TestCompactingChunkSeriesMerger(t *testing.T) { require.Equal(t, expErr, actErr) require.Equal(t, expChks, actChks) + + count, err := merged.ChunkCount() + require.NoError(t, err) + require.Len(t, actChks, count) }) } } @@ -704,6 +716,10 @@ func TestConcatenatingChunkSeriesMerger(t *testing.T) { require.Equal(t, expErr, actErr) require.Equal(t, expChks, actChks) + + count, err := merged.ChunkCount() + require.NoError(t, err) + require.Equal(t, len(expChks), count) }) } } diff --git a/storage/series.go b/storage/series.go index b73f1e35ce..1d1aa820e9 100644 --- a/storage/series.go +++ b/storage/series.go @@ -35,11 +35,13 @@ func (s *SeriesEntry) Iterator(it chunkenc.Iterator) chunkenc.Iterator { return type ChunkSeriesEntry struct { Lset labels.Labels + ChunkCountFn func() (int, error) ChunkIteratorFn func(chunks.Iterator) chunks.Iterator } func (s *ChunkSeriesEntry) Labels() labels.Labels { return s.Lset } func (s *ChunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator { return s.ChunkIteratorFn(it) } +func (s *ChunkSeriesEntry) ChunkCount() (int, error) { return s.ChunkCountFn() } // NewListSeries returns series entry with iterator that allows to iterate over provided samples. func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry { @@ -78,6 +80,7 @@ func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]tsdbutil.Sam } return NewListChunkSeriesIterator(chks...) }, + ChunkCountFn: func() (int, error) { return len(samples), nil }, // We create one chunk per slice of samples. } } @@ -399,6 +402,20 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { return NewListChunkSeriesIterator(chks...) } +func (s *seriesToChunkEncoder) ChunkCount() (int, error) { + // This method is expensive, but we don't expect to ever actually use this on the ingester query path in Mimir - + // it's just here to ensure things don't break if this assumption ever changes. + + chunkCount := 0 + it := s.Iterator(nil) + + for it.Next() { + chunkCount++ + } + + return chunkCount, it.Err() +} + func appendChunk(chks []chunks.Meta, mint, maxt int64, chk chunkenc.Chunk) []chunks.Meta { if chk != nil { chks = append(chks, chunks.Meta{ diff --git a/storage/series_test.go b/storage/series_test.go index 5c74fae096..81f0080773 100644 --- a/storage/series_test.go +++ b/storage/series_test.go @@ -73,6 +73,39 @@ func TestListSeriesIterator(t *testing.T) { require.Equal(t, chunkenc.ValNone, it.Seek(2)) } +func TestNewListChunkSeriesFromSamples(t *testing.T) { + lbls := labels.FromStrings("__name__", "the_series") + series := NewListChunkSeriesFromSamples( + lbls, + samples{ + fSample{0, 0}, + fSample{1, 1}, + fSample{1, 1.5}, + fSample{2, 2}, + fSample{3, 3}, + }, + samples{ + fSample{4, 5}, + }, + ) + + require.Equal(t, lbls, series.Labels()) + + it := series.Iterator(nil) + chks := []chunks.Meta{} + + for it.Next() { + chks = append(chks, it.At()) + } + + require.NoError(t, it.Err()) + require.Len(t, chks, 2) + + count, err := series.ChunkCount() + require.NoError(t, err) + require.Equal(t, len(chks), count, "should have one chunk per group of samples") +} + // TestSeriesSetToChunkSet test the property of SeriesSet that says // returned series should be iterable even after Next is called. func TestChunkSeriesSetToSeriesSet(t *testing.T) { @@ -125,6 +158,82 @@ func TestChunkSeriesSetToSeriesSet(t *testing.T) { } } +func TestSeriesToChunks(t *testing.T) { + generateSamples := func(count int) []tsdbutil.Sample { + s := make([]tsdbutil.Sample, count) + + for i := 0; i < count; i++ { + s[i] = fSample{t: int64(i), f: float64(i) * 10.0} + } + + return s + } + + h := &histogram.Histogram{ + Count: 0, + ZeroThreshold: 0.001, + Schema: 0, + } + + testCases := map[string]struct { + samples []tsdbutil.Sample + expectedChunkCount int + }{ + "no samples": { + samples: []tsdbutil.Sample{}, + expectedChunkCount: 0, + }, + "single sample": { + samples: generateSamples(1), + expectedChunkCount: 1, + }, + "120 samples": { + samples: generateSamples(120), + expectedChunkCount: 1, + }, + "121 samples": { + samples: generateSamples(121), + expectedChunkCount: 2, + }, + "240 samples": { + samples: generateSamples(240), + expectedChunkCount: 2, + }, + "241 samples": { + samples: generateSamples(241), + expectedChunkCount: 3, + }, + "float samples and histograms": { + samples: []tsdbutil.Sample{ + fSample{t: 1, f: 10}, + fSample{t: 2, f: 20}, + hSample{t: 3, h: h}, + fSample{t: 4, f: 40}, + }, + expectedChunkCount: 3, + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + lset := labels.FromStrings("__name__", "test_series") + series := NewListSeries(lset, testCase.samples) + encoder := NewSeriesToChunkEncoder(series) + require.Equal(t, lset, encoder.Labels()) + + chks, err := ExpandChunks(encoder.Iterator(nil)) + require.NoError(t, err) + require.Len(t, chks, testCase.expectedChunkCount) + count, err := encoder.ChunkCount() + require.NoError(t, err) + require.Equal(t, testCase.expectedChunkCount, count) + + encodedSamples := expandChunks(chks) + require.Equal(t, testCase.samples, encodedSamples) + }) + } +} + type histogramTest struct { samples []tsdbutil.Sample expectedCounterResetHeaders []chunkenc.CounterResetHeader @@ -430,8 +539,12 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) { require.NoError(t, err) require.Equal(t, len(test.expectedCounterResetHeaders), len(chks)) + count, err := encoder.ChunkCount() + require.NoError(t, err) + require.Len(t, chks, count) + // Decode all encoded samples and assert they are equal to the original ones. - encodedSamples := expandHistogramSamples(chks) + encodedSamples := expandChunks(chks) require.Equal(t, len(test.samples), len(encodedSamples)) for i, s := range test.samples { @@ -470,9 +583,9 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) { } } -func expandHistogramSamples(chunks []chunks.Meta) (result []tsdbutil.Sample) { +func expandChunks(chunks []chunks.Meta) (result []tsdbutil.Sample) { if len(chunks) == 0 { - return + return []tsdbutil.Sample{} } for _, chunk := range chunks { @@ -485,6 +598,9 @@ func expandHistogramSamples(chunks []chunks.Meta) (result []tsdbutil.Sample) { case chunkenc.ValFloatHistogram: t, fh := it.AtFloatHistogram() result = append(result, fhSample{t: t, fh: fh}) + case chunkenc.ValFloat: + t, f := it.At() + result = append(result, fSample{t: t, f: f}) default: panic("unexpected value type") } diff --git a/tsdb/querier.go b/tsdb/querier.go index 854687298c..dad5595492 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -649,6 +649,10 @@ func (s *chunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator { return pi } +func (s *chunkSeriesEntry) ChunkCount() (int, error) { + return len(s.chks), nil +} + // populateWithDelSeriesIterator allows to iterate over samples for the single series. type populateWithDelSeriesIterator struct { populateWithDelGenericSeriesIterator