Merge remote-tracking branch 'upstream/main' into krajo/fork-sync

# Conflicts:
#	storage/merge_test.go
#	tsdb/chunkenc/histogram_meta.go
This commit is contained in:
György Krajcsovits 2023-09-19 17:53:25 +02:00
commit c9e513f5a3
9 changed files with 805 additions and 240 deletions

View file

@ -523,8 +523,12 @@ func (c *chainSampleIterator) AtHistogram() (int64, *histogram.Histogram) {
}
t, h := c.curr.AtHistogram()
// If the current sample is not consecutive with the previous one, we
// cannot be sure anymore that there was no counter reset.
if !c.consecutive && h.CounterResetHint == histogram.NotCounterReset {
// cannot be sure anymore about counter resets for counter histograms.
// TODO(beorn7): If a `NotCounterReset` sample is followed by a
// non-consecutive `CounterReset` sample, we could keep the hint as
// `CounterReset`. But then we needed to track the previous sample
// in more detail, which might not be worth it.
if !c.consecutive && h.CounterResetHint != histogram.GaugeType {
h.CounterResetHint = histogram.UnknownCounterReset
}
return t, h

View file

@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
@ -384,16 +385,36 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
}
}
func histogramSample(ts int64, hint histogram.CounterResetHint) hSample {
h := tsdbutil.GenerateTestHistogram(int(ts + 1))
h.CounterResetHint = hint
return hSample{t: ts, h: h}
}
func floatHistogramSample(ts int64, hint histogram.CounterResetHint) fhSample {
fh := tsdbutil.GenerateTestFloatHistogram(int(ts + 1))
fh.CounterResetHint = hint
return fhSample{t: ts, fh: fh}
}
// Shorthands for counter reset hints.
const (
uk = histogram.UnknownCounterReset
cr = histogram.CounterReset
nr = histogram.NotCounterReset
ga = histogram.GaugeType
)
func TestCompactingChunkSeriesMerger(t *testing.T) {
m := NewCompactingChunkSeriesMerger(ChainedSeriesMerge)
// histogramSample returns a histogram that is unique to the ts.
histogramSample := func(ts int64) hSample {
return hSample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts + 1))}
return histogramSample(ts, uk)
}
floatHistogramSample := func(ts int64) fhSample {
return fhSample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(ts + 1))}
return floatHistogramSample(ts, uk)
}
for _, tc := range []struct {
@ -585,10 +606,130 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
count, err := merged.ChunkCount()
require.NoError(t, err)
require.Len(t, actChks, count)
actSamples := chunks.ChunkMetasToSamples(actChks)
expSamples := chunks.ChunkMetasToSamples(expChks)
require.Equal(t, expSamples, actSamples)
})
}
}
func TestCompactingChunkSeriesMergerHistogramCounterResetHint(t *testing.T) {
m := NewCompactingChunkSeriesMerger(ChainedSeriesMerge)
for sampleType, sampleFunc := range map[string]func(int64, histogram.CounterResetHint) chunks.Sample{
"histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return histogramSample(ts, hint) },
"float histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return floatHistogramSample(ts, hint) },
} {
for name, tc := range map[string]struct {
input []ChunkSeries
expected ChunkSeries
}{
"histogram counter reset hint kept in single series": {
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)},
[]chunks.Sample{sampleFunc(10, cr), sampleFunc(15, uk)},
),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)},
[]chunks.Sample{sampleFunc(10, cr), sampleFunc(15, uk)},
),
},
"histogram not counter reset hint kept in single series": {
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)},
[]chunks.Sample{sampleFunc(10, nr), sampleFunc(15, uk)},
),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)},
[]chunks.Sample{sampleFunc(10, nr), sampleFunc(15, uk)},
),
},
"histogram counter reset hint kept in multiple equal series": {
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)},
[]chunks.Sample{sampleFunc(10, cr), sampleFunc(15, uk)},
),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)},
[]chunks.Sample{sampleFunc(10, cr), sampleFunc(15, uk)},
),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)},
[]chunks.Sample{sampleFunc(10, cr), sampleFunc(15, uk)},
),
},
"histogram not counter reset hint kept in multiple equal series": {
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)},
[]chunks.Sample{sampleFunc(10, nr), sampleFunc(15, uk)},
),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)},
[]chunks.Sample{sampleFunc(10, nr), sampleFunc(15, uk)},
),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)},
[]chunks.Sample{sampleFunc(10, nr), sampleFunc(15, uk)},
),
},
"histogram counter reset hint dropped from differing series": {
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)},
[]chunks.Sample{sampleFunc(10, cr), sampleFunc(15, uk)},
),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)},
[]chunks.Sample{sampleFunc(10, cr), sampleFunc(12, uk), sampleFunc(15, uk)},
),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)},
[]chunks.Sample{sampleFunc(10, uk), sampleFunc(12, uk), sampleFunc(15, uk)},
),
},
"histogram counter not reset hint dropped from differing series": {
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)},
[]chunks.Sample{sampleFunc(10, nr), sampleFunc(15, uk)},
),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)},
[]chunks.Sample{sampleFunc(10, nr), sampleFunc(12, uk), sampleFunc(15, uk)},
),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)},
[]chunks.Sample{sampleFunc(10, uk), sampleFunc(12, uk), sampleFunc(15, uk)},
),
},
} {
t.Run(sampleType+"/"+name, func(t *testing.T) {
merged := m(tc.input...)
require.Equal(t, tc.expected.Labels(), merged.Labels())
actChks, actErr := ExpandChunks(merged.Iterator(nil))
expChks, expErr := ExpandChunks(tc.expected.Iterator(nil))
require.Equal(t, expErr, actErr)
require.Equal(t, expChks, actChks)
actSamples := chunks.ChunkMetasToSamples(actChks)
expSamples := chunks.ChunkMetasToSamples(expChks)
require.Equal(t, expSamples, actSamples)
})
}
}
}
func TestConcatenatingChunkSeriesMerger(t *testing.T) {
m := NewConcatenatingChunkSeriesMerger()
@ -820,102 +961,243 @@ func (m *mockChunkSeriesSet) Err() error { return nil }
func (m *mockChunkSeriesSet) Warnings() annotations.Annotations { return nil }
func TestChainSampleIterator(t *testing.T) {
for _, tc := range []struct {
input []chunkenc.Iterator
expected []chunks.Sample
}{
{
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{fSample{0, 0}, fSample{1, 1}}),
},
expected: []chunks.Sample{fSample{0, 0}, fSample{1, 1}},
},
{
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{fSample{0, 0}, fSample{1, 1}}),
NewListSeriesIterator(samples{fSample{2, 2}, fSample{3, 3}}),
},
expected: []chunks.Sample{fSample{0, 0}, fSample{1, 1}, fSample{2, 2}, fSample{3, 3}},
},
{
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{fSample{0, 0}, fSample{3, 3}}),
NewListSeriesIterator(samples{fSample{1, 1}, fSample{4, 4}}),
NewListSeriesIterator(samples{fSample{2, 2}, fSample{5, 5}}),
},
expected: []chunks.Sample{
fSample{0, 0}, fSample{1, 1}, fSample{2, 2}, fSample{3, 3}, fSample{4, 4}, fSample{5, 5},
},
},
// Overlap.
{
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{fSample{0, 0}, fSample{1, 1}}),
NewListSeriesIterator(samples{fSample{0, 0}, fSample{2, 2}}),
NewListSeriesIterator(samples{fSample{2, 2}, fSample{3, 3}}),
NewListSeriesIterator(samples{}),
NewListSeriesIterator(samples{}),
NewListSeriesIterator(samples{}),
},
expected: []chunks.Sample{fSample{0, 0}, fSample{1, 1}, fSample{2, 2}, fSample{3, 3}},
},
for sampleType, sampleFunc := range map[string]func(int64) chunks.Sample{
"float": func(ts int64) chunks.Sample { return fSample{ts, float64(ts)} },
"histogram": func(ts int64) chunks.Sample { return histogramSample(ts, uk) },
"float histogram": func(ts int64) chunks.Sample { return floatHistogramSample(ts, uk) },
} {
merged := ChainSampleIteratorFromIterators(nil, tc.input)
actual, err := ExpandSamples(merged, nil)
require.NoError(t, err)
require.Equal(t, tc.expected, actual)
for name, tc := range map[string]struct {
input []chunkenc.Iterator
expected []chunks.Sample
}{
"single iterator": {
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(1)}),
},
expected: []chunks.Sample{sampleFunc(0), sampleFunc(1)},
},
"non overlapping iterators": {
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(1)}),
NewListSeriesIterator(samples{sampleFunc(2), sampleFunc(3)}),
},
expected: []chunks.Sample{sampleFunc(0), sampleFunc(1), sampleFunc(2), sampleFunc(3)},
},
"overlapping but distinct iterators": {
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(3)}),
NewListSeriesIterator(samples{sampleFunc(1), sampleFunc(4)}),
NewListSeriesIterator(samples{sampleFunc(2), sampleFunc(5)}),
},
expected: []chunks.Sample{
sampleFunc(0), sampleFunc(1), sampleFunc(2), sampleFunc(3), sampleFunc(4), sampleFunc(5),
},
},
"overlapping iterators": {
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(1)}),
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(2)}),
NewListSeriesIterator(samples{sampleFunc(2), sampleFunc(3)}),
NewListSeriesIterator(samples{}),
NewListSeriesIterator(samples{}),
NewListSeriesIterator(samples{}),
},
expected: []chunks.Sample{sampleFunc(0), sampleFunc(1), sampleFunc(2), sampleFunc(3)},
},
} {
t.Run(sampleType+"/"+name, func(t *testing.T) {
merged := ChainSampleIteratorFromIterators(nil, tc.input)
actual, err := ExpandSamples(merged, nil)
require.NoError(t, err)
require.Equal(t, tc.expected, actual)
})
}
}
}
func TestChainSampleIteratorHistogramCounterResetHint(t *testing.T) {
for sampleType, sampleFunc := range map[string]func(int64, histogram.CounterResetHint) chunks.Sample{
"histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return histogramSample(ts, hint) },
"float histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return floatHistogramSample(ts, hint) },
} {
for name, tc := range map[string]struct {
input []chunkenc.Iterator
expected []chunks.Sample
}{
"single iterator": {
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, cr), sampleFunc(2, uk)}),
},
expected: []chunks.Sample{sampleFunc(0, uk), sampleFunc(1, cr), sampleFunc(2, uk)},
},
"single iterator gauge": {
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sampleFunc(0, ga), sampleFunc(1, ga), sampleFunc(2, ga)}),
},
expected: []chunks.Sample{sampleFunc(0, ga), sampleFunc(1, ga), sampleFunc(2, ga)},
},
"overlapping iterators gauge": {
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sampleFunc(0, ga), sampleFunc(1, ga), sampleFunc(2, ga), sampleFunc(4, ga)}),
NewListSeriesIterator(samples{sampleFunc(0, ga), sampleFunc(1, ga), sampleFunc(3, ga), sampleFunc(5, ga)}),
},
expected: []chunks.Sample{sampleFunc(0, ga), sampleFunc(1, ga), sampleFunc(2, ga), sampleFunc(3, ga), sampleFunc(4, ga), sampleFunc(5, ga)},
},
"non overlapping iterators": {
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, uk)}),
NewListSeriesIterator(samples{sampleFunc(2, cr), sampleFunc(3, cr)}),
},
expected: []chunks.Sample{sampleFunc(0, uk), sampleFunc(1, uk), sampleFunc(2, uk), sampleFunc(3, cr)},
},
"overlapping but distinct iterators": {
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(3, uk), sampleFunc(5, cr)}),
NewListSeriesIterator(samples{sampleFunc(1, uk), sampleFunc(2, cr), sampleFunc(4, cr)}),
},
expected: []chunks.Sample{
sampleFunc(0, uk), sampleFunc(1, uk), sampleFunc(2, cr), sampleFunc(3, uk), sampleFunc(4, uk), sampleFunc(5, uk),
},
},
"overlapping iterators": {
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, cr), sampleFunc(2, cr)}),
NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, cr), sampleFunc(2, cr)}),
},
expected: []chunks.Sample{sampleFunc(0, uk), sampleFunc(1, uk), sampleFunc(2, uk)},
},
} {
t.Run(sampleType+"/"+name, func(t *testing.T) {
merged := ChainSampleIteratorFromIterators(nil, tc.input)
actual, err := ExpandSamples(merged, nil)
require.NoError(t, err)
require.Equal(t, tc.expected, actual)
})
}
}
}
func TestChainSampleIteratorSeek(t *testing.T) {
for _, tc := range []struct {
input []chunkenc.Iterator
seek int64
expected []chunks.Sample
}{
{
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{fSample{0, 0}, fSample{1, 1}, fSample{2, 2}}),
},
seek: 1,
expected: []chunks.Sample{fSample{1, 1}, fSample{2, 2}},
},
{
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{fSample{0, 0}, fSample{1, 1}}),
NewListSeriesIterator(samples{fSample{2, 2}, fSample{3, 3}}),
},
seek: 2,
expected: []chunks.Sample{fSample{2, 2}, fSample{3, 3}},
},
{
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{fSample{0, 0}, fSample{3, 3}}),
NewListSeriesIterator(samples{fSample{1, 1}, fSample{4, 4}}),
NewListSeriesIterator(samples{fSample{2, 2}, fSample{5, 5}}),
},
seek: 2,
expected: []chunks.Sample{fSample{2, 2}, fSample{3, 3}, fSample{4, 4}, fSample{5, 5}},
},
{
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{fSample{0, 0}, fSample{2, 2}, fSample{3, 3}}),
NewListSeriesIterator(samples{fSample{0, 0}, fSample{1, 1}, fSample{2, 2}}),
},
seek: 0,
expected: []chunks.Sample{fSample{0, 0}, fSample{1, 1}, fSample{2, 2}, fSample{3, 3}},
},
for sampleType, sampleFunc := range map[string]func(int64) chunks.Sample{
"float": func(ts int64) chunks.Sample { return fSample{ts, float64(ts)} },
"histogram": func(ts int64) chunks.Sample { return histogramSample(ts, uk) },
"float histogram": func(ts int64) chunks.Sample { return floatHistogramSample(ts, uk) },
} {
merged := ChainSampleIteratorFromIterators(nil, tc.input)
actual := []chunks.Sample{}
if merged.Seek(tc.seek) == chunkenc.ValFloat {
t, f := merged.At()
actual = append(actual, fSample{t, f})
for name, tc := range map[string]struct {
input []chunkenc.Iterator
seek int64
expected []chunks.Sample
}{
"single iterator": {
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(1), sampleFunc(2)}),
},
seek: 1,
expected: []chunks.Sample{sampleFunc(1), sampleFunc(2)},
},
"non overlapping iterators": {
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(1)}),
NewListSeriesIterator(samples{sampleFunc(2), sampleFunc(3)}),
},
seek: 2,
expected: []chunks.Sample{sampleFunc(2), sampleFunc(3)},
},
"overlapping but distinct iterators": {
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(3)}),
NewListSeriesIterator(samples{sampleFunc(1), sampleFunc(4)}),
NewListSeriesIterator(samples{sampleFunc(2), sampleFunc(5)}),
},
seek: 2,
expected: []chunks.Sample{sampleFunc(2), sampleFunc(3), sampleFunc(4), sampleFunc(5)},
},
"overlapping iterators": {
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(2), sampleFunc(3)}),
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(1), sampleFunc(2)}),
},
seek: 0,
expected: []chunks.Sample{sampleFunc(0), sampleFunc(1), sampleFunc(2), sampleFunc(3)},
},
} {
t.Run(sampleType+"/"+name, func(t *testing.T) {
merged := ChainSampleIteratorFromIterators(nil, tc.input)
actual := []chunks.Sample{}
switch merged.Seek(tc.seek) {
case chunkenc.ValFloat:
t, f := merged.At()
actual = append(actual, fSample{t, f})
case chunkenc.ValHistogram:
t, h := merged.AtHistogram()
actual = append(actual, hSample{t, h})
case chunkenc.ValFloatHistogram:
t, fh := merged.AtFloatHistogram()
actual = append(actual, fhSample{t, fh})
}
s, err := ExpandSamples(merged, nil)
require.NoError(t, err)
actual = append(actual, s...)
require.Equal(t, tc.expected, actual)
})
}
}
}
func TestChainSampleIteratorSeekHistogramCounterResetHint(t *testing.T) {
for sampleType, sampleFunc := range map[string]func(int64, histogram.CounterResetHint) chunks.Sample{
"histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return histogramSample(ts, hint) },
"float histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return floatHistogramSample(ts, hint) },
} {
for name, tc := range map[string]struct {
input []chunkenc.Iterator
seek int64
expected []chunks.Sample
}{
"single iterator": {
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, cr), sampleFunc(2, uk)}),
},
seek: 1,
expected: []chunks.Sample{sampleFunc(1, uk), sampleFunc(2, uk)},
},
"non overlapping iterators": {
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, uk)}),
NewListSeriesIterator(samples{sampleFunc(2, cr), sampleFunc(3, cr)}),
},
seek: 2,
expected: []chunks.Sample{sampleFunc(2, uk), sampleFunc(3, cr)},
},
"non overlapping iterators seek to internal reset": {
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, uk)}),
NewListSeriesIterator(samples{sampleFunc(2, cr), sampleFunc(3, cr)}),
},
seek: 3,
expected: []chunks.Sample{sampleFunc(3, uk)},
},
} {
t.Run(sampleType+"/"+name, func(t *testing.T) {
merged := ChainSampleIteratorFromIterators(nil, tc.input)
actual := []chunks.Sample{}
switch merged.Seek(tc.seek) {
case chunkenc.ValFloat:
t, f := merged.At()
actual = append(actual, fSample{t, f})
case chunkenc.ValHistogram:
t, h := merged.AtHistogram()
actual = append(actual, hSample{t, h})
case chunkenc.ValFloatHistogram:
t, fh := merged.AtFloatHistogram()
actual = append(actual, fhSample{t, fh})
}
s, err := ExpandSamples(merged, nil)
require.NoError(t, err)
actual = append(actual, s...)
require.Equal(t, tc.expected, actual)
})
}
s, err := ExpandSamples(merged, nil)
require.NoError(t, err)
actual = append(actual, s...)
require.Equal(t, tc.expected, actual)
}
}

