Merge pull request #513 from grafana/charleskorn/expose-chunk-count

Add `ChunkCount()` method to `ChunkSeries`
Charles Korn 2023-07-07 09:42:45 +10:00 committed by GitHub
commit f3697f5242
6 changed files with 207 additions and 13 deletions

storage/storage.go

@ -415,6 +415,12 @@ type ChunkSeriesSet interface {
type ChunkSeries interface {
Labels
ChunkIterable
// ChunkCount returns the number of chunks available from this ChunkSeries.
//
// Mimir's ingesters use this value to report the number of chunks a query is expected to return,
// and queriers use that count to enforce the 'max chunks per query' limit.
ChunkCount() (int, error)
}
// Labels represents an item that has labels e.g. time series.
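
Note: the sketch below is not part of this diff; it only illustrates how a querier could consume the new method to enforce the 'max chunks per query' limit described above. checkChunkLimit, maxChunksPerQuery and the error message are hypothetical names (a fmt import is assumed); ChunkSeriesSet, ChunkSeries and ChunkCount are the real interfaces from this file.

// Hypothetical usage sketch (not in this PR): enforce a per-query chunk
// limit by summing ChunkCount over every selected series.
func checkChunkLimit(set ChunkSeriesSet, maxChunksPerQuery int) error {
	total := 0
	for set.Next() {
		count, err := set.At().ChunkCount()
		if err != nil {
			return err
		}
		total += count
		if total > maxChunksPerQuery {
			return fmt.Errorf("query would fetch %d chunks, exceeding the limit of %d", total, maxChunksPerQuery)
		}
	}
	return set.Err()
}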

storage/merge.go

@ -658,22 +658,42 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC
if len(series) == 0 {
return nil
}
chunkIteratorFn := func(chunks.Iterator) chunks.Iterator {
iterators := make([]chunks.Iterator, 0, len(series))
for _, s := range series {
iterators = append(iterators, s.Iterator(nil))
}
return &compactChunkIterator{
mergeFunc: mergeFunc,
iterators: iterators,
}
}
return &ChunkSeriesEntry{
Lset: series[0].Labels(),
ChunkIteratorFn: chunkIteratorFn,
ChunkCountFn: func() (int, error) {
// Counting chunks this way is expensive, but ingesters return uncompacted chunks to queriers,
// so in Mimir this method is never called on the ingester query path. It's just here to ensure
// things don't break if that assumption ever changes.
return countChunks(chunkIteratorFn)
},
}
}
}
func countChunks(chunkIteratorFn func(chunks.Iterator) chunks.Iterator) (int, error) {
chunkCount := 0
it := chunkIteratorFn(nil)
for it.Next() {
chunkCount++
}
return chunkCount, it.Err()
}
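
countChunks drains a fresh iterator, so the compacting merger's ChunkCount pays for a full pass over (and compaction of) every chunk. A minimal usage sketch under that caveat, reusing helpers that appear elsewhere in this diff (labels and sample values are made up; fSample is the float-sample test helper):

merge := NewCompactingChunkSeriesMerger(ChainedSeriesMerge)
merged := merge(
	NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{fSample{1, 1}}),
	NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{fSample{2, 2}}),
)
// The two input chunks don't overlap in time, so nothing is merged away
// and the full pass reports 2.
count, err := merged.ChunkCount()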
// compactChunkIterator is responsible for compacting chunks from different iterators of the same time series into a single chained series.
// If time-overlapping chunks are found, they are expanded, passed to the series merge function, and re-encoded into one bigger chunk.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
@ -801,6 +821,7 @@ func NewConcatenatingChunkSeriesMerger() VerticalChunkSeriesMergeFunc {
if len(series) == 0 {
return nil
}
return &ChunkSeriesEntry{
Lset: series[0].Labels(),
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
@ -812,6 +833,20 @@ func NewConcatenatingChunkSeriesMerger() VerticalChunkSeriesMergeFunc {
iterators: iterators,
}
},
ChunkCountFn: func() (int, error) {
chunkCount := 0
for _, series := range series {
c, err := series.ChunkCount()
if err != nil {
return 0, err
}
chunkCount += c
}
return chunkCount, nil
},
}
}
}
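
Unlike the compacting path, concatenation never rewrites chunks, so here ChunkCount is just the sum of the inputs' counts, with no iteration or decoding. A small worked sketch with the same test helpers (values made up):

merge := NewConcatenatingChunkSeriesMerger()
merged := merge(
	// NewListChunkSeriesFromSamples creates one chunk per sample slice: two chunks here...
	NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"),
		[]tsdbutil.Sample{fSample{1, 1}}, []tsdbutil.Sample{fSample{2, 2}}),
	// ...and one chunk here.
	NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"),
		[]tsdbutil.Sample{fSample{3, 3}}),
)
count, err := merged.ChunkCount() // 3, computed without touching chunk data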

storage/merge_test.go

@ -428,6 +428,14 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{fSample{1, 1}, fSample{2, 2}}, []tsdbutil.Sample{fSample{3, 3}, fSample{5, 5}}, []tsdbutil.Sample{fSample{7, 7}, fSample{9, 9}}, []tsdbutil.Sample{fSample{10, 10}}),
},
{
name: "two non overlapping in reverse order",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{fSample{7, 7}, fSample{9, 9}}, []tsdbutil.Sample{fSample{10, 10}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{fSample{1, 1}, fSample{2, 2}}, []tsdbutil.Sample{fSample{3, 3}, fSample{5, 5}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{fSample{1, 1}, fSample{2, 2}}, []tsdbutil.Sample{fSample{3, 3}, fSample{5, 5}}, []tsdbutil.Sample{fSample{7, 7}, fSample{9, 9}}, []tsdbutil.Sample{fSample{10, 10}}),
},
{
name: "two overlapping",
input: []ChunkSeries{
@ -570,6 +578,10 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
require.Equal(t, expErr, actErr)
require.Equal(t, expChks, actChks)
count, err := merged.ChunkCount()
require.NoError(t, err)
require.Len(t, actChks, count)
})
}
}
@ -704,6 +716,10 @@ func TestConcatenatingChunkSeriesMerger(t *testing.T) {
require.Equal(t, expErr, actErr)
require.Equal(t, expChks, actChks)
count, err := merged.ChunkCount()
require.NoError(t, err)
require.Equal(t, len(expChks), count)
})
}
}

storage/series.go

@ -35,11 +35,13 @@ func (s *SeriesEntry) Iterator(it chunkenc.Iterator) chunkenc.Iterator { return
type ChunkSeriesEntry struct {
Lset labels.Labels
ChunkCountFn func() (int, error)
ChunkIteratorFn func(chunks.Iterator) chunks.Iterator
}
func (s *ChunkSeriesEntry) Labels() labels.Labels { return s.Lset }
func (s *ChunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator { return s.ChunkIteratorFn(it) }
func (s *ChunkSeriesEntry) ChunkCount() (int, error) { return s.ChunkCountFn() }
// NewListSeries returns a series entry with an iterator that allows iterating over the provided samples.
func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry {
@ -78,6 +80,7 @@ func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]tsdbutil.Sam
}
return NewListChunkSeriesIterator(chks...)
},
ChunkCountFn: func() (int, error) { return len(samples), nil }, // We create one chunk per slice of samples.
}
}
@ -399,6 +402,20 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
return NewListChunkSeriesIterator(chks...)
}
func (s *seriesToChunkEncoder) ChunkCount() (int, error) {
// This method is expensive: it encodes the entire series into chunks just to count them.
// We don't expect to ever actually use it on the ingester query path in Mimir;
// it's just here to ensure things don't break if that assumption ever changes.
chunkCount := 0
it := s.Iterator(nil)
for it.Next() {
chunkCount++
}
return chunkCount, it.Err()
}
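
Because this re-encodes the whole series on every call, any caller that somehow ended up needing it repeatedly would want to cache the result. A hypothetical memoizing wrapper, purely illustrative and not part of this change (assumes a sync import):

// cachedChunkCount invokes an expensive count function at most once.
type cachedChunkCount struct {
	once  sync.Once
	count int
	err   error
}

func (c *cachedChunkCount) get(countFn func() (int, error)) (int, error) {
	c.once.Do(func() { c.count, c.err = countFn() })
	return c.count, c.err
}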
func appendChunk(chks []chunks.Meta, mint, maxt int64, chk chunkenc.Chunk) []chunks.Meta {
if chk != nil {
chks = append(chks, chunks.Meta{

storage/series_test.go

@ -73,6 +73,39 @@ func TestListSeriesIterator(t *testing.T) {
require.Equal(t, chunkenc.ValNone, it.Seek(2))
}
func TestNewListChunkSeriesFromSamples(t *testing.T) {
lbls := labels.FromStrings("__name__", "the_series")
series := NewListChunkSeriesFromSamples(
lbls,
samples{
fSample{0, 0},
fSample{1, 1},
fSample{1, 1.5},
fSample{2, 2},
fSample{3, 3},
},
samples{
fSample{4, 5},
},
)
require.Equal(t, lbls, series.Labels())
it := series.Iterator(nil)
chks := []chunks.Meta{}
for it.Next() {
chks = append(chks, it.At())
}
require.NoError(t, it.Err())
require.Len(t, chks, 2)
count, err := series.ChunkCount()
require.NoError(t, err)
require.Equal(t, len(chks), count, "should have one chunk per group of samples")
}
// TestChunkSeriesSetToSeriesSet tests the property of SeriesSet that says
// returned series should be iterable even after Next is called.
func TestChunkSeriesSetToSeriesSet(t *testing.T) {
@ -125,6 +158,82 @@ func TestChunkSeriesSetToSeriesSet(t *testing.T) {
}
}
func TestSeriesToChunks(t *testing.T) {
generateSamples := func(count int) []tsdbutil.Sample {
s := make([]tsdbutil.Sample, count)
for i := 0; i < count; i++ {
s[i] = fSample{t: int64(i), f: float64(i) * 10.0}
}
return s
}
h := &histogram.Histogram{
Count: 0,
ZeroThreshold: 0.001,
Schema: 0,
}
testCases := map[string]struct {
samples []tsdbutil.Sample
expectedChunkCount int
}{
"no samples": {
samples: []tsdbutil.Sample{},
expectedChunkCount: 0,
},
"single sample": {
samples: generateSamples(1),
expectedChunkCount: 1,
},
"120 samples": {
samples: generateSamples(120),
expectedChunkCount: 1,
},
"121 samples": {
samples: generateSamples(121),
expectedChunkCount: 2,
},
"240 samples": {
samples: generateSamples(240),
expectedChunkCount: 2,
},
"241 samples": {
samples: generateSamples(241),
expectedChunkCount: 3,
},
"float samples and histograms": {
samples: []tsdbutil.Sample{
fSample{t: 1, f: 10},
fSample{t: 2, f: 20},
hSample{t: 3, h: h},
fSample{t: 4, f: 40},
},
expectedChunkCount: 3,
},
}
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
lset := labels.FromStrings("__name__", "test_series")
series := NewListSeries(lset, testCase.samples)
encoder := NewSeriesToChunkEncoder(series)
require.Equal(t, lset, encoder.Labels())
chks, err := ExpandChunks(encoder.Iterator(nil))
require.NoError(t, err)
require.Len(t, chks, testCase.expectedChunkCount)
count, err := encoder.ChunkCount()
require.NoError(t, err)
require.Equal(t, testCase.expectedChunkCount, count)
encodedSamples := expandChunks(chks)
require.Equal(t, testCase.samples, encodedSamples)
})
}
}
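
The expected counts above follow from seriesToChunkEncoder's cutting rules: a new chunk is started every 120 samples, and a change of sample type (float vs. histogram) also forces a cut. For a run of same-typed samples that works out to ceil(n/120), which this throwaway helper (hypothetical, only documenting the table) makes explicit:

// expectedFloatChunkCount mirrors the cases above for n same-typed samples:
// 0 -> 0, 1..120 -> 1, 121..240 -> 2, and so on.
func expectedFloatChunkCount(n int) int {
	if n == 0 {
		return 0
	}
	return (n + 119) / 120 // integer ceil(n / 120)
}

The mixed float/histogram case yields 3 chunks because each of its two type changes forces a cut.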
type histogramTest struct {
samples []tsdbutil.Sample
expectedCounterResetHeaders []chunkenc.CounterResetHeader
@ -430,8 +539,12 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
require.NoError(t, err)
require.Equal(t, len(test.expectedCounterResetHeaders), len(chks))
count, err := encoder.ChunkCount()
require.NoError(t, err)
require.Len(t, chks, count)
// Decode all encoded samples and assert they are equal to the original ones.
-encodedSamples := expandHistogramSamples(chks)
+encodedSamples := expandChunks(chks)
require.Equal(t, len(test.samples), len(encodedSamples))
for i, s := range test.samples {
@ -470,9 +583,9 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
}
}
-func expandHistogramSamples(chunks []chunks.Meta) (result []tsdbutil.Sample) {
+func expandChunks(chunks []chunks.Meta) (result []tsdbutil.Sample) {
if len(chunks) == 0 {
-return
+return []tsdbutil.Sample{}
}
for _, chunk := range chunks {
@ -485,6 +598,9 @@ func expandHistogramSamples(chunks []chunks.Meta) (result []tsdbutil.Sample) {
case chunkenc.ValFloatHistogram:
t, fh := it.AtFloatHistogram()
result = append(result, fhSample{t: t, fh: fh})
case chunkenc.ValFloat:
t, f := it.At()
result = append(result, fSample{t: t, f: f})
default:
panic("unexpected value type")
}

tsdb/querier.go

@ -649,6 +649,10 @@ func (s *chunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator {
return pi
}
func (s *chunkSeriesEntry) ChunkCount() (int, error) {
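// The chunk metas for this series were already loaded when the series was read from the block,
// so counting them is cheap.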
return len(s.chks), nil
}
// populateWithDelSeriesIterator allows to iterate over samples for the single series.
type populateWithDelSeriesIterator struct {
populateWithDelGenericSeriesIterator