diff --git a/storage/merge.go b/storage/merge.go index e979ad04a..9ff88f71c 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -523,8 +523,12 @@ func (c *chainSampleIterator) AtHistogram() (int64, *histogram.Histogram) { } t, h := c.curr.AtHistogram() // If the current sample is not consecutive with the previous one, we - // cannot be sure anymore that there was no counter reset. - if !c.consecutive && h.CounterResetHint == histogram.NotCounterReset { + // cannot be sure anymore about counter resets for counter histograms. + // TODO(beorn7): If a `NotCounterReset` sample is followed by a + // non-consecutive `CounterReset` sample, we could keep the hint as + // `CounterReset`. But then we needed to track the previous sample + // in more detail, which might not be worth it. + if !c.consecutive && h.CounterResetHint != histogram.GaugeType { h.CounterResetHint = histogram.UnknownCounterReset } return t, h diff --git a/storage/merge_test.go b/storage/merge_test.go index d28a26449..425285237 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" @@ -384,16 +385,36 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) { } } +func histogramSample(ts int64, hint histogram.CounterResetHint) hSample { + h := tsdbutil.GenerateTestHistogram(int(ts + 1)) + h.CounterResetHint = hint + return hSample{t: ts, h: h} +} + +func floatHistogramSample(ts int64, hint histogram.CounterResetHint) fhSample { + fh := tsdbutil.GenerateTestFloatHistogram(int(ts + 1)) + fh.CounterResetHint = hint + return fhSample{t: ts, fh: fh} +} + +// Shorthands for counter reset hints. +const ( + uk = histogram.UnknownCounterReset + cr = histogram.CounterReset + nr = histogram.NotCounterReset + ga = histogram.GaugeType +) + func TestCompactingChunkSeriesMerger(t *testing.T) { m := NewCompactingChunkSeriesMerger(ChainedSeriesMerge) // histogramSample returns a histogram that is unique to the ts. histogramSample := func(ts int64) hSample { - return hSample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts + 1))} + return histogramSample(ts, uk) } floatHistogramSample := func(ts int64) fhSample { - return fhSample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(ts + 1))} + return floatHistogramSample(ts, uk) } for _, tc := range []struct { @@ -585,10 +606,130 @@ func TestCompactingChunkSeriesMerger(t *testing.T) { count, err := merged.ChunkCount() require.NoError(t, err) require.Len(t, actChks, count) + actSamples := chunks.ChunkMetasToSamples(actChks) + expSamples := chunks.ChunkMetasToSamples(expChks) + require.Equal(t, expSamples, actSamples) }) } } +func TestCompactingChunkSeriesMergerHistogramCounterResetHint(t *testing.T) { + m := NewCompactingChunkSeriesMerger(ChainedSeriesMerge) + + for sampleType, sampleFunc := range map[string]func(int64, histogram.CounterResetHint) chunks.Sample{ + "histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return histogramSample(ts, hint) }, + "float histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return floatHistogramSample(ts, hint) }, + } { + for name, tc := range map[string]struct { + input []ChunkSeries + expected ChunkSeries + }{ + "histogram counter reset hint kept in single series": { + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)}, + []chunks.Sample{sampleFunc(10, cr), sampleFunc(15, uk)}, + ), + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)}, + []chunks.Sample{sampleFunc(10, cr), sampleFunc(15, uk)}, + ), + }, + "histogram not counter reset hint kept in single series": { + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)}, + []chunks.Sample{sampleFunc(10, nr), sampleFunc(15, uk)}, + ), + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)}, + []chunks.Sample{sampleFunc(10, nr), sampleFunc(15, uk)}, + ), + }, + "histogram counter reset hint kept in multiple equal series": { + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)}, + []chunks.Sample{sampleFunc(10, cr), sampleFunc(15, uk)}, + ), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)}, + []chunks.Sample{sampleFunc(10, cr), sampleFunc(15, uk)}, + ), + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)}, + []chunks.Sample{sampleFunc(10, cr), sampleFunc(15, uk)}, + ), + }, + "histogram not counter reset hint kept in multiple equal series": { + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)}, + []chunks.Sample{sampleFunc(10, nr), sampleFunc(15, uk)}, + ), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)}, + []chunks.Sample{sampleFunc(10, nr), sampleFunc(15, uk)}, + ), + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)}, + []chunks.Sample{sampleFunc(10, nr), sampleFunc(15, uk)}, + ), + }, + "histogram counter reset hint dropped from differing series": { + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)}, + []chunks.Sample{sampleFunc(10, cr), sampleFunc(15, uk)}, + ), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)}, + []chunks.Sample{sampleFunc(10, cr), sampleFunc(12, uk), sampleFunc(15, uk)}, + ), + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)}, + []chunks.Sample{sampleFunc(10, uk), sampleFunc(12, uk), sampleFunc(15, uk)}, + ), + }, + "histogram counter not reset hint dropped from differing series": { + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)}, + []chunks.Sample{sampleFunc(10, nr), sampleFunc(15, uk)}, + ), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)}, + []chunks.Sample{sampleFunc(10, nr), sampleFunc(12, uk), sampleFunc(15, uk)}, + ), + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)}, + []chunks.Sample{sampleFunc(10, uk), sampleFunc(12, uk), sampleFunc(15, uk)}, + ), + }, + } { + t.Run(sampleType+"/"+name, func(t *testing.T) { + merged := m(tc.input...) + require.Equal(t, tc.expected.Labels(), merged.Labels()) + actChks, actErr := ExpandChunks(merged.Iterator(nil)) + expChks, expErr := ExpandChunks(tc.expected.Iterator(nil)) + + require.Equal(t, expErr, actErr) + require.Equal(t, expChks, actChks) + + actSamples := chunks.ChunkMetasToSamples(actChks) + expSamples := chunks.ChunkMetasToSamples(expChks) + require.Equal(t, expSamples, actSamples) + }) + } + } +} + func TestConcatenatingChunkSeriesMerger(t *testing.T) { m := NewConcatenatingChunkSeriesMerger() @@ -820,102 +961,243 @@ func (m *mockChunkSeriesSet) Err() error { return nil } func (m *mockChunkSeriesSet) Warnings() annotations.Annotations { return nil } func TestChainSampleIterator(t *testing.T) { - for _, tc := range []struct { - input []chunkenc.Iterator - expected []chunks.Sample - }{ - { - input: []chunkenc.Iterator{ - NewListSeriesIterator(samples{fSample{0, 0}, fSample{1, 1}}), - }, - expected: []chunks.Sample{fSample{0, 0}, fSample{1, 1}}, - }, - { - input: []chunkenc.Iterator{ - NewListSeriesIterator(samples{fSample{0, 0}, fSample{1, 1}}), - NewListSeriesIterator(samples{fSample{2, 2}, fSample{3, 3}}), - }, - expected: []chunks.Sample{fSample{0, 0}, fSample{1, 1}, fSample{2, 2}, fSample{3, 3}}, - }, - { - input: []chunkenc.Iterator{ - NewListSeriesIterator(samples{fSample{0, 0}, fSample{3, 3}}), - NewListSeriesIterator(samples{fSample{1, 1}, fSample{4, 4}}), - NewListSeriesIterator(samples{fSample{2, 2}, fSample{5, 5}}), - }, - expected: []chunks.Sample{ - fSample{0, 0}, fSample{1, 1}, fSample{2, 2}, fSample{3, 3}, fSample{4, 4}, fSample{5, 5}, - }, - }, - // Overlap. - { - input: []chunkenc.Iterator{ - NewListSeriesIterator(samples{fSample{0, 0}, fSample{1, 1}}), - NewListSeriesIterator(samples{fSample{0, 0}, fSample{2, 2}}), - NewListSeriesIterator(samples{fSample{2, 2}, fSample{3, 3}}), - NewListSeriesIterator(samples{}), - NewListSeriesIterator(samples{}), - NewListSeriesIterator(samples{}), - }, - expected: []chunks.Sample{fSample{0, 0}, fSample{1, 1}, fSample{2, 2}, fSample{3, 3}}, - }, + for sampleType, sampleFunc := range map[string]func(int64) chunks.Sample{ + "float": func(ts int64) chunks.Sample { return fSample{ts, float64(ts)} }, + "histogram": func(ts int64) chunks.Sample { return histogramSample(ts, uk) }, + "float histogram": func(ts int64) chunks.Sample { return floatHistogramSample(ts, uk) }, } { - merged := ChainSampleIteratorFromIterators(nil, tc.input) - actual, err := ExpandSamples(merged, nil) - require.NoError(t, err) - require.Equal(t, tc.expected, actual) + for name, tc := range map[string]struct { + input []chunkenc.Iterator + expected []chunks.Sample + }{ + "single iterator": { + input: []chunkenc.Iterator{ + NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(1)}), + }, + expected: []chunks.Sample{sampleFunc(0), sampleFunc(1)}, + }, + "non overlapping iterators": { + input: []chunkenc.Iterator{ + NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(1)}), + NewListSeriesIterator(samples{sampleFunc(2), sampleFunc(3)}), + }, + expected: []chunks.Sample{sampleFunc(0), sampleFunc(1), sampleFunc(2), sampleFunc(3)}, + }, + "overlapping but distinct iterators": { + input: []chunkenc.Iterator{ + NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(3)}), + NewListSeriesIterator(samples{sampleFunc(1), sampleFunc(4)}), + NewListSeriesIterator(samples{sampleFunc(2), sampleFunc(5)}), + }, + expected: []chunks.Sample{ + sampleFunc(0), sampleFunc(1), sampleFunc(2), sampleFunc(3), sampleFunc(4), sampleFunc(5), + }, + }, + "overlapping iterators": { + input: []chunkenc.Iterator{ + NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(1)}), + NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(2)}), + NewListSeriesIterator(samples{sampleFunc(2), sampleFunc(3)}), + NewListSeriesIterator(samples{}), + NewListSeriesIterator(samples{}), + NewListSeriesIterator(samples{}), + }, + expected: []chunks.Sample{sampleFunc(0), sampleFunc(1), sampleFunc(2), sampleFunc(3)}, + }, + } { + t.Run(sampleType+"/"+name, func(t *testing.T) { + merged := ChainSampleIteratorFromIterators(nil, tc.input) + actual, err := ExpandSamples(merged, nil) + require.NoError(t, err) + require.Equal(t, tc.expected, actual) + }) + } + } +} + +func TestChainSampleIteratorHistogramCounterResetHint(t *testing.T) { + for sampleType, sampleFunc := range map[string]func(int64, histogram.CounterResetHint) chunks.Sample{ + "histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return histogramSample(ts, hint) }, + "float histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return floatHistogramSample(ts, hint) }, + } { + for name, tc := range map[string]struct { + input []chunkenc.Iterator + expected []chunks.Sample + }{ + "single iterator": { + input: []chunkenc.Iterator{ + NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, cr), sampleFunc(2, uk)}), + }, + expected: []chunks.Sample{sampleFunc(0, uk), sampleFunc(1, cr), sampleFunc(2, uk)}, + }, + "single iterator gauge": { + input: []chunkenc.Iterator{ + NewListSeriesIterator(samples{sampleFunc(0, ga), sampleFunc(1, ga), sampleFunc(2, ga)}), + }, + expected: []chunks.Sample{sampleFunc(0, ga), sampleFunc(1, ga), sampleFunc(2, ga)}, + }, + "overlapping iterators gauge": { + input: []chunkenc.Iterator{ + NewListSeriesIterator(samples{sampleFunc(0, ga), sampleFunc(1, ga), sampleFunc(2, ga), sampleFunc(4, ga)}), + NewListSeriesIterator(samples{sampleFunc(0, ga), sampleFunc(1, ga), sampleFunc(3, ga), sampleFunc(5, ga)}), + }, + expected: []chunks.Sample{sampleFunc(0, ga), sampleFunc(1, ga), sampleFunc(2, ga), sampleFunc(3, ga), sampleFunc(4, ga), sampleFunc(5, ga)}, + }, + "non overlapping iterators": { + input: []chunkenc.Iterator{ + NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, uk)}), + NewListSeriesIterator(samples{sampleFunc(2, cr), sampleFunc(3, cr)}), + }, + expected: []chunks.Sample{sampleFunc(0, uk), sampleFunc(1, uk), sampleFunc(2, uk), sampleFunc(3, cr)}, + }, + "overlapping but distinct iterators": { + input: []chunkenc.Iterator{ + NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(3, uk), sampleFunc(5, cr)}), + NewListSeriesIterator(samples{sampleFunc(1, uk), sampleFunc(2, cr), sampleFunc(4, cr)}), + }, + expected: []chunks.Sample{ + sampleFunc(0, uk), sampleFunc(1, uk), sampleFunc(2, cr), sampleFunc(3, uk), sampleFunc(4, uk), sampleFunc(5, uk), + }, + }, + "overlapping iterators": { + input: []chunkenc.Iterator{ + NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, cr), sampleFunc(2, cr)}), + NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, cr), sampleFunc(2, cr)}), + }, + expected: []chunks.Sample{sampleFunc(0, uk), sampleFunc(1, uk), sampleFunc(2, uk)}, + }, + } { + t.Run(sampleType+"/"+name, func(t *testing.T) { + merged := ChainSampleIteratorFromIterators(nil, tc.input) + actual, err := ExpandSamples(merged, nil) + require.NoError(t, err) + require.Equal(t, tc.expected, actual) + }) + } } } func TestChainSampleIteratorSeek(t *testing.T) { - for _, tc := range []struct { - input []chunkenc.Iterator - seek int64 - expected []chunks.Sample - }{ - { - input: []chunkenc.Iterator{ - NewListSeriesIterator(samples{fSample{0, 0}, fSample{1, 1}, fSample{2, 2}}), - }, - seek: 1, - expected: []chunks.Sample{fSample{1, 1}, fSample{2, 2}}, - }, - { - input: []chunkenc.Iterator{ - NewListSeriesIterator(samples{fSample{0, 0}, fSample{1, 1}}), - NewListSeriesIterator(samples{fSample{2, 2}, fSample{3, 3}}), - }, - seek: 2, - expected: []chunks.Sample{fSample{2, 2}, fSample{3, 3}}, - }, - { - input: []chunkenc.Iterator{ - NewListSeriesIterator(samples{fSample{0, 0}, fSample{3, 3}}), - NewListSeriesIterator(samples{fSample{1, 1}, fSample{4, 4}}), - NewListSeriesIterator(samples{fSample{2, 2}, fSample{5, 5}}), - }, - seek: 2, - expected: []chunks.Sample{fSample{2, 2}, fSample{3, 3}, fSample{4, 4}, fSample{5, 5}}, - }, - { - input: []chunkenc.Iterator{ - NewListSeriesIterator(samples{fSample{0, 0}, fSample{2, 2}, fSample{3, 3}}), - NewListSeriesIterator(samples{fSample{0, 0}, fSample{1, 1}, fSample{2, 2}}), - }, - seek: 0, - expected: []chunks.Sample{fSample{0, 0}, fSample{1, 1}, fSample{2, 2}, fSample{3, 3}}, - }, + for sampleType, sampleFunc := range map[string]func(int64) chunks.Sample{ + "float": func(ts int64) chunks.Sample { return fSample{ts, float64(ts)} }, + "histogram": func(ts int64) chunks.Sample { return histogramSample(ts, uk) }, + "float histogram": func(ts int64) chunks.Sample { return floatHistogramSample(ts, uk) }, } { - merged := ChainSampleIteratorFromIterators(nil, tc.input) - actual := []chunks.Sample{} - if merged.Seek(tc.seek) == chunkenc.ValFloat { - t, f := merged.At() - actual = append(actual, fSample{t, f}) + for name, tc := range map[string]struct { + input []chunkenc.Iterator + seek int64 + expected []chunks.Sample + }{ + "single iterator": { + input: []chunkenc.Iterator{ + NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(1), sampleFunc(2)}), + }, + seek: 1, + expected: []chunks.Sample{sampleFunc(1), sampleFunc(2)}, + }, + "non overlapping iterators": { + input: []chunkenc.Iterator{ + NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(1)}), + NewListSeriesIterator(samples{sampleFunc(2), sampleFunc(3)}), + }, + seek: 2, + expected: []chunks.Sample{sampleFunc(2), sampleFunc(3)}, + }, + "overlapping but distinct iterators": { + input: []chunkenc.Iterator{ + NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(3)}), + NewListSeriesIterator(samples{sampleFunc(1), sampleFunc(4)}), + NewListSeriesIterator(samples{sampleFunc(2), sampleFunc(5)}), + }, + seek: 2, + expected: []chunks.Sample{sampleFunc(2), sampleFunc(3), sampleFunc(4), sampleFunc(5)}, + }, + "overlapping iterators": { + input: []chunkenc.Iterator{ + NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(2), sampleFunc(3)}), + NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(1), sampleFunc(2)}), + }, + seek: 0, + expected: []chunks.Sample{sampleFunc(0), sampleFunc(1), sampleFunc(2), sampleFunc(3)}, + }, + } { + t.Run(sampleType+"/"+name, func(t *testing.T) { + merged := ChainSampleIteratorFromIterators(nil, tc.input) + actual := []chunks.Sample{} + switch merged.Seek(tc.seek) { + case chunkenc.ValFloat: + t, f := merged.At() + actual = append(actual, fSample{t, f}) + case chunkenc.ValHistogram: + t, h := merged.AtHistogram() + actual = append(actual, hSample{t, h}) + case chunkenc.ValFloatHistogram: + t, fh := merged.AtFloatHistogram() + actual = append(actual, fhSample{t, fh}) + } + s, err := ExpandSamples(merged, nil) + require.NoError(t, err) + actual = append(actual, s...) + require.Equal(t, tc.expected, actual) + }) + } + } +} + +func TestChainSampleIteratorSeekHistogramCounterResetHint(t *testing.T) { + for sampleType, sampleFunc := range map[string]func(int64, histogram.CounterResetHint) chunks.Sample{ + "histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return histogramSample(ts, hint) }, + "float histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return floatHistogramSample(ts, hint) }, + } { + for name, tc := range map[string]struct { + input []chunkenc.Iterator + seek int64 + expected []chunks.Sample + }{ + "single iterator": { + input: []chunkenc.Iterator{ + NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, cr), sampleFunc(2, uk)}), + }, + seek: 1, + expected: []chunks.Sample{sampleFunc(1, uk), sampleFunc(2, uk)}, + }, + "non overlapping iterators": { + input: []chunkenc.Iterator{ + NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, uk)}), + NewListSeriesIterator(samples{sampleFunc(2, cr), sampleFunc(3, cr)}), + }, + seek: 2, + expected: []chunks.Sample{sampleFunc(2, uk), sampleFunc(3, cr)}, + }, + "non overlapping iterators seek to internal reset": { + input: []chunkenc.Iterator{ + NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, uk)}), + NewListSeriesIterator(samples{sampleFunc(2, cr), sampleFunc(3, cr)}), + }, + seek: 3, + expected: []chunks.Sample{sampleFunc(3, uk)}, + }, + } { + t.Run(sampleType+"/"+name, func(t *testing.T) { + merged := ChainSampleIteratorFromIterators(nil, tc.input) + actual := []chunks.Sample{} + switch merged.Seek(tc.seek) { + case chunkenc.ValFloat: + t, f := merged.At() + actual = append(actual, fSample{t, f}) + case chunkenc.ValHistogram: + t, h := merged.AtHistogram() + actual = append(actual, hSample{t, h}) + case chunkenc.ValFloatHistogram: + t, fh := merged.AtFloatHistogram() + actual = append(actual, fhSample{t, fh}) + } + s, err := ExpandSamples(merged, nil) + require.NoError(t, err) + actual = append(actual, s...) + require.Equal(t, tc.expected, actual) + }) } - s, err := ExpandSamples(merged, nil) - require.NoError(t, err) - actual = append(actual, s...) - require.Equal(t, tc.expected, actual) } } diff --git a/storage/series_test.go b/storage/series_test.go index 24b6d26ae..163a47ec9 100644 --- a/storage/series_test.go +++ b/storage/series_test.go @@ -467,35 +467,36 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) { require.Len(t, chks, count) // Decode all encoded samples and assert they are equal to the original ones. - encodedSamples := expandHistogramSamples(chks) + encodedSamples := chunks.ChunkMetasToSamples(chks) require.Equal(t, len(test.samples), len(encodedSamples)) for i, s := range test.samples { + encodedSample := encodedSamples[i] switch expectedSample := s.(type) { case hSample: - encodedSample, ok := encodedSamples[i].(hSample) - require.True(t, ok, "expect histogram", fmt.Sprintf("at idx %d", i)) + require.Equal(t, chunkenc.ValHistogram, encodedSample.Type(), "expect histogram", fmt.Sprintf("at idx %d", i)) + h := encodedSample.H() // Ignore counter reset if not gauge here, will check on chunk level. if expectedSample.h.CounterResetHint != histogram.GaugeType { - encodedSample.h.CounterResetHint = histogram.UnknownCounterReset + h.CounterResetHint = histogram.UnknownCounterReset } if value.IsStaleNaN(expectedSample.h.Sum) { - require.True(t, value.IsStaleNaN(encodedSample.h.Sum), fmt.Sprintf("at idx %d", i)) + require.True(t, value.IsStaleNaN(h.Sum), fmt.Sprintf("at idx %d", i)) continue } - require.Equal(t, *expectedSample.h, *encodedSample.h.Compact(0), fmt.Sprintf("at idx %d", i)) + require.Equal(t, *expectedSample.h, *h.Compact(0), fmt.Sprintf("at idx %d", i)) case fhSample: - encodedSample, ok := encodedSamples[i].(fhSample) - require.True(t, ok, "expect float histogram", fmt.Sprintf("at idx %d", i)) + require.Equal(t, chunkenc.ValFloatHistogram, encodedSample.Type(), "expect float histogram", fmt.Sprintf("at idx %d", i)) + fh := encodedSample.FH() // Ignore counter reset if not gauge here, will check on chunk level. if expectedSample.fh.CounterResetHint != histogram.GaugeType { - encodedSample.fh.CounterResetHint = histogram.UnknownCounterReset + fh.CounterResetHint = histogram.UnknownCounterReset } if value.IsStaleNaN(expectedSample.fh.Sum) { - require.True(t, value.IsStaleNaN(encodedSample.fh.Sum), fmt.Sprintf("at idx %d", i)) + require.True(t, value.IsStaleNaN(fh.Sum), fmt.Sprintf("at idx %d", i)) continue } - require.Equal(t, *expectedSample.fh, *encodedSample.fh.Compact(0), fmt.Sprintf("at idx %d", i)) + require.Equal(t, *expectedSample.fh, *fh.Compact(0), fmt.Sprintf("at idx %d", i)) default: t.Error("internal error, unexpected type") } @@ -506,29 +507,6 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) { } } -func expandHistogramSamples(chunks []chunks.Meta) (result []chunks.Sample) { - if len(chunks) == 0 { - return - } - - for _, chunk := range chunks { - it := chunk.Chunk.Iterator(nil) - for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() { - switch vt { - case chunkenc.ValHistogram: - t, h := it.AtHistogram() - result = append(result, hSample{t: t, h: h}) - case chunkenc.ValFloatHistogram: - t, fh := it.AtFloatHistogram() - result = append(result, fhSample{t: t, fh: fh}) - default: - panic("unexpected value type") - } - } - } - return -} - func getCounterResetHint(chunk chunks.Meta) chunkenc.CounterResetHeader { switch chk := chunk.Chunk.(type) { case *chunkenc.HistogramChunk: diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index ad08a11b9..505d11245 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -339,11 +339,16 @@ func counterResetInAnyFloatBucket(oldBuckets []xorValue, newBuckets []float64, o return false } - oldSpanSliceIdx, newSpanSliceIdx := 0, 0 // Index for the span slices. - oldInsideSpanIdx, newInsideSpanIdx := uint32(0), uint32(0) // Index inside a span. - oldIdx, newIdx := oldSpans[0].Offset, newSpans[0].Offset + var ( + oldSpanSliceIdx, newSpanSliceIdx int = -1, -1 // Index for the span slices. Starts at -1 to indicate that the first non empty span is not yet found. + oldInsideSpanIdx, newInsideSpanIdx uint32 // Index inside a span. + oldIdx, newIdx int32 // Index inside a bucket slice. + oldBucketSliceIdx, newBucketSliceIdx int // Index inside bucket slice. + ) - oldBucketSliceIdx, newBucketSliceIdx := 0, 0 // Index inside bucket slice. + // Find first non empty spans. + oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans) + newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans) oldVal, newVal := oldBuckets[0].value, newBuckets[0] // Since we assume that new spans won't have missing buckets, there will never be a case @@ -359,13 +364,12 @@ func counterResetInAnyFloatBucket(oldBuckets []xorValue, newBuckets []float64, o // Moving ahead old bucket and span by 1 index. if oldInsideSpanIdx+1 >= oldSpans[oldSpanSliceIdx].Length { // Current span is over. - oldSpanSliceIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldSpans) + oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans) oldInsideSpanIdx = 0 if oldSpanSliceIdx >= len(oldSpans) { // All old spans are over. break } - oldIdx += 1 + oldSpans[oldSpanSliceIdx].Offset } else { oldInsideSpanIdx++ oldIdx++ @@ -378,14 +382,13 @@ func counterResetInAnyFloatBucket(oldBuckets []xorValue, newBuckets []float64, o // Moving ahead new bucket and span by 1 index. if newInsideSpanIdx+1 >= newSpans[newSpanSliceIdx].Length { // Current span is over. - newSpanSliceIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newSpans) + newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans) newInsideSpanIdx = 0 if newSpanSliceIdx >= len(newSpans) { // All new spans are over. // This should not happen, old spans above should catch this first. panic("new spans over before old spans in counterReset") } - newIdx += 1 + newSpans[newSpanSliceIdx].Offset } else { newInsideSpanIdx++ newIdx++ diff --git a/tsdb/chunkenc/float_histogram_test.go b/tsdb/chunkenc/float_histogram_test.go index db1b99fb3..7629ae4df 100644 --- a/tsdb/chunkenc/float_histogram_test.go +++ b/tsdb/chunkenc/float_histogram_test.go @@ -550,62 +550,193 @@ func assertRecodedFloatHistogramChunkOnAppend(t *testing.T, prevChunk Chunk, hAp } func TestFloatHistogramChunkAppendableWithEmptySpan(t *testing.T) { - h1 := &histogram.FloatHistogram{ - Schema: 0, - Count: 21, - Sum: 1234.5, - ZeroThreshold: 0.001, - ZeroCount: 4, - PositiveSpans: []histogram.Span{ - {Offset: 0, Length: 4}, - {Offset: 0, Length: 0}, - {Offset: 0, Length: 3}, + tests := map[string]struct { + h1 *histogram.FloatHistogram + h2 *histogram.FloatHistogram + }{ + "empty span in old and new histogram": { + h1: &histogram.FloatHistogram{ + Schema: 0, + Count: 21, + Sum: 1234.5, + ZeroThreshold: 0.001, + ZeroCount: 4, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 0, Length: 0}, + {Offset: 0, Length: 3}, + }, + PositiveBuckets: []float64{1, 2, 1, 1, 1, 1, 1}, + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 4}, + {Offset: 2, Length: 0}, + {Offset: 2, Length: 3}, + }, + NegativeBuckets: []float64{1, 2, 1, 2, 2, 2, 2}, + }, + h2: &histogram.FloatHistogram{ + Schema: 0, + Count: 37, + Sum: 2345.6, + ZeroThreshold: 0.001, + ZeroCount: 5, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 0, Length: 0}, + {Offset: 0, Length: 3}, + }, + PositiveBuckets: []float64{1, 3, 1, 2, 1, 1, 1}, + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 4}, + {Offset: 2, Length: 0}, + {Offset: 2, Length: 3}, + }, + NegativeBuckets: []float64{1, 4, 2, 7, 5, 5, 2}, + }, }, - PositiveBuckets: []float64{1, 2, 1, 1, 1, 1, 1}, - NegativeSpans: []histogram.Span{ - {Offset: 1, Length: 4}, - {Offset: 2, Length: 0}, - {Offset: 2, Length: 3}, + "empty span in old histogram": { + h1: &histogram.FloatHistogram{ + Schema: 0, + Count: 21, + Sum: 1234.5, + ZeroThreshold: 0.001, + ZeroCount: 4, + PositiveSpans: []histogram.Span{ + {Offset: 1, Length: 0}, // This span will disappear. + {Offset: 2, Length: 4}, + {Offset: 0, Length: 3}, + }, + PositiveBuckets: []float64{1, 2, 1, 1, 1, 1, 1}, + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 4}, + {Offset: 2, Length: 0}, + {Offset: 2, Length: 3}, + }, + NegativeBuckets: []float64{1, 2, 1, 2, 2, 2, 2}, + }, + h2: &histogram.FloatHistogram{ + Schema: 0, + Count: 37, + Sum: 2345.6, + ZeroThreshold: 0.001, + ZeroCount: 5, + PositiveSpans: []histogram.Span{ + {Offset: 3, Length: 4}, + {Offset: 0, Length: 3}, + }, + PositiveBuckets: []float64{1, 3, 1, 2, 1, 1, 1}, + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 4}, + {Offset: 2, Length: 0}, + {Offset: 2, Length: 3}, + }, + NegativeBuckets: []float64{1, 4, 2, 7, 5, 5, 2}, + }, }, - NegativeBuckets: []float64{1, 2, 1, 2, 2, 2, 2}, - } - h2 := &histogram.FloatHistogram{ - Schema: 0, - Count: 37, - Sum: 2345.6, - ZeroThreshold: 0.001, - ZeroCount: 5, - PositiveSpans: []histogram.Span{ - {Offset: 0, Length: 4}, - {Offset: 0, Length: 0}, - {Offset: 0, Length: 3}, + "empty span in new histogram": { + h1: &histogram.FloatHistogram{ + Schema: 0, + Count: 21, + Sum: 1234.5, + ZeroThreshold: 0.001, + ZeroCount: 4, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 3, Length: 3}, + }, + PositiveBuckets: []float64{1, 2, 1, 1, 1, 1, 1}, + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 4}, + {Offset: 2, Length: 0}, + {Offset: 2, Length: 3}, + }, + NegativeBuckets: []float64{1, 2, 1, 2, 2, 2, 2}, + }, + h2: &histogram.FloatHistogram{ + Schema: 0, + Count: 37, + Sum: 2345.6, + ZeroThreshold: 0.001, + ZeroCount: 5, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 1, Length: 0}, // This span is new. + {Offset: 2, Length: 3}, + }, + PositiveBuckets: []float64{1, 3, 1, 2, 1, 1, 1}, + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 4}, + {Offset: 2, Length: 0}, + {Offset: 2, Length: 3}, + }, + NegativeBuckets: []float64{1, 4, 2, 7, 5, 5, 2}, + }, }, - PositiveBuckets: []float64{1, 3, 1, 2, 1, 1, 1}, - NegativeSpans: []histogram.Span{ - {Offset: 1, Length: 4}, - {Offset: 2, Length: 0}, - {Offset: 2, Length: 3}, + "two empty spans mixing offsets": { + h1: &histogram.FloatHistogram{ + Schema: 0, + Count: 21, + Sum: 1234.5, + ZeroThreshold: 0.001, + ZeroCount: 4, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 1, Length: 0}, + {Offset: 3, Length: 0}, + {Offset: 4, Length: 3}, + }, + PositiveBuckets: []float64{1, 2, 1, 1, 1, 1, 1}, + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 4}, + {Offset: 2, Length: 0}, + {Offset: 2, Length: 3}, + }, + NegativeBuckets: []float64{1, 2, 1, 2, 2, 2, 2}, + }, + h2: &histogram.FloatHistogram{ + Schema: 0, + Count: 37, + Sum: 2345.6, + ZeroThreshold: 0.001, + ZeroCount: 5, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 3, Length: 0}, + {Offset: 1, Length: 0}, + {Offset: 4, Length: 3}, + }, + PositiveBuckets: []float64{1, 3, 1, 2, 1, 1, 1}, + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 4}, + {Offset: 2, Length: 0}, + {Offset: 2, Length: 3}, + }, + NegativeBuckets: []float64{1, 4, 2, 7, 5, 5, 2}, + }, }, - NegativeBuckets: []float64{1, 4, 2, 7, 5, 5, 2}, } - c := Chunk(NewFloatHistogramChunk()) + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + c := Chunk(NewFloatHistogramChunk()) - // Create fresh appender and add the first histogram. - app, err := c.Appender() - require.NoError(t, err) - require.Equal(t, 0, c.NumSamples()) + // Create fresh appender and add the first histogram. + app, err := c.Appender() + require.NoError(t, err) + require.Equal(t, 0, c.NumSamples()) - _, _, _, err = app.AppendFloatHistogram(nil, 1, h1, true) - require.NoError(t, err) - require.Equal(t, 1, c.NumSamples()) - hApp, _ := app.(*FloatHistogramAppender) + _, _, _, err = app.AppendFloatHistogram(nil, 1, tc.h1, true) + require.NoError(t, err) + require.Equal(t, 1, c.NumSamples()) + hApp, _ := app.(*FloatHistogramAppender) - pI, nI, okToAppend, counterReset := hApp.appendable(h2) - require.Empty(t, pI) - require.Empty(t, nI) - require.True(t, okToAppend) - require.False(t, counterReset) + pI, nI, okToAppend, counterReset := hApp.appendable(tc.h2) + require.Empty(t, pI) + require.Empty(t, nI) + require.True(t, okToAppend) + require.False(t, counterReset) + }) + } } func TestFloatHistogramChunkAppendableGauge(t *testing.T) { diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index cc2680be2..847d89376 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -359,11 +359,16 @@ func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans return false } - oldSpanSliceIdx, newSpanSliceIdx := 0, 0 // Index for the span slices. - oldInsideSpanIdx, newInsideSpanIdx := uint32(0), uint32(0) // Index inside a span. - oldIdx, newIdx := oldSpans[0].Offset, newSpans[0].Offset + var ( + oldSpanSliceIdx, newSpanSliceIdx int = -1, -1 // Index for the span slices. Starts at -1 to indicate that the first non empty span is not yet found. + oldInsideSpanIdx, newInsideSpanIdx uint32 // Index inside a span. + oldIdx, newIdx int32 // Index inside a bucket slice. + oldBucketSliceIdx, newBucketSliceIdx int // Index inside bucket slice. + ) - oldBucketSliceIdx, newBucketSliceIdx := 0, 0 // Index inside bucket slice. + // Find first non empty spans. + oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans) + newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans) oldVal, newVal := oldBuckets[0], newBuckets[0] // Since we assume that new spans won't have missing buckets, there will never be a case @@ -379,13 +384,12 @@ func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans // Moving ahead old bucket and span by 1 index. if oldInsideSpanIdx+1 >= oldSpans[oldSpanSliceIdx].Length { // Current span is over. - oldSpanSliceIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldSpans) + oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans) oldInsideSpanIdx = 0 if oldSpanSliceIdx >= len(oldSpans) { // All old spans are over. break } - oldIdx += 1 + oldSpans[oldSpanSliceIdx].Offset } else { oldInsideSpanIdx++ oldIdx++ @@ -398,14 +402,13 @@ func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans // Moving ahead new bucket and span by 1 index. if newInsideSpanIdx+1 >= newSpans[newSpanSliceIdx].Length { // Current span is over. - newSpanSliceIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newSpans) + newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans) newInsideSpanIdx = 0 if newSpanSliceIdx >= len(newSpans) { // All new spans are over. // This should not happen, old spans above should catch this first. panic("new spans over before old spans in counterReset") } - newIdx += 1 + newSpans[newSpanSliceIdx].Offset } else { newInsideSpanIdx++ newIdx++ diff --git a/tsdb/chunkenc/histogram_meta.go b/tsdb/chunkenc/histogram_meta.go index 3cc64fa98..cda1080a8 100644 --- a/tsdb/chunkenc/histogram_meta.go +++ b/tsdb/chunkenc/histogram_meta.go @@ -489,8 +489,13 @@ func counterResetHint(crh CounterResetHeader, numRead uint16) histogram.CounterR } // Handle pathological case of empty span when advancing span idx. -func nextNonEmptySpanSliceIdx(idx int, spans []histogram.Span) (newIdx int) { - for idx++; idx < len(spans) && spans[idx].Length == 0; idx++ { //nolint:revive // This "empty" block is intentional +// Call it with idx==-1 to find the first non empty span. +func nextNonEmptySpanSliceIdx(idx int, bucketIdx int32, spans []histogram.Span) (newIdx int, newBucketIdx int32) { + for idx++; idx < len(spans); idx++ { + if spans[idx].Length > 0 { + return idx, bucketIdx + spans[idx].Offset + 1 + } + bucketIdx += spans[idx].Offset } - return idx + return idx, 0 } diff --git a/tsdb/chunkenc/histogram_test.go b/tsdb/chunkenc/histogram_test.go index 5cd532833..b983d7fe6 100644 --- a/tsdb/chunkenc/histogram_test.go +++ b/tsdb/chunkenc/histogram_test.go @@ -573,62 +573,193 @@ func assertSampleCount(t *testing.T, c Chunk, exp int64, vtype ValueType) { } func TestHistogramChunkAppendableWithEmptySpan(t *testing.T) { - h1 := &histogram.Histogram{ - Schema: 0, - Count: 21, - Sum: 1234.5, - ZeroThreshold: 0.001, - ZeroCount: 4, - PositiveSpans: []histogram.Span{ - {Offset: 0, Length: 4}, - {Offset: 0, Length: 0}, - {Offset: 0, Length: 3}, + tests := map[string]struct { + h1 *histogram.Histogram + h2 *histogram.Histogram + }{ + "empty span in old and new histogram": { + h1: &histogram.Histogram{ + Schema: 0, + Count: 21, + Sum: 1234.5, + ZeroThreshold: 0.001, + ZeroCount: 4, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 0, Length: 0}, + {Offset: 0, Length: 3}, + }, + PositiveBuckets: []int64{1, 1, -1, 0, 0, 0, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 4}, + {Offset: 2, Length: 0}, + {Offset: 2, Length: 3}, + }, + NegativeBuckets: []int64{1, 1, -1, 1, 0, 0, 0}, + }, + h2: &histogram.Histogram{ + Schema: 0, + Count: 37, + Sum: 2345.6, + ZeroThreshold: 0.001, + ZeroCount: 5, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 0, Length: 0}, + {Offset: 0, Length: 3}, + }, + PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 4}, + {Offset: 2, Length: 0}, + {Offset: 2, Length: 3}, + }, + NegativeBuckets: []int64{1, 3, -2, 5, -2, 0, -3}, + }, }, - PositiveBuckets: []int64{1, 1, -1, 0, 0, 0, 0}, - NegativeSpans: []histogram.Span{ - {Offset: 1, Length: 4}, - {Offset: 2, Length: 0}, - {Offset: 2, Length: 3}, + "empty span in old histogram": { + h1: &histogram.Histogram{ + Schema: 0, + Count: 21, + Sum: 1234.5, + ZeroThreshold: 0.001, + ZeroCount: 4, + PositiveSpans: []histogram.Span{ + {Offset: 1, Length: 0}, // This span will disappear. + {Offset: 2, Length: 4}, + {Offset: 0, Length: 3}, + }, + PositiveBuckets: []int64{1, 1, -1, 0, 0, 0, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 4}, + {Offset: 2, Length: 0}, + {Offset: 2, Length: 3}, + }, + NegativeBuckets: []int64{1, 1, -1, 1, 0, 0, 0}, + }, + h2: &histogram.Histogram{ + Schema: 0, + Count: 37, + Sum: 2345.6, + ZeroThreshold: 0.001, + ZeroCount: 5, + PositiveSpans: []histogram.Span{ + {Offset: 3, Length: 4}, + {Offset: 0, Length: 3}, + }, + PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 4}, + {Offset: 2, Length: 0}, + {Offset: 2, Length: 3}, + }, + NegativeBuckets: []int64{1, 3, -2, 5, -2, 0, -3}, + }, }, - NegativeBuckets: []int64{1, 1, -1, 1, 0, 0, 0}, - } - h2 := &histogram.Histogram{ - Schema: 0, - Count: 37, - Sum: 2345.6, - ZeroThreshold: 0.001, - ZeroCount: 5, - PositiveSpans: []histogram.Span{ - {Offset: 0, Length: 4}, - {Offset: 0, Length: 0}, - {Offset: 0, Length: 3}, + "empty span in new histogram": { + h1: &histogram.Histogram{ + Schema: 0, + Count: 21, + Sum: 1234.5, + ZeroThreshold: 0.001, + ZeroCount: 4, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 3, Length: 3}, + }, + PositiveBuckets: []int64{1, 1, -1, 0, 0, 0, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 4}, + {Offset: 2, Length: 0}, + {Offset: 2, Length: 3}, + }, + NegativeBuckets: []int64{1, 1, -1, 1, 0, 0, 0}, + }, + h2: &histogram.Histogram{ + Schema: 0, + Count: 37, + Sum: 2345.6, + ZeroThreshold: 0.001, + ZeroCount: 5, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 1, Length: 0}, // This span is new. + {Offset: 2, Length: 3}, + }, + PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 4}, + {Offset: 2, Length: 0}, + {Offset: 2, Length: 3}, + }, + NegativeBuckets: []int64{1, 3, -2, 5, -2, 0, -3}, + }, }, - PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0}, - NegativeSpans: []histogram.Span{ - {Offset: 1, Length: 4}, - {Offset: 2, Length: 0}, - {Offset: 2, Length: 3}, + "two empty spans mixing offsets": { + h1: &histogram.Histogram{ + Schema: 0, + Count: 21, + Sum: 1234.5, + ZeroThreshold: 0.001, + ZeroCount: 4, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 1, Length: 0}, + {Offset: 3, Length: 0}, + {Offset: 4, Length: 3}, + }, + PositiveBuckets: []int64{1, 1, -1, 0, 0, 0, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 4}, + {Offset: 2, Length: 0}, + {Offset: 2, Length: 3}, + }, + NegativeBuckets: []int64{1, 1, -1, 1, 0, 0, 0}, + }, + h2: &histogram.Histogram{ + Schema: 0, + Count: 37, + Sum: 2345.6, + ZeroThreshold: 0.001, + ZeroCount: 5, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 3, Length: 0}, + {Offset: 1, Length: 0}, + {Offset: 4, Length: 3}, + }, + PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 4}, + {Offset: 2, Length: 0}, + {Offset: 2, Length: 3}, + }, + NegativeBuckets: []int64{1, 3, -2, 5, -2, 0, -3}, + }, }, - NegativeBuckets: []int64{1, 3, -2, 5, -2, 0, -3}, } - c := Chunk(NewHistogramChunk()) + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + c := Chunk(NewHistogramChunk()) - // Create fresh appender and add the first histogram. - app, err := c.Appender() - require.NoError(t, err) - require.Equal(t, 0, c.NumSamples()) + // Create fresh appender and add the first histogram. + app, err := c.Appender() + require.NoError(t, err) + require.Equal(t, 0, c.NumSamples()) - _, _, _, err = app.AppendHistogram(nil, 1, h1, true) - require.NoError(t, err) - require.Equal(t, 1, c.NumSamples()) - hApp, _ := app.(*HistogramAppender) + _, _, _, err = app.AppendHistogram(nil, 1, tc.h1, true) + require.NoError(t, err) + require.Equal(t, 1, c.NumSamples()) + hApp, _ := app.(*HistogramAppender) - pI, nI, okToAppend, counterReset := hApp.appendable(h2) - require.Empty(t, pI) - require.Empty(t, nI) - require.True(t, okToAppend) - require.False(t, counterReset) + pI, nI, okToAppend, counterReset := hApp.appendable(tc.h2) + require.Empty(t, pI) + require.Empty(t, nI) + require.True(t, okToAppend) + require.False(t, counterReset) + }) + } } func TestAtFloatHistogram(t *testing.T) { diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index 88fc5924b..05fd24a06 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -207,6 +207,34 @@ func PopulatedChunk(numSamples int, minTime int64) (Meta, error) { return ChunkFromSamples(samples) } +// ChunkMetasToSamples converts a slice of chunk meta data to a slice of samples. +// Used in tests to compare the content of chunks. +func ChunkMetasToSamples(chunks []Meta) (result []Sample) { + if len(chunks) == 0 { + return + } + + for _, chunk := range chunks { + it := chunk.Chunk.Iterator(nil) + for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() { + switch vt { + case chunkenc.ValFloat: + t, v := it.At() + result = append(result, sample{t: t, f: v}) + case chunkenc.ValHistogram: + t, h := it.AtHistogram() + result = append(result, sample{t: t, h: h}) + case chunkenc.ValFloatHistogram: + t, fh := it.AtFloatHistogram() + result = append(result, sample{t: t, fh: fh}) + default: + panic("unexpected value type") + } + } + } + return +} + // Iterator iterates over the chunks of a single time series. type Iterator interface { // At returns the current meta.