View file

@ -467,35 +467,36 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
require.Len(t, chks, count)
// Decode all encoded samples and assert they are equal to the original ones.
encodedSamples := expandHistogramSamples(chks)
encodedSamples := chunks.ChunkMetasToSamples(chks)
require.Equal(t, len(test.samples), len(encodedSamples))
for i, s := range test.samples {
encodedSample := encodedSamples[i]
switch expectedSample := s.(type) {
case hSample:
encodedSample, ok := encodedSamples[i].(hSample)
require.True(t, ok, "expect histogram", fmt.Sprintf("at idx %d", i))
require.Equal(t, chunkenc.ValHistogram, encodedSample.Type(), "expect histogram", fmt.Sprintf("at idx %d", i))
h := encodedSample.H()
// Ignore counter reset if not gauge here, will check on chunk level.
if expectedSample.h.CounterResetHint != histogram.GaugeType {
encodedSample.h.CounterResetHint = histogram.UnknownCounterReset
h.CounterResetHint = histogram.UnknownCounterReset
}
if value.IsStaleNaN(expectedSample.h.Sum) {
require.True(t, value.IsStaleNaN(encodedSample.h.Sum), fmt.Sprintf("at idx %d", i))
require.True(t, value.IsStaleNaN(h.Sum), fmt.Sprintf("at idx %d", i))
continue
}
require.Equal(t, *expectedSample.h, *encodedSample.h.Compact(0), fmt.Sprintf("at idx %d", i))
require.Equal(t, *expectedSample.h, *h.Compact(0), fmt.Sprintf("at idx %d", i))
case fhSample:
encodedSample, ok := encodedSamples[i].(fhSample)
require.True(t, ok, "expect float histogram", fmt.Sprintf("at idx %d", i))
require.Equal(t, chunkenc.ValFloatHistogram, encodedSample.Type(), "expect float histogram", fmt.Sprintf("at idx %d", i))
fh := encodedSample.FH()
// Ignore counter reset if not gauge here, will check on chunk level.
if expectedSample.fh.CounterResetHint != histogram.GaugeType {
encodedSample.fh.CounterResetHint = histogram.UnknownCounterReset
fh.CounterResetHint = histogram.UnknownCounterReset
}
if value.IsStaleNaN(expectedSample.fh.Sum) {
require.True(t, value.IsStaleNaN(encodedSample.fh.Sum), fmt.Sprintf("at idx %d", i))
require.True(t, value.IsStaleNaN(fh.Sum), fmt.Sprintf("at idx %d", i))
continue
}
require.Equal(t, *expectedSample.fh, *encodedSample.fh.Compact(0), fmt.Sprintf("at idx %d", i))
require.Equal(t, *expectedSample.fh, *fh.Compact(0), fmt.Sprintf("at idx %d", i))
default:
t.Error("internal error, unexpected type")
}
@ -506,29 +507,6 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
}
}
func expandHistogramSamples(chunks []chunks.Meta) (result []chunks.Sample) {
if len(chunks) == 0 {
return
}
for _, chunk := range chunks {
it := chunk.Chunk.Iterator(nil)
for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() {
switch vt {
case chunkenc.ValHistogram:
t, h := it.AtHistogram()
result = append(result, hSample{t: t, h: h})
case chunkenc.ValFloatHistogram:
t, fh := it.AtFloatHistogram()
result = append(result, fhSample{t: t, fh: fh})
default:
panic("unexpected value type")
}
}
}
return
}
func getCounterResetHint(chunk chunks.Meta) chunkenc.CounterResetHeader {
switch chk := chunk.Chunk.(type) {
case *chunkenc.HistogramChunk:

View file

@ -339,11 +339,16 @@ func counterResetInAnyFloatBucket(oldBuckets []xorValue, newBuckets []float64, o
return false
}
oldSpanSliceIdx, newSpanSliceIdx := 0, 0 // Index for the span slices.
oldInsideSpanIdx, newInsideSpanIdx := uint32(0), uint32(0) // Index inside a span.
oldIdx, newIdx := oldSpans[0].Offset, newSpans[0].Offset
var (
oldSpanSliceIdx, newSpanSliceIdx int = -1, -1 // Index for the span slices. Starts at -1 to indicate that the first non empty span is not yet found.
oldInsideSpanIdx, newInsideSpanIdx uint32 // Index inside a span.
oldIdx, newIdx int32 // Index inside a bucket slice.
oldBucketSliceIdx, newBucketSliceIdx int // Index inside bucket slice.
)
oldBucketSliceIdx, newBucketSliceIdx := 0, 0 // Index inside bucket slice.
// Find first non empty spans.
oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans)
newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans)
oldVal, newVal := oldBuckets[0].value, newBuckets[0]
// Since we assume that new spans won't have missing buckets, there will never be a case
@ -359,13 +364,12 @@ func counterResetInAnyFloatBucket(oldBuckets []xorValue, newBuckets []float64, o
// Moving ahead old bucket and span by 1 index.
if oldInsideSpanIdx+1 >= oldSpans[oldSpanSliceIdx].Length {
// Current span is over.
oldSpanSliceIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldSpans)
oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans)
oldInsideSpanIdx = 0
if oldSpanSliceIdx >= len(oldSpans) {
// All old spans are over.
break
}
oldIdx += 1 + oldSpans[oldSpanSliceIdx].Offset
} else {
oldInsideSpanIdx++
oldIdx++
@ -378,14 +382,13 @@ func counterResetInAnyFloatBucket(oldBuckets []xorValue, newBuckets []float64, o
// Moving ahead new bucket and span by 1 index.
if newInsideSpanIdx+1 >= newSpans[newSpanSliceIdx].Length {
// Current span is over.
newSpanSliceIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newSpans)
newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans)
newInsideSpanIdx = 0
if newSpanSliceIdx >= len(newSpans) {
// All new spans are over.
// This should not happen, old spans above should catch this first.
panic("new spans over before old spans in counterReset")
}
newIdx += 1 + newSpans[newSpanSliceIdx].Offset
} else {
newInsideSpanIdx++
newIdx++

View file

@ -550,62 +550,193 @@ func assertRecodedFloatHistogramChunkOnAppend(t *testing.T, prevChunk Chunk, hAp
}
func TestFloatHistogramChunkAppendableWithEmptySpan(t *testing.T) {
h1 := &histogram.FloatHistogram{
Schema: 0,
Count: 21,
Sum: 1234.5,
ZeroThreshold: 0.001,
ZeroCount: 4,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 0, Length: 0},
{Offset: 0, Length: 3},
tests := map[string]struct {
h1 *histogram.FloatHistogram
h2 *histogram.FloatHistogram
}{
"empty span in old and new histogram": {
h1: &histogram.FloatHistogram{
Schema: 0,
Count: 21,
Sum: 1234.5,
ZeroThreshold: 0.001,
ZeroCount: 4,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 0, Length: 0},
{Offset: 0, Length: 3},
},
PositiveBuckets: []float64{1, 2, 1, 1, 1, 1, 1},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
},
NegativeBuckets: []float64{1, 2, 1, 2, 2, 2, 2},
},
h2: &histogram.FloatHistogram{
Schema: 0,
Count: 37,
Sum: 2345.6,
ZeroThreshold: 0.001,
ZeroCount: 5,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 0, Length: 0},
{Offset: 0, Length: 3},
},
PositiveBuckets: []float64{1, 3, 1, 2, 1, 1, 1},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
},
NegativeBuckets: []float64{1, 4, 2, 7, 5, 5, 2},
},
},
PositiveBuckets: []float64{1, 2, 1, 1, 1, 1, 1},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
"empty span in old histogram": {
h1: &histogram.FloatHistogram{
Schema: 0,
Count: 21,
Sum: 1234.5,
ZeroThreshold: 0.001,
ZeroCount: 4,
PositiveSpans: []histogram.Span{
{Offset: 1, Length: 0}, // This span will disappear.
{Offset: 2, Length: 4},
{Offset: 0, Length: 3},
},
PositiveBuckets: []float64{1, 2, 1, 1, 1, 1, 1},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
},
NegativeBuckets: []float64{1, 2, 1, 2, 2, 2, 2},
},
h2: &histogram.FloatHistogram{
Schema: 0,
Count: 37,
Sum: 2345.6,
ZeroThreshold: 0.001,
ZeroCount: 5,
PositiveSpans: []histogram.Span{
{Offset: 3, Length: 4},
{Offset: 0, Length: 3},
},
PositiveBuckets: []float64{1, 3, 1, 2, 1, 1, 1},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
},
NegativeBuckets: []float64{1, 4, 2, 7, 5, 5, 2},
},
},
NegativeBuckets: []float64{1, 2, 1, 2, 2, 2, 2},
}
h2 := &histogram.FloatHistogram{
Schema: 0,
Count: 37,
Sum: 2345.6,
ZeroThreshold: 0.001,
ZeroCount: 5,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 0, Length: 0},
{Offset: 0, Length: 3},
"empty span in new histogram": {
h1: &histogram.FloatHistogram{
Schema: 0,
Count: 21,
Sum: 1234.5,
ZeroThreshold: 0.001,
ZeroCount: 4,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 3, Length: 3},
},
PositiveBuckets: []float64{1, 2, 1, 1, 1, 1, 1},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
},
NegativeBuckets: []float64{1, 2, 1, 2, 2, 2, 2},
},
h2: &histogram.FloatHistogram{
Schema: 0,
Count: 37,
Sum: 2345.6,
ZeroThreshold: 0.001,
ZeroCount: 5,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 1, Length: 0}, // This span is new.
{Offset: 2, Length: 3},
},
PositiveBuckets: []float64{1, 3, 1, 2, 1, 1, 1},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
},
NegativeBuckets: []float64{1, 4, 2, 7, 5, 5, 2},
},
},
PositiveBuckets: []float64{1, 3, 1, 2, 1, 1, 1},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
"two empty spans mixing offsets": {
h1: &histogram.FloatHistogram{
Schema: 0,
Count: 21,
Sum: 1234.5,
ZeroThreshold: 0.001,
ZeroCount: 4,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 1, Length: 0},
{Offset: 3, Length: 0},
{Offset: 4, Length: 3},
},
PositiveBuckets: []float64{1, 2, 1, 1, 1, 1, 1},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
},
NegativeBuckets: []float64{1, 2, 1, 2, 2, 2, 2},
},
h2: &histogram.FloatHistogram{
Schema: 0,
Count: 37,
Sum: 2345.6,
ZeroThreshold: 0.001,
ZeroCount: 5,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 3, Length: 0},
{Offset: 1, Length: 0},
{Offset: 4, Length: 3},
},
PositiveBuckets: []float64{1, 3, 1, 2, 1, 1, 1},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
},
NegativeBuckets: []float64{1, 4, 2, 7, 5, 5, 2},
},
},
NegativeBuckets: []float64{1, 4, 2, 7, 5, 5, 2},
}
c := Chunk(NewFloatHistogramChunk())
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
c := Chunk(NewFloatHistogramChunk())
// Create fresh appender and add the first histogram.
app, err := c.Appender()
require.NoError(t, err)
require.Equal(t, 0, c.NumSamples())
// Create fresh appender and add the first histogram.
app, err := c.Appender()
require.NoError(t, err)
require.Equal(t, 0, c.NumSamples())
_, _, _, err = app.AppendFloatHistogram(nil, 1, h1, true)
require.NoError(t, err)
require.Equal(t, 1, c.NumSamples())
hApp, _ := app.(*FloatHistogramAppender)
_, _, _, err = app.AppendFloatHistogram(nil, 1, tc.h1, true)
require.NoError(t, err)
require.Equal(t, 1, c.NumSamples())
hApp, _ := app.(*FloatHistogramAppender)
pI, nI, okToAppend, counterReset := hApp.appendable(h2)
require.Empty(t, pI)
require.Empty(t, nI)
require.True(t, okToAppend)
require.False(t, counterReset)
pI, nI, okToAppend, counterReset := hApp.appendable(tc.h2)
require.Empty(t, pI)
require.Empty(t, nI)
require.True(t, okToAppend)
require.False(t, counterReset)
})
}
}
func TestFloatHistogramChunkAppendableGauge(t *testing.T) {

View file

@ -359,11 +359,16 @@ func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans
return false
}
oldSpanSliceIdx, newSpanSliceIdx := 0, 0 // Index for the span slices.
oldInsideSpanIdx, newInsideSpanIdx := uint32(0), uint32(0) // Index inside a span.
oldIdx, newIdx := oldSpans[0].Offset, newSpans[0].Offset
var (
oldSpanSliceIdx, newSpanSliceIdx int = -1, -1 // Index for the span slices. Starts at -1 to indicate that the first non empty span is not yet found.
oldInsideSpanIdx, newInsideSpanIdx uint32 // Index inside a span.
oldIdx, newIdx int32 // Index inside a bucket slice.
oldBucketSliceIdx, newBucketSliceIdx int // Index inside bucket slice.
)
oldBucketSliceIdx, newBucketSliceIdx := 0, 0 // Index inside bucket slice.
// Find first non empty spans.
oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans)
newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans)
oldVal, newVal := oldBuckets[0], newBuckets[0]
// Since we assume that new spans won't have missing buckets, there will never be a case
@ -379,13 +384,12 @@ func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans
// Moving ahead old bucket and span by 1 index.
if oldInsideSpanIdx+1 >= oldSpans[oldSpanSliceIdx].Length {
// Current span is over.
oldSpanSliceIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldSpans)
oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans)
oldInsideSpanIdx = 0
if oldSpanSliceIdx >= len(oldSpans) {
// All old spans are over.
break
}
oldIdx += 1 + oldSpans[oldSpanSliceIdx].Offset
} else {
oldInsideSpanIdx++
oldIdx++
@ -398,14 +402,13 @@ func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans
// Moving ahead new bucket and span by 1 index.
if newInsideSpanIdx+1 >= newSpans[newSpanSliceIdx].Length {
// Current span is over.
newSpanSliceIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newSpans)
newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans)
newInsideSpanIdx = 0
if newSpanSliceIdx >= len(newSpans) {
// All new spans are over.
// This should not happen, old spans above should catch this first.
panic("new spans over before old spans in counterReset")
}
newIdx += 1 + newSpans[newSpanSliceIdx].Offset
} else {
newInsideSpanIdx++
newIdx++

View file

@ -489,8 +489,13 @@ func counterResetHint(crh CounterResetHeader, numRead uint16) histogram.CounterR
}
// Handle pathological case of empty span when advancing span idx.
func nextNonEmptySpanSliceIdx(idx int, spans []histogram.Span) (newIdx int) {
for idx++; idx < len(spans) && spans[idx].Length == 0; idx++ { //nolint:revive // This "empty" block is intentional
// Call it with idx==-1 to find the first non empty span.
func nextNonEmptySpanSliceIdx(idx int, bucketIdx int32, spans []histogram.Span) (newIdx int, newBucketIdx int32) {
for idx++; idx < len(spans); idx++ {
if spans[idx].Length > 0 {
return idx, bucketIdx + spans[idx].Offset + 1
}
bucketIdx += spans[idx].Offset
}
return idx
return idx, 0
}

View file

@ -573,62 +573,193 @@ func assertSampleCount(t *testing.T, c Chunk, exp int64, vtype ValueType) {
}
func TestHistogramChunkAppendableWithEmptySpan(t *testing.T) {
h1 := &histogram.Histogram{
Schema: 0,
Count: 21,
Sum: 1234.5,
ZeroThreshold: 0.001,
ZeroCount: 4,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 0, Length: 0},
{Offset: 0, Length: 3},
tests := map[string]struct {
h1 *histogram.Histogram
h2 *histogram.Histogram
}{
"empty span in old and new histogram": {
h1: &histogram.Histogram{
Schema: 0,
Count: 21,
Sum: 1234.5,
ZeroThreshold: 0.001,
ZeroCount: 4,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 0, Length: 0},
{Offset: 0, Length: 3},
},
PositiveBuckets: []int64{1, 1, -1, 0, 0, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
},
NegativeBuckets: []int64{1, 1, -1, 1, 0, 0, 0},
},
h2: &histogram.Histogram{
Schema: 0,
Count: 37,
Sum: 2345.6,
ZeroThreshold: 0.001,
ZeroCount: 5,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 0, Length: 0},
{Offset: 0, Length: 3},
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
},
NegativeBuckets: []int64{1, 3, -2, 5, -2, 0, -3},
},
},
PositiveBuckets: []int64{1, 1, -1, 0, 0, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
"empty span in old histogram": {
h1: &histogram.Histogram{
Schema: 0,
Count: 21,
Sum: 1234.5,
ZeroThreshold: 0.001,
ZeroCount: 4,
PositiveSpans: []histogram.Span{
{Offset: 1, Length: 0}, // This span will disappear.
{Offset: 2, Length: 4},
{Offset: 0, Length: 3},
},
PositiveBuckets: []int64{1, 1, -1, 0, 0, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
},
NegativeBuckets: []int64{1, 1, -1, 1, 0, 0, 0},
},
h2: &histogram.Histogram{
Schema: 0,
Count: 37,
Sum: 2345.6,
ZeroThreshold: 0.001,
ZeroCount: 5,
PositiveSpans: []histogram.Span{
{Offset: 3, Length: 4},
{Offset: 0, Length: 3},
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
},
NegativeBuckets: []int64{1, 3, -2, 5, -2, 0, -3},
},
},
NegativeBuckets: []int64{1, 1, -1, 1, 0, 0, 0},
}
h2 := &histogram.Histogram{
Schema: 0,
Count: 37,
Sum: 2345.6,
ZeroThreshold: 0.001,
ZeroCount: 5,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 0, Length: 0},
{Offset: 0, Length: 3},
"empty span in new histogram": {
h1: &histogram.Histogram{
Schema: 0,
Count: 21,
Sum: 1234.5,
ZeroThreshold: 0.001,
ZeroCount: 4,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 3, Length: 3},
},
PositiveBuckets: []int64{1, 1, -1, 0, 0, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
},
NegativeBuckets: []int64{1, 1, -1, 1, 0, 0, 0},
},
h2: &histogram.Histogram{
Schema: 0,
Count: 37,
Sum: 2345.6,
ZeroThreshold: 0.001,
ZeroCount: 5,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 1, Length: 0}, // This span is new.
{Offset: 2, Length: 3},
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
},
NegativeBuckets: []int64{1, 3, -2, 5, -2, 0, -3},
},
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
"two empty spans mixing offsets": {
h1: &histogram.Histogram{
Schema: 0,
Count: 21,
Sum: 1234.5,
ZeroThreshold: 0.001,
ZeroCount: 4,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 1, Length: 0},
{Offset: 3, Length: 0},
{Offset: 4, Length: 3},
},
PositiveBuckets: []int64{1, 1, -1, 0, 0, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
},
NegativeBuckets: []int64{1, 1, -1, 1, 0, 0, 0},
},
h2: &histogram.Histogram{
Schema: 0,
Count: 37,
Sum: 2345.6,
ZeroThreshold: 0.001,
ZeroCount: 5,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 3, Length: 0},
{Offset: 1, Length: 0},
{Offset: 4, Length: 3},
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 4},
{Offset: 2, Length: 0},
{Offset: 2, Length: 3},
},
NegativeBuckets: []int64{1, 3, -2, 5, -2, 0, -3},
},
},
NegativeBuckets: []int64{1, 3, -2, 5, -2, 0, -3},
}
c := Chunk(NewHistogramChunk())
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
c := Chunk(NewHistogramChunk())
// Create fresh appender and add the first histogram.
app, err := c.Appender()
require.NoError(t, err)
require.Equal(t, 0, c.NumSamples())
// Create fresh appender and add the first histogram.
app, err := c.Appender()
require.NoError(t, err)
require.Equal(t, 0, c.NumSamples())
_, _, _, err = app.AppendHistogram(nil, 1, h1, true)
require.NoError(t, err)
require.Equal(t, 1, c.NumSamples())
hApp, _ := app.(*HistogramAppender)
_, _, _, err = app.AppendHistogram(nil, 1, tc.h1, true)
require.NoError(t, err)
require.Equal(t, 1, c.NumSamples())
hApp, _ := app.(*HistogramAppender)
pI, nI, okToAppend, counterReset := hApp.appendable(h2)
require.Empty(t, pI)
require.Empty(t, nI)
require.True(t, okToAppend)
require.False(t, counterReset)
pI, nI, okToAppend, counterReset := hApp.appendable(tc.h2)
require.Empty(t, pI)
require.Empty(t, nI)
require.True(t, okToAppend)
require.False(t, counterReset)
})
}
}
func TestAtFloatHistogram(t *testing.T) {

View file

@ -207,6 +207,34 @@ func PopulatedChunk(numSamples int, minTime int64) (Meta, error) {
return ChunkFromSamples(samples)
}
// ChunkMetasToSamples converts a slice of chunk meta data to a slice of samples.
// Used in tests to compare the content of chunks.
func ChunkMetasToSamples(chunks []Meta) (result []Sample) {
if len(chunks) == 0 {
return
}
for _, chunk := range chunks {
it := chunk.Chunk.Iterator(nil)
for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() {
switch vt {
case chunkenc.ValFloat:
t, v := it.At()
result = append(result, sample{t: t, f: v})
case chunkenc.ValHistogram:
t, h := it.AtHistogram()
result = append(result, sample{t: t, h: h})
case chunkenc.ValFloatHistogram:
t, fh := it.AtFloatHistogram()
result = append(result, sample{t: t, fh: fh})
default:
panic("unexpected value type")
}
}
}
return
}
// Iterator iterates over the chunks of a single time series.
type Iterator interface {
// At returns the current meta.