mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
storage: make histogram reset handling consistent in chainSampleIterator (#12779)
storage: make histogram reset handling consistent in chainSampleIterator --------- Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent
69edd8709b
commit
3512b2d678
|
@ -523,8 +523,12 @@ func (c *chainSampleIterator) AtHistogram() (int64, *histogram.Histogram) {
|
||||||
}
|
}
|
||||||
t, h := c.curr.AtHistogram()
|
t, h := c.curr.AtHistogram()
|
||||||
// If the current sample is not consecutive with the previous one, we
|
// If the current sample is not consecutive with the previous one, we
|
||||||
// cannot be sure anymore that there was no counter reset.
|
// cannot be sure anymore about counter resets for counter histograms.
|
||||||
if !c.consecutive && h.CounterResetHint == histogram.NotCounterReset {
|
// TODO(beorn7): If a `NotCounterReset` sample is followed by a
|
||||||
|
// non-consecutive `CounterReset` sample, we could keep the hint as
|
||||||
|
// `CounterReset`. But then we needed to track the previous sample
|
||||||
|
// in more detail, which might not be worth it.
|
||||||
|
if !c.consecutive && h.CounterResetHint != histogram.GaugeType {
|
||||||
h.CounterResetHint = histogram.UnknownCounterReset
|
h.CounterResetHint = histogram.UnknownCounterReset
|
||||||
}
|
}
|
||||||
return t, h
|
return t, h
|
||||||
|
|
|
@ -24,6 +24,7 @@ import (
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
"github.com/prometheus/prometheus/model/labels"
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||||
|
@ -384,16 +385,36 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func histogramSample(ts int64, hint histogram.CounterResetHint) hSample {
|
||||||
|
h := tsdbutil.GenerateTestHistogram(int(ts + 1))
|
||||||
|
h.CounterResetHint = hint
|
||||||
|
return hSample{t: ts, h: h}
|
||||||
|
}
|
||||||
|
|
||||||
|
func floatHistogramSample(ts int64, hint histogram.CounterResetHint) fhSample {
|
||||||
|
fh := tsdbutil.GenerateTestFloatHistogram(int(ts + 1))
|
||||||
|
fh.CounterResetHint = hint
|
||||||
|
return fhSample{t: ts, fh: fh}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shorthands for counter reset hints.
|
||||||
|
const (
|
||||||
|
uk = histogram.UnknownCounterReset
|
||||||
|
cr = histogram.CounterReset
|
||||||
|
nr = histogram.NotCounterReset
|
||||||
|
ga = histogram.GaugeType
|
||||||
|
)
|
||||||
|
|
||||||
func TestCompactingChunkSeriesMerger(t *testing.T) {
|
func TestCompactingChunkSeriesMerger(t *testing.T) {
|
||||||
m := NewCompactingChunkSeriesMerger(ChainedSeriesMerge)
|
m := NewCompactingChunkSeriesMerger(ChainedSeriesMerge)
|
||||||
|
|
||||||
// histogramSample returns a histogram that is unique to the ts.
|
// histogramSample returns a histogram that is unique to the ts.
|
||||||
histogramSample := func(ts int64) hSample {
|
histogramSample := func(ts int64) hSample {
|
||||||
return hSample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts + 1))}
|
return histogramSample(ts, uk)
|
||||||
}
|
}
|
||||||
|
|
||||||
floatHistogramSample := func(ts int64) fhSample {
|
floatHistogramSample := func(ts int64) fhSample {
|
||||||
return fhSample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(ts + 1))}
|
return floatHistogramSample(ts, uk)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range []struct {
|
for _, tc := range []struct {
|
||||||
|
@ -573,10 +594,131 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
|
||||||
|
|
||||||
require.Equal(t, expErr, actErr)
|
require.Equal(t, expErr, actErr)
|
||||||
require.Equal(t, expChks, actChks)
|
require.Equal(t, expChks, actChks)
|
||||||
|
|
||||||
|
actSamples := chunks.ChunkMetasToSamples(actChks)
|
||||||
|
expSamples := chunks.ChunkMetasToSamples(expChks)
|
||||||
|
require.Equal(t, expSamples, actSamples)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCompactingChunkSeriesMergerHistogramCounterResetHint(t *testing.T) {
|
||||||
|
m := NewCompactingChunkSeriesMerger(ChainedSeriesMerge)
|
||||||
|
|
||||||
|
for sampleType, sampleFunc := range map[string]func(int64, histogram.CounterResetHint) chunks.Sample{
|
||||||
|
"histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return histogramSample(ts, hint) },
|
||||||
|
"float histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return floatHistogramSample(ts, hint) },
|
||||||
|
} {
|
||||||
|
for name, tc := range map[string]struct {
|
||||||
|
input []ChunkSeries
|
||||||
|
expected ChunkSeries
|
||||||
|
}{
|
||||||
|
"histogram counter reset hint kept in single series": {
|
||||||
|
input: []ChunkSeries{
|
||||||
|
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
|
||||||
|
[]chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)},
|
||||||
|
[]chunks.Sample{sampleFunc(10, cr), sampleFunc(15, uk)},
|
||||||
|
),
|
||||||
|
},
|
||||||
|
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
|
||||||
|
[]chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)},
|
||||||
|
[]chunks.Sample{sampleFunc(10, cr), sampleFunc(15, uk)},
|
||||||
|
),
|
||||||
|
},
|
||||||
|
"histogram not counter reset hint kept in single series": {
|
||||||
|
input: []ChunkSeries{
|
||||||
|
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
|
||||||
|
[]chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)},
|
||||||
|
[]chunks.Sample{sampleFunc(10, nr), sampleFunc(15, uk)},
|
||||||
|
),
|
||||||
|
},
|
||||||
|
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
|
||||||
|
[]chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)},
|
||||||
|
[]chunks.Sample{sampleFunc(10, nr), sampleFunc(15, uk)},
|
||||||
|
),
|
||||||
|
},
|
||||||
|
"histogram counter reset hint kept in multiple equal series": {
|
||||||
|
input: []ChunkSeries{
|
||||||
|
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
|
||||||
|
[]chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)},
|
||||||
|
[]chunks.Sample{sampleFunc(10, cr), sampleFunc(15, uk)},
|
||||||
|
),
|
||||||
|
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
|
||||||
|
[]chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)},
|
||||||
|
[]chunks.Sample{sampleFunc(10, cr), sampleFunc(15, uk)},
|
||||||
|
),
|
||||||
|
},
|
||||||
|
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
|
||||||
|
[]chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)},
|
||||||
|
[]chunks.Sample{sampleFunc(10, cr), sampleFunc(15, uk)},
|
||||||
|
),
|
||||||
|
},
|
||||||
|
"histogram not counter reset hint kept in multiple equal series": {
|
||||||
|
input: []ChunkSeries{
|
||||||
|
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
|
||||||
|
[]chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)},
|
||||||
|
[]chunks.Sample{sampleFunc(10, nr), sampleFunc(15, uk)},
|
||||||
|
),
|
||||||
|
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
|
||||||
|
[]chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)},
|
||||||
|
[]chunks.Sample{sampleFunc(10, nr), sampleFunc(15, uk)},
|
||||||
|
),
|
||||||
|
},
|
||||||
|
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
|
||||||
|
[]chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)},
|
||||||
|
[]chunks.Sample{sampleFunc(10, nr), sampleFunc(15, uk)},
|
||||||
|
),
|
||||||
|
},
|
||||||
|
"histogram counter reset hint dropped from differing series": {
|
||||||
|
input: []ChunkSeries{
|
||||||
|
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
|
||||||
|
[]chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)},
|
||||||
|
[]chunks.Sample{sampleFunc(10, cr), sampleFunc(15, uk)},
|
||||||
|
),
|
||||||
|
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
|
||||||
|
[]chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)},
|
||||||
|
[]chunks.Sample{sampleFunc(10, cr), sampleFunc(12, uk), sampleFunc(15, uk)},
|
||||||
|
),
|
||||||
|
},
|
||||||
|
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
|
||||||
|
[]chunks.Sample{sampleFunc(0, cr), sampleFunc(5, uk)},
|
||||||
|
[]chunks.Sample{sampleFunc(10, uk), sampleFunc(12, uk), sampleFunc(15, uk)},
|
||||||
|
),
|
||||||
|
},
|
||||||
|
"histogram counter not reset hint dropped from differing series": {
|
||||||
|
input: []ChunkSeries{
|
||||||
|
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
|
||||||
|
[]chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)},
|
||||||
|
[]chunks.Sample{sampleFunc(10, nr), sampleFunc(15, uk)},
|
||||||
|
),
|
||||||
|
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
|
||||||
|
[]chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)},
|
||||||
|
[]chunks.Sample{sampleFunc(10, nr), sampleFunc(12, uk), sampleFunc(15, uk)},
|
||||||
|
),
|
||||||
|
},
|
||||||
|
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
|
||||||
|
[]chunks.Sample{sampleFunc(0, nr), sampleFunc(5, uk)},
|
||||||
|
[]chunks.Sample{sampleFunc(10, uk), sampleFunc(12, uk), sampleFunc(15, uk)},
|
||||||
|
),
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(sampleType+"/"+name, func(t *testing.T) {
|
||||||
|
merged := m(tc.input...)
|
||||||
|
require.Equal(t, tc.expected.Labels(), merged.Labels())
|
||||||
|
actChks, actErr := ExpandChunks(merged.Iterator(nil))
|
||||||
|
expChks, expErr := ExpandChunks(tc.expected.Iterator(nil))
|
||||||
|
|
||||||
|
require.Equal(t, expErr, actErr)
|
||||||
|
require.Equal(t, expChks, actChks)
|
||||||
|
|
||||||
|
actSamples := chunks.ChunkMetasToSamples(actChks)
|
||||||
|
expSamples := chunks.ChunkMetasToSamples(expChks)
|
||||||
|
require.Equal(t, expSamples, actSamples)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestConcatenatingChunkSeriesMerger(t *testing.T) {
|
func TestConcatenatingChunkSeriesMerger(t *testing.T) {
|
||||||
m := NewConcatenatingChunkSeriesMerger()
|
m := NewConcatenatingChunkSeriesMerger()
|
||||||
|
|
||||||
|
@ -804,102 +946,243 @@ func (m *mockChunkSeriesSet) Err() error { return nil }
|
||||||
func (m *mockChunkSeriesSet) Warnings() annotations.Annotations { return nil }
|
func (m *mockChunkSeriesSet) Warnings() annotations.Annotations { return nil }
|
||||||
|
|
||||||
func TestChainSampleIterator(t *testing.T) {
|
func TestChainSampleIterator(t *testing.T) {
|
||||||
for _, tc := range []struct {
|
for sampleType, sampleFunc := range map[string]func(int64) chunks.Sample{
|
||||||
input []chunkenc.Iterator
|
"float": func(ts int64) chunks.Sample { return fSample{ts, float64(ts)} },
|
||||||
expected []chunks.Sample
|
"histogram": func(ts int64) chunks.Sample { return histogramSample(ts, uk) },
|
||||||
}{
|
"float histogram": func(ts int64) chunks.Sample { return floatHistogramSample(ts, uk) },
|
||||||
{
|
|
||||||
input: []chunkenc.Iterator{
|
|
||||||
NewListSeriesIterator(samples{fSample{0, 0}, fSample{1, 1}}),
|
|
||||||
},
|
|
||||||
expected: []chunks.Sample{fSample{0, 0}, fSample{1, 1}},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
input: []chunkenc.Iterator{
|
|
||||||
NewListSeriesIterator(samples{fSample{0, 0}, fSample{1, 1}}),
|
|
||||||
NewListSeriesIterator(samples{fSample{2, 2}, fSample{3, 3}}),
|
|
||||||
},
|
|
||||||
expected: []chunks.Sample{fSample{0, 0}, fSample{1, 1}, fSample{2, 2}, fSample{3, 3}},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
input: []chunkenc.Iterator{
|
|
||||||
NewListSeriesIterator(samples{fSample{0, 0}, fSample{3, 3}}),
|
|
||||||
NewListSeriesIterator(samples{fSample{1, 1}, fSample{4, 4}}),
|
|
||||||
NewListSeriesIterator(samples{fSample{2, 2}, fSample{5, 5}}),
|
|
||||||
},
|
|
||||||
expected: []chunks.Sample{
|
|
||||||
fSample{0, 0}, fSample{1, 1}, fSample{2, 2}, fSample{3, 3}, fSample{4, 4}, fSample{5, 5},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
// Overlap.
|
|
||||||
{
|
|
||||||
input: []chunkenc.Iterator{
|
|
||||||
NewListSeriesIterator(samples{fSample{0, 0}, fSample{1, 1}}),
|
|
||||||
NewListSeriesIterator(samples{fSample{0, 0}, fSample{2, 2}}),
|
|
||||||
NewListSeriesIterator(samples{fSample{2, 2}, fSample{3, 3}}),
|
|
||||||
NewListSeriesIterator(samples{}),
|
|
||||||
NewListSeriesIterator(samples{}),
|
|
||||||
NewListSeriesIterator(samples{}),
|
|
||||||
},
|
|
||||||
expected: []chunks.Sample{fSample{0, 0}, fSample{1, 1}, fSample{2, 2}, fSample{3, 3}},
|
|
||||||
},
|
|
||||||
} {
|
} {
|
||||||
merged := ChainSampleIteratorFromIterators(nil, tc.input)
|
for name, tc := range map[string]struct {
|
||||||
actual, err := ExpandSamples(merged, nil)
|
input []chunkenc.Iterator
|
||||||
require.NoError(t, err)
|
expected []chunks.Sample
|
||||||
require.Equal(t, tc.expected, actual)
|
}{
|
||||||
|
"single iterator": {
|
||||||
|
input: []chunkenc.Iterator{
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(1)}),
|
||||||
|
},
|
||||||
|
expected: []chunks.Sample{sampleFunc(0), sampleFunc(1)},
|
||||||
|
},
|
||||||
|
"non overlapping iterators": {
|
||||||
|
input: []chunkenc.Iterator{
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(1)}),
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(2), sampleFunc(3)}),
|
||||||
|
},
|
||||||
|
expected: []chunks.Sample{sampleFunc(0), sampleFunc(1), sampleFunc(2), sampleFunc(3)},
|
||||||
|
},
|
||||||
|
"overlapping but distinct iterators": {
|
||||||
|
input: []chunkenc.Iterator{
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(3)}),
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(1), sampleFunc(4)}),
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(2), sampleFunc(5)}),
|
||||||
|
},
|
||||||
|
expected: []chunks.Sample{
|
||||||
|
sampleFunc(0), sampleFunc(1), sampleFunc(2), sampleFunc(3), sampleFunc(4), sampleFunc(5),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"overlapping iterators": {
|
||||||
|
input: []chunkenc.Iterator{
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(1)}),
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(2)}),
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(2), sampleFunc(3)}),
|
||||||
|
NewListSeriesIterator(samples{}),
|
||||||
|
NewListSeriesIterator(samples{}),
|
||||||
|
NewListSeriesIterator(samples{}),
|
||||||
|
},
|
||||||
|
expected: []chunks.Sample{sampleFunc(0), sampleFunc(1), sampleFunc(2), sampleFunc(3)},
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(sampleType+"/"+name, func(t *testing.T) {
|
||||||
|
merged := ChainSampleIteratorFromIterators(nil, tc.input)
|
||||||
|
actual, err := ExpandSamples(merged, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, tc.expected, actual)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestChainSampleIteratorHistogramCounterResetHint(t *testing.T) {
|
||||||
|
for sampleType, sampleFunc := range map[string]func(int64, histogram.CounterResetHint) chunks.Sample{
|
||||||
|
"histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return histogramSample(ts, hint) },
|
||||||
|
"float histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return floatHistogramSample(ts, hint) },
|
||||||
|
} {
|
||||||
|
for name, tc := range map[string]struct {
|
||||||
|
input []chunkenc.Iterator
|
||||||
|
expected []chunks.Sample
|
||||||
|
}{
|
||||||
|
"single iterator": {
|
||||||
|
input: []chunkenc.Iterator{
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, cr), sampleFunc(2, uk)}),
|
||||||
|
},
|
||||||
|
expected: []chunks.Sample{sampleFunc(0, uk), sampleFunc(1, cr), sampleFunc(2, uk)},
|
||||||
|
},
|
||||||
|
"single iterator gauge": {
|
||||||
|
input: []chunkenc.Iterator{
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0, ga), sampleFunc(1, ga), sampleFunc(2, ga)}),
|
||||||
|
},
|
||||||
|
expected: []chunks.Sample{sampleFunc(0, ga), sampleFunc(1, ga), sampleFunc(2, ga)},
|
||||||
|
},
|
||||||
|
"overlapping iterators gauge": {
|
||||||
|
input: []chunkenc.Iterator{
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0, ga), sampleFunc(1, ga), sampleFunc(2, ga), sampleFunc(4, ga)}),
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0, ga), sampleFunc(1, ga), sampleFunc(3, ga), sampleFunc(5, ga)}),
|
||||||
|
},
|
||||||
|
expected: []chunks.Sample{sampleFunc(0, ga), sampleFunc(1, ga), sampleFunc(2, ga), sampleFunc(3, ga), sampleFunc(4, ga), sampleFunc(5, ga)},
|
||||||
|
},
|
||||||
|
"non overlapping iterators": {
|
||||||
|
input: []chunkenc.Iterator{
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, uk)}),
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(2, cr), sampleFunc(3, cr)}),
|
||||||
|
},
|
||||||
|
expected: []chunks.Sample{sampleFunc(0, uk), sampleFunc(1, uk), sampleFunc(2, uk), sampleFunc(3, cr)},
|
||||||
|
},
|
||||||
|
"overlapping but distinct iterators": {
|
||||||
|
input: []chunkenc.Iterator{
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(3, uk), sampleFunc(5, cr)}),
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(1, uk), sampleFunc(2, cr), sampleFunc(4, cr)}),
|
||||||
|
},
|
||||||
|
expected: []chunks.Sample{
|
||||||
|
sampleFunc(0, uk), sampleFunc(1, uk), sampleFunc(2, cr), sampleFunc(3, uk), sampleFunc(4, uk), sampleFunc(5, uk),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"overlapping iterators": {
|
||||||
|
input: []chunkenc.Iterator{
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, cr), sampleFunc(2, cr)}),
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, cr), sampleFunc(2, cr)}),
|
||||||
|
},
|
||||||
|
expected: []chunks.Sample{sampleFunc(0, uk), sampleFunc(1, uk), sampleFunc(2, uk)},
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(sampleType+"/"+name, func(t *testing.T) {
|
||||||
|
merged := ChainSampleIteratorFromIterators(nil, tc.input)
|
||||||
|
actual, err := ExpandSamples(merged, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, tc.expected, actual)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestChainSampleIteratorSeek(t *testing.T) {
|
func TestChainSampleIteratorSeek(t *testing.T) {
|
||||||
for _, tc := range []struct {
|
for sampleType, sampleFunc := range map[string]func(int64) chunks.Sample{
|
||||||
input []chunkenc.Iterator
|
"float": func(ts int64) chunks.Sample { return fSample{ts, float64(ts)} },
|
||||||
seek int64
|
"histogram": func(ts int64) chunks.Sample { return histogramSample(ts, uk) },
|
||||||
expected []chunks.Sample
|
"float histogram": func(ts int64) chunks.Sample { return floatHistogramSample(ts, uk) },
|
||||||
}{
|
|
||||||
{
|
|
||||||
input: []chunkenc.Iterator{
|
|
||||||
NewListSeriesIterator(samples{fSample{0, 0}, fSample{1, 1}, fSample{2, 2}}),
|
|
||||||
},
|
|
||||||
seek: 1,
|
|
||||||
expected: []chunks.Sample{fSample{1, 1}, fSample{2, 2}},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
input: []chunkenc.Iterator{
|
|
||||||
NewListSeriesIterator(samples{fSample{0, 0}, fSample{1, 1}}),
|
|
||||||
NewListSeriesIterator(samples{fSample{2, 2}, fSample{3, 3}}),
|
|
||||||
},
|
|
||||||
seek: 2,
|
|
||||||
expected: []chunks.Sample{fSample{2, 2}, fSample{3, 3}},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
input: []chunkenc.Iterator{
|
|
||||||
NewListSeriesIterator(samples{fSample{0, 0}, fSample{3, 3}}),
|
|
||||||
NewListSeriesIterator(samples{fSample{1, 1}, fSample{4, 4}}),
|
|
||||||
NewListSeriesIterator(samples{fSample{2, 2}, fSample{5, 5}}),
|
|
||||||
},
|
|
||||||
seek: 2,
|
|
||||||
expected: []chunks.Sample{fSample{2, 2}, fSample{3, 3}, fSample{4, 4}, fSample{5, 5}},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
input: []chunkenc.Iterator{
|
|
||||||
NewListSeriesIterator(samples{fSample{0, 0}, fSample{2, 2}, fSample{3, 3}}),
|
|
||||||
NewListSeriesIterator(samples{fSample{0, 0}, fSample{1, 1}, fSample{2, 2}}),
|
|
||||||
},
|
|
||||||
seek: 0,
|
|
||||||
expected: []chunks.Sample{fSample{0, 0}, fSample{1, 1}, fSample{2, 2}, fSample{3, 3}},
|
|
||||||
},
|
|
||||||
} {
|
} {
|
||||||
merged := ChainSampleIteratorFromIterators(nil, tc.input)
|
for name, tc := range map[string]struct {
|
||||||
actual := []chunks.Sample{}
|
input []chunkenc.Iterator
|
||||||
if merged.Seek(tc.seek) == chunkenc.ValFloat {
|
seek int64
|
||||||
t, f := merged.At()
|
expected []chunks.Sample
|
||||||
actual = append(actual, fSample{t, f})
|
}{
|
||||||
|
"single iterator": {
|
||||||
|
input: []chunkenc.Iterator{
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(1), sampleFunc(2)}),
|
||||||
|
},
|
||||||
|
seek: 1,
|
||||||
|
expected: []chunks.Sample{sampleFunc(1), sampleFunc(2)},
|
||||||
|
},
|
||||||
|
"non overlapping iterators": {
|
||||||
|
input: []chunkenc.Iterator{
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(1)}),
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(2), sampleFunc(3)}),
|
||||||
|
},
|
||||||
|
seek: 2,
|
||||||
|
expected: []chunks.Sample{sampleFunc(2), sampleFunc(3)},
|
||||||
|
},
|
||||||
|
"overlapping but distinct iterators": {
|
||||||
|
input: []chunkenc.Iterator{
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(3)}),
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(1), sampleFunc(4)}),
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(2), sampleFunc(5)}),
|
||||||
|
},
|
||||||
|
seek: 2,
|
||||||
|
expected: []chunks.Sample{sampleFunc(2), sampleFunc(3), sampleFunc(4), sampleFunc(5)},
|
||||||
|
},
|
||||||
|
"overlapping iterators": {
|
||||||
|
input: []chunkenc.Iterator{
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(2), sampleFunc(3)}),
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0), sampleFunc(1), sampleFunc(2)}),
|
||||||
|
},
|
||||||
|
seek: 0,
|
||||||
|
expected: []chunks.Sample{sampleFunc(0), sampleFunc(1), sampleFunc(2), sampleFunc(3)},
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(sampleType+"/"+name, func(t *testing.T) {
|
||||||
|
merged := ChainSampleIteratorFromIterators(nil, tc.input)
|
||||||
|
actual := []chunks.Sample{}
|
||||||
|
switch merged.Seek(tc.seek) {
|
||||||
|
case chunkenc.ValFloat:
|
||||||
|
t, f := merged.At()
|
||||||
|
actual = append(actual, fSample{t, f})
|
||||||
|
case chunkenc.ValHistogram:
|
||||||
|
t, h := merged.AtHistogram()
|
||||||
|
actual = append(actual, hSample{t, h})
|
||||||
|
case chunkenc.ValFloatHistogram:
|
||||||
|
t, fh := merged.AtFloatHistogram()
|
||||||
|
actual = append(actual, fhSample{t, fh})
|
||||||
|
}
|
||||||
|
s, err := ExpandSamples(merged, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
actual = append(actual, s...)
|
||||||
|
require.Equal(t, tc.expected, actual)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestChainSampleIteratorSeekHistogramCounterResetHint(t *testing.T) {
|
||||||
|
for sampleType, sampleFunc := range map[string]func(int64, histogram.CounterResetHint) chunks.Sample{
|
||||||
|
"histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return histogramSample(ts, hint) },
|
||||||
|
"float histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return floatHistogramSample(ts, hint) },
|
||||||
|
} {
|
||||||
|
for name, tc := range map[string]struct {
|
||||||
|
input []chunkenc.Iterator
|
||||||
|
seek int64
|
||||||
|
expected []chunks.Sample
|
||||||
|
}{
|
||||||
|
"single iterator": {
|
||||||
|
input: []chunkenc.Iterator{
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, cr), sampleFunc(2, uk)}),
|
||||||
|
},
|
||||||
|
seek: 1,
|
||||||
|
expected: []chunks.Sample{sampleFunc(1, uk), sampleFunc(2, uk)},
|
||||||
|
},
|
||||||
|
"non overlapping iterators": {
|
||||||
|
input: []chunkenc.Iterator{
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, uk)}),
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(2, cr), sampleFunc(3, cr)}),
|
||||||
|
},
|
||||||
|
seek: 2,
|
||||||
|
expected: []chunks.Sample{sampleFunc(2, uk), sampleFunc(3, cr)},
|
||||||
|
},
|
||||||
|
"non overlapping iterators seek to internal reset": {
|
||||||
|
input: []chunkenc.Iterator{
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(0, cr), sampleFunc(1, uk)}),
|
||||||
|
NewListSeriesIterator(samples{sampleFunc(2, cr), sampleFunc(3, cr)}),
|
||||||
|
},
|
||||||
|
seek: 3,
|
||||||
|
expected: []chunks.Sample{sampleFunc(3, uk)},
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(sampleType+"/"+name, func(t *testing.T) {
|
||||||
|
merged := ChainSampleIteratorFromIterators(nil, tc.input)
|
||||||
|
actual := []chunks.Sample{}
|
||||||
|
switch merged.Seek(tc.seek) {
|
||||||
|
case chunkenc.ValFloat:
|
||||||
|
t, f := merged.At()
|
||||||
|
actual = append(actual, fSample{t, f})
|
||||||
|
case chunkenc.ValHistogram:
|
||||||
|
t, h := merged.AtHistogram()
|
||||||
|
actual = append(actual, hSample{t, h})
|
||||||
|
case chunkenc.ValFloatHistogram:
|
||||||
|
t, fh := merged.AtFloatHistogram()
|
||||||
|
actual = append(actual, fhSample{t, fh})
|
||||||
|
}
|
||||||
|
s, err := ExpandSamples(merged, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
actual = append(actual, s...)
|
||||||
|
require.Equal(t, tc.expected, actual)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
s, err := ExpandSamples(merged, nil)
|
|
||||||
require.NoError(t, err)
|
|
||||||
actual = append(actual, s...)
|
|
||||||
require.Equal(t, tc.expected, actual)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -430,35 +430,36 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
|
||||||
require.Equal(t, len(test.expectedCounterResetHeaders), len(chks))
|
require.Equal(t, len(test.expectedCounterResetHeaders), len(chks))
|
||||||
|
|
||||||
// Decode all encoded samples and assert they are equal to the original ones.
|
// Decode all encoded samples and assert they are equal to the original ones.
|
||||||
encodedSamples := expandHistogramSamples(chks)
|
encodedSamples := chunks.ChunkMetasToSamples(chks)
|
||||||
require.Equal(t, len(test.samples), len(encodedSamples))
|
require.Equal(t, len(test.samples), len(encodedSamples))
|
||||||
|
|
||||||
for i, s := range test.samples {
|
for i, s := range test.samples {
|
||||||
|
encodedSample := encodedSamples[i]
|
||||||
switch expectedSample := s.(type) {
|
switch expectedSample := s.(type) {
|
||||||
case hSample:
|
case hSample:
|
||||||
encodedSample, ok := encodedSamples[i].(hSample)
|
require.Equal(t, chunkenc.ValHistogram, encodedSample.Type(), "expect histogram", fmt.Sprintf("at idx %d", i))
|
||||||
require.True(t, ok, "expect histogram", fmt.Sprintf("at idx %d", i))
|
h := encodedSample.H()
|
||||||
// Ignore counter reset if not gauge here, will check on chunk level.
|
// Ignore counter reset if not gauge here, will check on chunk level.
|
||||||
if expectedSample.h.CounterResetHint != histogram.GaugeType {
|
if expectedSample.h.CounterResetHint != histogram.GaugeType {
|
||||||
encodedSample.h.CounterResetHint = histogram.UnknownCounterReset
|
h.CounterResetHint = histogram.UnknownCounterReset
|
||||||
}
|
}
|
||||||
if value.IsStaleNaN(expectedSample.h.Sum) {
|
if value.IsStaleNaN(expectedSample.h.Sum) {
|
||||||
require.True(t, value.IsStaleNaN(encodedSample.h.Sum), fmt.Sprintf("at idx %d", i))
|
require.True(t, value.IsStaleNaN(h.Sum), fmt.Sprintf("at idx %d", i))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
require.Equal(t, *expectedSample.h, *encodedSample.h.Compact(0), fmt.Sprintf("at idx %d", i))
|
require.Equal(t, *expectedSample.h, *h.Compact(0), fmt.Sprintf("at idx %d", i))
|
||||||
case fhSample:
|
case fhSample:
|
||||||
encodedSample, ok := encodedSamples[i].(fhSample)
|
require.Equal(t, chunkenc.ValFloatHistogram, encodedSample.Type(), "expect float histogram", fmt.Sprintf("at idx %d", i))
|
||||||
require.True(t, ok, "expect float histogram", fmt.Sprintf("at idx %d", i))
|
fh := encodedSample.FH()
|
||||||
// Ignore counter reset if not gauge here, will check on chunk level.
|
// Ignore counter reset if not gauge here, will check on chunk level.
|
||||||
if expectedSample.fh.CounterResetHint != histogram.GaugeType {
|
if expectedSample.fh.CounterResetHint != histogram.GaugeType {
|
||||||
encodedSample.fh.CounterResetHint = histogram.UnknownCounterReset
|
fh.CounterResetHint = histogram.UnknownCounterReset
|
||||||
}
|
}
|
||||||
if value.IsStaleNaN(expectedSample.fh.Sum) {
|
if value.IsStaleNaN(expectedSample.fh.Sum) {
|
||||||
require.True(t, value.IsStaleNaN(encodedSample.fh.Sum), fmt.Sprintf("at idx %d", i))
|
require.True(t, value.IsStaleNaN(fh.Sum), fmt.Sprintf("at idx %d", i))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
require.Equal(t, *expectedSample.fh, *encodedSample.fh.Compact(0), fmt.Sprintf("at idx %d", i))
|
require.Equal(t, *expectedSample.fh, *fh.Compact(0), fmt.Sprintf("at idx %d", i))
|
||||||
default:
|
default:
|
||||||
t.Error("internal error, unexpected type")
|
t.Error("internal error, unexpected type")
|
||||||
}
|
}
|
||||||
|
@ -469,29 +470,6 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func expandHistogramSamples(chunks []chunks.Meta) (result []chunks.Sample) {
|
|
||||||
if len(chunks) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, chunk := range chunks {
|
|
||||||
it := chunk.Chunk.Iterator(nil)
|
|
||||||
for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() {
|
|
||||||
switch vt {
|
|
||||||
case chunkenc.ValHistogram:
|
|
||||||
t, h := it.AtHistogram()
|
|
||||||
result = append(result, hSample{t: t, h: h})
|
|
||||||
case chunkenc.ValFloatHistogram:
|
|
||||||
t, fh := it.AtFloatHistogram()
|
|
||||||
result = append(result, fhSample{t: t, fh: fh})
|
|
||||||
default:
|
|
||||||
panic("unexpected value type")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func getCounterResetHint(chunk chunks.Meta) chunkenc.CounterResetHeader {
|
func getCounterResetHint(chunk chunks.Meta) chunkenc.CounterResetHeader {
|
||||||
switch chk := chunk.Chunk.(type) {
|
switch chk := chunk.Chunk.(type) {
|
||||||
case *chunkenc.HistogramChunk:
|
case *chunkenc.HistogramChunk:
|
||||||
|
|
|
@ -207,6 +207,34 @@ func PopulatedChunk(numSamples int, minTime int64) (Meta, error) {
|
||||||
return ChunkFromSamples(samples)
|
return ChunkFromSamples(samples)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ChunkMetasToSamples converts a slice of chunk meta data to a slice of samples.
|
||||||
|
// Used in tests to compare the content of chunks.
|
||||||
|
func ChunkMetasToSamples(chunks []Meta) (result []Sample) {
|
||||||
|
if len(chunks) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, chunk := range chunks {
|
||||||
|
it := chunk.Chunk.Iterator(nil)
|
||||||
|
for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() {
|
||||||
|
switch vt {
|
||||||
|
case chunkenc.ValFloat:
|
||||||
|
t, v := it.At()
|
||||||
|
result = append(result, sample{t: t, f: v})
|
||||||
|
case chunkenc.ValHistogram:
|
||||||
|
t, h := it.AtHistogram()
|
||||||
|
result = append(result, sample{t: t, h: h})
|
||||||
|
case chunkenc.ValFloatHistogram:
|
||||||
|
t, fh := it.AtFloatHistogram()
|
||||||
|
result = append(result, sample{t: t, fh: fh})
|
||||||
|
default:
|
||||||
|
panic("unexpected value type")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Iterator iterates over the chunks of a single time series.
|
// Iterator iterates over the chunks of a single time series.
|
||||||
type Iterator interface {
|
type Iterator interface {
|
||||||
// At returns the current meta.
|
// At returns the current meta.
|
||||||
|
|
Loading…
Reference in a new issue