mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 13:57:36 -08:00
Native histograms: fix spurios counter reset when merging recoded chunk to normal chunk (#14513)
* chunkenc: allow missing empty buckets on histogram append Allow appending to chunks when the histogram to be added is missing some buckets, but the missing buckets are empty in the chunk. For example bucket at index 5 is present in the chunk, but its value is 0 and the new histogram doesn't have a bucket at index 5. This fixes an issue of merging chunks where one chunk was recoded to retroactively have some empty buckets in all the histograms and we are merging in a histogram that doesn't have the empty bucket (because it was not recoded yet). The operation alters the histogram that is being added, however this has already been the case when appending gauge histograms. Thus the test TestHistogramSeriesToChunks in storage package is changed to explicitly test what happened to the appended histogram - Compact(0) call is removed. The new expandIntSpansAndBuckets and expandFloatSpansAndBuckets functions are a merge of expandSpansForward and counterResetInAnyBucket and counterResetInAnyFloatBucket. Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent
29b62762db
commit
00ab05c3b9
|
@ -126,6 +126,7 @@ func TestChunkSeriesSetToSeriesSet(t *testing.T) {
|
|||
|
||||
type histogramTest struct {
|
||||
samples []chunks.Sample
|
||||
expectedSamples []chunks.Sample
|
||||
expectedCounterResetHeaders []chunkenc.CounterResetHeader
|
||||
}
|
||||
|
||||
|
@ -141,6 +142,32 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
},
|
||||
PositiveBuckets: []int64{2, 1}, // Abs: 2, 3
|
||||
}
|
||||
// h1 but with an extra empty bucket at offset -10.
|
||||
// This can happen if h1 is from a recoded chunk, where a later histogram had a bucket at offset -10.
|
||||
h1ExtraBuckets := &histogram.Histogram{
|
||||
Count: 7,
|
||||
ZeroCount: 2,
|
||||
ZeroThreshold: 0.001,
|
||||
Sum: 100,
|
||||
Schema: 0,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: -10, Length: 1},
|
||||
{Offset: 9, Length: 2},
|
||||
},
|
||||
PositiveBuckets: []int64{0, 2, 1}, // Abs: 0, 2, 3
|
||||
}
|
||||
h1Recoded := &histogram.Histogram{
|
||||
Count: 7,
|
||||
ZeroCount: 2,
|
||||
ZeroThreshold: 0.001,
|
||||
Sum: 100,
|
||||
Schema: 0,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
},
|
||||
PositiveBuckets: []int64{2, 1, -3, 0}, // Abs: 2, 3, 0, 0
|
||||
}
|
||||
// Appendable to h1.
|
||||
h2 := &histogram.Histogram{
|
||||
Count: 12,
|
||||
|
@ -179,6 +206,32 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
},
|
||||
PositiveBuckets: []float64{3, 1},
|
||||
}
|
||||
// fh1 but with an extra empty bucket at offset -10.
|
||||
// This can happen if fh1 is from a recoded chunk, where a later histogram had a bucket at offset -10.
|
||||
fh1ExtraBuckets := &histogram.FloatHistogram{
|
||||
Count: 6,
|
||||
ZeroCount: 2,
|
||||
ZeroThreshold: 0.001,
|
||||
Sum: 100,
|
||||
Schema: 0,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: -10, Length: 1},
|
||||
{Offset: 9, Length: 2},
|
||||
},
|
||||
PositiveBuckets: []float64{0, 3, 1},
|
||||
}
|
||||
fh1Recoded := &histogram.FloatHistogram{
|
||||
Count: 6,
|
||||
ZeroCount: 2,
|
||||
ZeroThreshold: 0.001,
|
||||
Sum: 100,
|
||||
Schema: 0,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
},
|
||||
PositiveBuckets: []float64{3, 1, 0, 0},
|
||||
}
|
||||
// Appendable to fh1.
|
||||
fh2 := &histogram.FloatHistogram{
|
||||
Count: 17,
|
||||
|
@ -219,6 +272,20 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
},
|
||||
PositiveBuckets: []int64{2, 1}, // Abs: 2, 3
|
||||
}
|
||||
// gh1 recoded to add extra empty buckets at end.
|
||||
gh1Recoded := &histogram.Histogram{
|
||||
CounterResetHint: histogram.GaugeType,
|
||||
Count: 7,
|
||||
ZeroCount: 2,
|
||||
ZeroThreshold: 0.001,
|
||||
Sum: 100,
|
||||
Schema: 0,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
},
|
||||
PositiveBuckets: []int64{2, 1, -3, 0}, // Abs: 2, 3, 0, 0
|
||||
}
|
||||
gh2 := &histogram.Histogram{
|
||||
CounterResetHint: histogram.GaugeType,
|
||||
Count: 12,
|
||||
|
@ -246,6 +313,20 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
},
|
||||
PositiveBuckets: []float64{3, 1},
|
||||
}
|
||||
// gfh1 recoded to add an extra empty buckets at end.
|
||||
gfh1Recoded := &histogram.FloatHistogram{
|
||||
CounterResetHint: histogram.GaugeType,
|
||||
Count: 6,
|
||||
ZeroCount: 2,
|
||||
ZeroThreshold: 0.001,
|
||||
Sum: 100,
|
||||
Schema: 0,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
},
|
||||
PositiveBuckets: []float64{3, 1, 0, 0},
|
||||
}
|
||||
gfh2 := &histogram.FloatHistogram{
|
||||
CounterResetHint: histogram.GaugeType,
|
||||
Count: 17,
|
||||
|
@ -272,6 +353,9 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
samples: []chunks.Sample{
|
||||
hSample{t: 1, h: h1},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
hSample{t: 1, h: h1},
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset},
|
||||
},
|
||||
"two histograms encoded to a single chunk": {
|
||||
|
@ -279,6 +363,10 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
hSample{t: 1, h: h1},
|
||||
hSample{t: 2, h: h2},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
hSample{t: 1, h: h1Recoded},
|
||||
hSample{t: 2, h: h2},
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset},
|
||||
},
|
||||
"two histograms encoded to two chunks": {
|
||||
|
@ -286,6 +374,10 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
hSample{t: 1, h: h2},
|
||||
hSample{t: 2, h: h1},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
hSample{t: 1, h: h2},
|
||||
hSample{t: 2, h: h1},
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
|
||||
},
|
||||
"histogram and stale sample encoded to two chunks": {
|
||||
|
@ -293,6 +385,10 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
hSample{t: 1, h: staleHistogram},
|
||||
hSample{t: 2, h: h1},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
hSample{t: 1, h: staleHistogram},
|
||||
hSample{t: 2, h: h1},
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
|
||||
},
|
||||
"histogram and reduction in bucket encoded to two chunks": {
|
||||
|
@ -300,6 +396,10 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
hSample{t: 1, h: h1},
|
||||
hSample{t: 2, h: h2down},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
hSample{t: 1, h: h1},
|
||||
hSample{t: 2, h: h2down},
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
|
||||
},
|
||||
// Float histograms.
|
||||
|
@ -307,6 +407,9 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
samples: []chunks.Sample{
|
||||
fhSample{t: 1, fh: fh1},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
fhSample{t: 1, fh: fh1},
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset},
|
||||
},
|
||||
"two float histograms encoded to a single chunk": {
|
||||
|
@ -314,6 +417,10 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
fhSample{t: 1, fh: fh1},
|
||||
fhSample{t: 2, fh: fh2},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
fhSample{t: 1, fh: fh1Recoded},
|
||||
fhSample{t: 2, fh: fh2},
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset},
|
||||
},
|
||||
"two float histograms encoded to two chunks": {
|
||||
|
@ -321,6 +428,10 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
fhSample{t: 1, fh: fh2},
|
||||
fhSample{t: 2, fh: fh1},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
fhSample{t: 1, fh: fh2},
|
||||
fhSample{t: 2, fh: fh1},
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
|
||||
},
|
||||
"float histogram and stale sample encoded to two chunks": {
|
||||
|
@ -328,6 +439,10 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
fhSample{t: 1, fh: staleFloatHistogram},
|
||||
fhSample{t: 2, fh: fh1},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
fhSample{t: 1, fh: staleFloatHistogram},
|
||||
fhSample{t: 2, fh: fh1},
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
|
||||
},
|
||||
"float histogram and reduction in bucket encoded to two chunks": {
|
||||
|
@ -335,6 +450,10 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
fhSample{t: 1, fh: fh1},
|
||||
fhSample{t: 2, fh: fh2down},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
fhSample{t: 1, fh: fh1},
|
||||
fhSample{t: 2, fh: fh2down},
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
|
||||
},
|
||||
// Mixed.
|
||||
|
@ -343,6 +462,10 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
hSample{t: 1, h: h1},
|
||||
fhSample{t: 2, fh: fh2},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
hSample{t: 1, h: h1},
|
||||
fhSample{t: 2, fh: fh2},
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
|
||||
},
|
||||
"float histogram and histogram encoded to two chunks": {
|
||||
|
@ -350,6 +473,10 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
fhSample{t: 1, fh: fh1},
|
||||
hSample{t: 2, h: h2},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
fhSample{t: 1, fh: fh1},
|
||||
hSample{t: 2, h: h2},
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
|
||||
},
|
||||
"histogram and stale float histogram encoded to two chunks": {
|
||||
|
@ -357,12 +484,19 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
hSample{t: 1, h: h1},
|
||||
fhSample{t: 2, fh: staleFloatHistogram},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
hSample{t: 1, h: h1},
|
||||
fhSample{t: 2, fh: staleFloatHistogram},
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
|
||||
},
|
||||
"single gauge histogram encoded to one chunk": {
|
||||
samples: []chunks.Sample{
|
||||
hSample{t: 1, h: gh1},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
hSample{t: 1, h: gh1},
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType},
|
||||
},
|
||||
"two gauge histograms encoded to one chunk when counter increases": {
|
||||
|
@ -370,6 +504,10 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
hSample{t: 1, h: gh1},
|
||||
hSample{t: 2, h: gh2},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
hSample{t: 1, h: gh1Recoded},
|
||||
hSample{t: 2, h: gh2},
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType},
|
||||
},
|
||||
"two gauge histograms encoded to one chunk when counter decreases": {
|
||||
|
@ -377,12 +515,19 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
hSample{t: 1, h: gh2},
|
||||
hSample{t: 2, h: gh1},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
hSample{t: 1, h: gh2},
|
||||
hSample{t: 2, h: gh1Recoded},
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType},
|
||||
},
|
||||
"single gauge float histogram encoded to one chunk": {
|
||||
samples: []chunks.Sample{
|
||||
fhSample{t: 1, fh: gfh1},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
fhSample{t: 1, fh: gfh1},
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType},
|
||||
},
|
||||
"two float gauge histograms encoded to one chunk when counter increases": {
|
||||
|
@ -390,6 +535,10 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
fhSample{t: 1, fh: gfh1},
|
||||
fhSample{t: 2, fh: gfh2},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
fhSample{t: 1, fh: gfh1Recoded},
|
||||
fhSample{t: 2, fh: gfh2},
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType},
|
||||
},
|
||||
"two float gauge histograms encoded to one chunk when counter decreases": {
|
||||
|
@ -397,8 +546,34 @@ func TestHistogramSeriesToChunks(t *testing.T) {
|
|||
fhSample{t: 1, fh: gfh2},
|
||||
fhSample{t: 2, fh: gfh1},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
fhSample{t: 1, fh: gfh2},
|
||||
fhSample{t: 2, fh: gfh1Recoded},
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType},
|
||||
},
|
||||
"histogram with extra empty bucket followed by histogram encodes to one chunk": {
|
||||
samples: []chunks.Sample{
|
||||
hSample{t: 1, h: h1ExtraBuckets},
|
||||
hSample{t: 2, h: h1},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
hSample{t: 1, h: h1ExtraBuckets},
|
||||
hSample{t: 2, h: h1ExtraBuckets}, // Recoded to add the missing buckets.
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset},
|
||||
},
|
||||
"float histogram with extra empty bucket followed by float histogram encodes to one chunk": {
|
||||
samples: []chunks.Sample{
|
||||
fhSample{t: 1, fh: fh1ExtraBuckets},
|
||||
fhSample{t: 2, fh: fh1},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
fhSample{t: 1, fh: fh1ExtraBuckets},
|
||||
fhSample{t: 2, fh: fh1ExtraBuckets}, // Recoded to add the missing buckets.
|
||||
},
|
||||
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset},
|
||||
},
|
||||
}
|
||||
|
||||
for testName, test := range tests {
|
||||
|
@ -431,9 +606,9 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
|
|||
|
||||
// Decode all encoded samples and assert they are equal to the original ones.
|
||||
encodedSamples := chunks.ChunkMetasToSamples(chks)
|
||||
require.Equal(t, len(test.samples), len(encodedSamples))
|
||||
require.Equal(t, len(test.expectedSamples), len(encodedSamples))
|
||||
|
||||
for i, s := range test.samples {
|
||||
for i, s := range test.expectedSamples {
|
||||
encodedSample := encodedSamples[i]
|
||||
switch expectedSample := s.(type) {
|
||||
case hSample:
|
||||
|
@ -447,7 +622,7 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
|
|||
require.True(t, value.IsStaleNaN(h.Sum), fmt.Sprintf("at idx %d", i))
|
||||
continue
|
||||
}
|
||||
require.Equal(t, *expectedSample.h, *h.Compact(0), fmt.Sprintf("at idx %d", i))
|
||||
require.Equal(t, *expectedSample.h, *h, fmt.Sprintf("at idx %d", i))
|
||||
case fhSample:
|
||||
require.Equal(t, chunkenc.ValFloatHistogram, encodedSample.Type(), "expect float histogram", fmt.Sprintf("at idx %d", i))
|
||||
fh := encodedSample.FH()
|
||||
|
@ -459,7 +634,7 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
|
|||
require.True(t, value.IsStaleNaN(fh.Sum), fmt.Sprintf("at idx %d", i))
|
||||
continue
|
||||
}
|
||||
require.Equal(t, *expectedSample.fh, *fh.Compact(0), fmt.Sprintf("at idx %d", i))
|
||||
require.Equal(t, *expectedSample.fh, *fh, fmt.Sprintf("at idx %d", i))
|
||||
default:
|
||||
t.Error("internal error, unexpected type")
|
||||
}
|
||||
|
|
|
@ -219,16 +219,25 @@ func (a *FloatHistogramAppender) Append(int64, float64) {
|
|||
}
|
||||
|
||||
// appendable returns whether the chunk can be appended to, and if so whether
|
||||
// any recoding needs to happen using the provided inserts (in case of any new
|
||||
// buckets, positive or negative range, respectively). If the sample is a gauge
|
||||
// histogram, AppendableGauge must be used instead.
|
||||
// 1. Any recoding needs to happen to the chunk using the provided forward
|
||||
// inserts (in case of any new buckets, positive or negative range,
|
||||
// respectively).
|
||||
// 2. Any recoding needs to happen for the histogram being appended, using the
|
||||
// backward inserts (in case of any missing buckets, positive or negative
|
||||
// range, respectively).
|
||||
//
|
||||
// If the sample is a gauge histogram, AppendableGauge must be used instead.
|
||||
//
|
||||
// The chunk is not appendable in the following cases:
|
||||
//
|
||||
// - The schema has changed.
|
||||
// - The custom bounds have changed if the current schema is custom buckets.
|
||||
// - The threshold for the zero bucket has changed.
|
||||
// - Any buckets have disappeared.
|
||||
// - There was a counter reset in the count of observations or in any bucket, including the zero bucket.
|
||||
// - Any buckets have disappeared, unless the bucket count was 0, unused.
|
||||
// Empty bucket can happen if the chunk was recoded and we're merging a non
|
||||
// recoded histogram. In this case backward inserts will be provided.
|
||||
// - There was a counter reset in the count of observations or in any bucket,
|
||||
// including the zero bucket.
|
||||
// - The last sample in the chunk was stale while the current sample is not stale.
|
||||
//
|
||||
// The method returns an additional boolean set to true if it is not appendable
|
||||
|
@ -236,6 +245,7 @@ func (a *FloatHistogramAppender) Append(int64, float64) {
|
|||
// append. If counterReset is true, okToAppend is always false.
|
||||
func (a *FloatHistogramAppender) appendable(h *histogram.FloatHistogram) (
|
||||
positiveInserts, negativeInserts []Insert,
|
||||
backwardPositiveInserts, backwardNegativeInserts []Insert,
|
||||
okToAppend, counterReset bool,
|
||||
) {
|
||||
if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType {
|
||||
|
@ -279,27 +289,214 @@ func (a *FloatHistogramAppender) appendable(h *histogram.FloatHistogram) (
|
|||
}
|
||||
|
||||
var ok bool
|
||||
positiveInserts, ok = expandSpansForward(a.pSpans, h.PositiveSpans)
|
||||
positiveInserts, backwardPositiveInserts, ok = expandFloatSpansAndBuckets(a.pSpans, h.PositiveSpans, a.pBuckets, h.PositiveBuckets)
|
||||
if !ok {
|
||||
counterReset = true
|
||||
return
|
||||
}
|
||||
negativeInserts, ok = expandSpansForward(a.nSpans, h.NegativeSpans)
|
||||
negativeInserts, backwardNegativeInserts, ok = expandFloatSpansAndBuckets(a.nSpans, h.NegativeSpans, a.nBuckets, h.NegativeBuckets)
|
||||
if !ok {
|
||||
counterReset = true
|
||||
return
|
||||
}
|
||||
|
||||
if counterResetInAnyFloatBucket(a.pBuckets, h.PositiveBuckets, a.pSpans, h.PositiveSpans) ||
|
||||
counterResetInAnyFloatBucket(a.nBuckets, h.NegativeBuckets, a.nSpans, h.NegativeSpans) {
|
||||
counterReset, positiveInserts, negativeInserts = true, nil, nil
|
||||
return
|
||||
}
|
||||
|
||||
okToAppend = true
|
||||
return
|
||||
}
|
||||
|
||||
// expandFloatSpansAndBuckets returns the inserts to expand the bucket spans 'a' so that
|
||||
// they match the spans in 'b'. 'b' must cover the same or more buckets than
|
||||
// 'a', otherwise the function will return false.
|
||||
// The function also returns the inserts to expand 'b' to also cover all the
|
||||
// buckets that are missing in 'b', but are present with 0 counter value in 'a'.
|
||||
// The function also checks for counter resets between 'a' and 'b'.
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// Let's say the old buckets look like this:
|
||||
//
|
||||
// span syntax: [offset, length]
|
||||
// spans : [ 0 , 2 ] [2,1] [ 3 , 2 ] [3,1] [1,1]
|
||||
// bucket idx : [0] [1] 2 3 [4] 5 6 7 [8] [9] 10 11 12 [13] 14 [15]
|
||||
// raw values 6 3 3 2 4 5 1
|
||||
// deltas 6 -3 0 -1 2 1 -4
|
||||
//
|
||||
// But now we introduce a new bucket layout. (Carefully chosen example where we
|
||||
// have a span appended, one unchanged[*], one prepended, and two merge - in
|
||||
// that order.)
|
||||
//
|
||||
// [*] unchanged in terms of which bucket indices they represent. but to achieve
|
||||
// that, their offset needs to change if "disrupted" by spans changing ahead of
|
||||
// them
|
||||
//
|
||||
// \/ this one is "unchanged"
|
||||
// spans : [ 0 , 3 ] [1,1] [ 1 , 4 ] [ 3 , 3 ]
|
||||
// bucket idx : [0] [1] [2] 3 [4] 5 [6] [7] [8] [9] 10 11 12 [13] [14] [15]
|
||||
// raw values 6 3 0 3 0 0 2 4 5 0 1
|
||||
// deltas 6 -3 -3 3 -3 0 2 2 1 -5 1
|
||||
// delta mods: / \ / \ / \
|
||||
//
|
||||
// Note for histograms with delta-encoded buckets: Whenever any new buckets are
|
||||
// introduced, the subsequent "old" bucket needs to readjust its delta to the
|
||||
// new base of 0. Thus, for the caller who wants to transform the set of
|
||||
// original deltas to a new set of deltas to match a new span layout that adds
|
||||
// buckets, we simply need to generate a list of inserts.
|
||||
//
|
||||
// Note: Within expandSpansForward we don't have to worry about the changes to the
|
||||
// spans themselves, thanks to the iterators we get to work with the more useful
|
||||
// bucket indices (which of course directly correspond to the buckets we have to
|
||||
// adjust).
|
||||
func expandFloatSpansAndBuckets(a, b []histogram.Span, aBuckets []xorValue, bBuckets []float64) (forward, backward []Insert, ok bool) {
|
||||
ai := newBucketIterator(a)
|
||||
bi := newBucketIterator(b)
|
||||
|
||||
var aInserts []Insert // To insert into buckets of a, to make up for missing buckets in b.
|
||||
var bInserts []Insert // To insert into buckets of b, to make up for missing empty(!) buckets in a.
|
||||
|
||||
// When aInter.num or bInter.num becomes > 0, this becomes a valid insert that should
|
||||
// be yielded when we finish a streak of new buckets.
|
||||
var aInter Insert
|
||||
var bInter Insert
|
||||
|
||||
aIdx, aOK := ai.Next()
|
||||
bIdx, bOK := bi.Next()
|
||||
|
||||
// Bucket count. Initialize the absolute count and index into the
|
||||
// positive/negative counts or deltas array. The bucket count is
|
||||
// used to detect counter reset as well as unused buckets in a.
|
||||
var (
|
||||
aCount float64
|
||||
bCount float64
|
||||
aCountIdx int
|
||||
bCountIdx int
|
||||
)
|
||||
if aOK {
|
||||
aCount = aBuckets[aCountIdx].value
|
||||
}
|
||||
if bOK {
|
||||
bCount = bBuckets[bCountIdx]
|
||||
}
|
||||
|
||||
loop:
|
||||
for {
|
||||
switch {
|
||||
case aOK && bOK:
|
||||
switch {
|
||||
case aIdx == bIdx: // Both have an identical bucket index.
|
||||
// Bucket count. Check bucket for reset from a to b.
|
||||
if aCount > bCount {
|
||||
return nil, nil, false
|
||||
}
|
||||
|
||||
// Finish WIP insert for a and reset.
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
aInter.num = 0
|
||||
}
|
||||
|
||||
// Finish WIP insert for b and reset.
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
bInter.num = 0
|
||||
}
|
||||
|
||||
aIdx, aOK = ai.Next()
|
||||
bIdx, bOK = bi.Next()
|
||||
aInter.pos++ // Advance potential insert position.
|
||||
aCountIdx++ // Advance absolute bucket count index for a.
|
||||
if aOK {
|
||||
aCount = aBuckets[aCountIdx].value
|
||||
}
|
||||
bInter.pos++ // Advance potential insert position.
|
||||
bCountIdx++ // Advance absolute bucket count index for b.
|
||||
if bOK {
|
||||
bCount = bBuckets[bCountIdx]
|
||||
}
|
||||
|
||||
continue
|
||||
case aIdx < bIdx: // b misses a bucket index that is in a.
|
||||
// This is ok if the count in a is 0, in which case we make a note to
|
||||
// fill in the bucket in b and advance a.
|
||||
if aCount == 0 {
|
||||
bInter.num++ // Mark that we need to insert a bucket in b.
|
||||
// Advance a
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
aInter.num = 0
|
||||
}
|
||||
aIdx, aOK = ai.Next()
|
||||
aInter.pos++
|
||||
aCountIdx++
|
||||
if aOK {
|
||||
aCount = aBuckets[aCountIdx].value
|
||||
}
|
||||
continue
|
||||
}
|
||||
// Otherwise we are missing a bucket that was in use in a, which is a reset.
|
||||
return nil, nil, false
|
||||
case aIdx > bIdx: // a misses a value that is in b. Forward b and recompare.
|
||||
aInter.num++
|
||||
// Advance b
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
bInter.num = 0
|
||||
}
|
||||
bIdx, bOK = bi.Next()
|
||||
bInter.pos++
|
||||
bCountIdx++
|
||||
if bOK {
|
||||
bCount = bBuckets[bCountIdx]
|
||||
}
|
||||
}
|
||||
case aOK && !bOK: // b misses a value that is in a.
|
||||
// This is ok if the count in a is 0, in which case we make a note to
|
||||
// fill in the bucket in b and advance a.
|
||||
if aCount == 0 {
|
||||
bInter.num++
|
||||
// Advance a
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
aInter.num = 0
|
||||
}
|
||||
aIdx, aOK = ai.Next()
|
||||
aInter.pos++ // Advance potential insert position.
|
||||
// Update absolute bucket counts for a.
|
||||
aCountIdx++
|
||||
if aOK {
|
||||
aCount = aBuckets[aCountIdx].value
|
||||
}
|
||||
continue
|
||||
}
|
||||
// Otherwise we are missing a bucket that was in use in a, which is a reset.
|
||||
return nil, nil, false
|
||||
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
|
||||
aInter.num++
|
||||
// Advance b
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
bInter.num = 0
|
||||
}
|
||||
bIdx, bOK = bi.Next()
|
||||
bInter.pos++ // Advance potential insert position.
|
||||
// Update absolute bucket counts for b.
|
||||
bCountIdx++
|
||||
if bOK {
|
||||
bCount = bBuckets[bCountIdx]
|
||||
}
|
||||
default: // Both iterators ran out. We're done.
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
}
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
}
|
||||
break loop
|
||||
}
|
||||
}
|
||||
|
||||
return aInserts, bInserts, true
|
||||
}
|
||||
|
||||
// appendableGauge returns whether the chunk can be appended to, and if so
|
||||
// whether:
|
||||
// 1. Any recoding needs to happen to the chunk using the provided inserts
|
||||
|
@ -349,76 +546,6 @@ func (a *FloatHistogramAppender) appendableGauge(h *histogram.FloatHistogram) (
|
|||
return
|
||||
}
|
||||
|
||||
// counterResetInAnyFloatBucket returns true if there was a counter reset for any
|
||||
// bucket. This should be called only when the bucket layout is the same or new
|
||||
// buckets were added. It does not handle the case of buckets missing.
|
||||
func counterResetInAnyFloatBucket(oldBuckets []xorValue, newBuckets []float64, oldSpans, newSpans []histogram.Span) bool {
|
||||
if len(oldSpans) == 0 || len(oldBuckets) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
var (
|
||||
oldSpanSliceIdx, newSpanSliceIdx int = -1, -1 // Index for the span slices. Starts at -1 to indicate that the first non empty span is not yet found.
|
||||
oldInsideSpanIdx, newInsideSpanIdx uint32 // Index inside a span.
|
||||
oldIdx, newIdx int32 // Index inside a bucket slice.
|
||||
oldBucketSliceIdx, newBucketSliceIdx int // Index inside bucket slice.
|
||||
)
|
||||
|
||||
// Find first non empty spans.
|
||||
oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans)
|
||||
newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans)
|
||||
oldVal, newVal := oldBuckets[0].value, newBuckets[0]
|
||||
|
||||
// Since we assume that new spans won't have missing buckets, there will never be a case
|
||||
// where the old index will not find a matching new index.
|
||||
for {
|
||||
if oldIdx == newIdx {
|
||||
if newVal < oldVal {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
if oldIdx <= newIdx {
|
||||
// Moving ahead old bucket and span by 1 index.
|
||||
if oldInsideSpanIdx+1 >= oldSpans[oldSpanSliceIdx].Length {
|
||||
// Current span is over.
|
||||
oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans)
|
||||
oldInsideSpanIdx = 0
|
||||
if oldSpanSliceIdx >= len(oldSpans) {
|
||||
// All old spans are over.
|
||||
break
|
||||
}
|
||||
} else {
|
||||
oldInsideSpanIdx++
|
||||
oldIdx++
|
||||
}
|
||||
oldBucketSliceIdx++
|
||||
oldVal = oldBuckets[oldBucketSliceIdx].value
|
||||
}
|
||||
|
||||
if oldIdx > newIdx {
|
||||
// Moving ahead new bucket and span by 1 index.
|
||||
if newInsideSpanIdx+1 >= newSpans[newSpanSliceIdx].Length {
|
||||
// Current span is over.
|
||||
newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans)
|
||||
newInsideSpanIdx = 0
|
||||
if newSpanSliceIdx >= len(newSpans) {
|
||||
// All new spans are over.
|
||||
// This should not happen, old spans above should catch this first.
|
||||
panic("new spans over before old spans in counterReset")
|
||||
}
|
||||
} else {
|
||||
newInsideSpanIdx++
|
||||
newIdx++
|
||||
}
|
||||
newBucketSliceIdx++
|
||||
newVal = newBuckets[newBucketSliceIdx]
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// appendFloatHistogram appends a float histogram to the chunk. The caller must ensure that
|
||||
// the histogram is properly structured, e.g. the number of buckets used
|
||||
// corresponds to the number conveyed by the span structures. First call
|
||||
|
@ -614,7 +741,7 @@ func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppend
|
|||
a.setCounterResetHeader(CounterReset)
|
||||
case prev != nil:
|
||||
// This is a new chunk, but continued from a previous one. We need to calculate the reset header unless already set.
|
||||
_, _, _, counterReset := prev.appendable(h)
|
||||
_, _, _, _, _, counterReset := prev.appendable(h)
|
||||
if counterReset {
|
||||
a.setCounterResetHeader(CounterReset)
|
||||
} else {
|
||||
|
@ -626,7 +753,7 @@ func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppend
|
|||
|
||||
// Adding counter-like histogram.
|
||||
if h.CounterResetHint != histogram.GaugeType {
|
||||
pForwardInserts, nForwardInserts, okToAppend, counterReset := a.appendable(h)
|
||||
pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, okToAppend, counterReset := a.appendable(h)
|
||||
if !okToAppend || counterReset {
|
||||
if appendOnly {
|
||||
if counterReset {
|
||||
|
@ -657,6 +784,13 @@ func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppend
|
|||
app.(*FloatHistogramAppender).appendFloatHistogram(t, h)
|
||||
return chk, true, app, nil
|
||||
}
|
||||
if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 {
|
||||
// The histogram needs to be expanded to have the extra empty buckets
|
||||
// of the chunk.
|
||||
h.PositiveSpans = a.pSpans
|
||||
h.NegativeSpans = a.nSpans
|
||||
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
|
||||
}
|
||||
a.appendFloatHistogram(t, h)
|
||||
return nil, false, a, nil
|
||||
}
|
||||
|
|
|
@ -245,9 +245,11 @@ func TestFloatHistogramChunkBucketChanges(t *testing.T) {
|
|||
h2.NegativeBuckets = []int64{2, -1} // 2 1 (total 3)
|
||||
// This is how span changes will be handled.
|
||||
hApp, _ := app.(*FloatHistogramAppender)
|
||||
posInterjections, negInterjections, ok, cr := hApp.appendable(h2.ToFloat(nil))
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2.ToFloat(nil))
|
||||
require.NotEmpty(t, posInterjections)
|
||||
require.NotEmpty(t, negInterjections)
|
||||
require.Empty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.True(t, ok) // Only new buckets came in.
|
||||
require.False(t, cr)
|
||||
c, app = hApp.recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans)
|
||||
|
@ -333,7 +335,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
|
|||
c, hApp, ts, h1 := setup(eh)
|
||||
h2 := h1.Copy()
|
||||
h2.Schema++
|
||||
_, _, ok, _ := hApp.appendable(h2)
|
||||
_, _, _, _, ok, _ := hApp.appendable(h2)
|
||||
require.False(t, ok)
|
||||
|
||||
assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
|
||||
|
@ -343,7 +345,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
|
|||
c, hApp, ts, h1 := setup(eh)
|
||||
h2 := h1.Copy()
|
||||
h2.ZeroThreshold += 0.1
|
||||
_, _, ok, _ := hApp.appendable(h2)
|
||||
_, _, _, _, ok, _ := hApp.appendable(h2)
|
||||
require.False(t, ok)
|
||||
|
||||
assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
|
||||
|
@ -363,9 +365,11 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
|
|||
h2.Sum = 30
|
||||
h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 1}
|
||||
|
||||
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
require.NotEmpty(t, posInterjections)
|
||||
require.Empty(t, negInterjections)
|
||||
require.Empty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.True(t, ok) // Only new buckets came in.
|
||||
require.False(t, cr)
|
||||
|
||||
|
@ -385,24 +389,56 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
|
|||
h2.Sum = 21
|
||||
h2.PositiveBuckets = []float64{6, 3, 2, 4, 5, 1}
|
||||
|
||||
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
require.Empty(t, posInterjections)
|
||||
require.Empty(t, negInterjections)
|
||||
require.Empty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.False(t, ok) // Need to cut a new chunk.
|
||||
require.True(t, cr)
|
||||
|
||||
assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
|
||||
}
|
||||
|
||||
{ // New histogram that has buckets missing but the buckets missing were empty.
|
||||
emptyBucketH := eh.Copy()
|
||||
emptyBucketH.PositiveBuckets = []float64{6, 0, 3, 2, 4, 0, 1}
|
||||
c, hApp, ts, h1 := setup(emptyBucketH)
|
||||
h2 := h1.Copy()
|
||||
h2.PositiveSpans = []histogram.Span{
|
||||
{Offset: 0, Length: 1},
|
||||
{Offset: 3, Length: 1},
|
||||
{Offset: 3, Length: 2},
|
||||
{Offset: 5, Length: 1},
|
||||
}
|
||||
h2.PositiveBuckets = []float64{7, 4, 3, 5, 2}
|
||||
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
require.Empty(t, posInterjections)
|
||||
require.Empty(t, negInterjections)
|
||||
require.NotEmpty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.True(t, ok)
|
||||
require.False(t, cr)
|
||||
|
||||
assertNoNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
|
||||
|
||||
// Check that h2 was recoded.
|
||||
require.Equal(t, []float64{7, 0, 4, 3, 5, 0, 2}, h2.PositiveBuckets)
|
||||
require.Equal(t, emptyBucketH.PositiveSpans, h2.PositiveSpans)
|
||||
}
|
||||
|
||||
{ // New histogram that has a counter reset while buckets are same.
|
||||
c, hApp, ts, h1 := setup(eh)
|
||||
h2 := h1.Copy()
|
||||
h2.Sum = 23
|
||||
h2.PositiveBuckets = []float64{6, 2, 3, 2, 4, 5, 1}
|
||||
|
||||
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
require.Empty(t, posInterjections)
|
||||
require.Empty(t, negInterjections)
|
||||
require.Empty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.False(t, ok) // Need to cut a new chunk.
|
||||
require.True(t, cr)
|
||||
|
||||
|
@ -421,9 +457,11 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
|
|||
h2.Sum = 29
|
||||
h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 0}
|
||||
|
||||
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
require.Empty(t, posInterjections)
|
||||
require.Empty(t, negInterjections)
|
||||
require.Empty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.False(t, ok) // Need to cut a new chunk.
|
||||
require.True(t, cr)
|
||||
|
||||
|
@ -448,9 +486,11 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
|
|||
h2.Sum = 26
|
||||
h2.PositiveBuckets = []float64{1, 2, 5, 3, 3, 2, 4, 5, 1}
|
||||
|
||||
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
require.Empty(t, posInterjections)
|
||||
require.Empty(t, negInterjections)
|
||||
require.Empty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.False(t, ok) // Need to cut a new chunk.
|
||||
require.True(t, cr)
|
||||
|
||||
|
@ -524,10 +564,44 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
|
|||
require.Equal(t, NotCounterReset, nextChunk.GetCounterResetHeader())
|
||||
}
|
||||
|
||||
{
|
||||
// Start a new chunk with a histogram that has an empty bucket.
|
||||
// Add a histogram that has the same bucket missing.
|
||||
// This should be appendable and can happen if we are merging from chunks
|
||||
// where the first sample came from a recoded chunk that added the
|
||||
// empty bucket.
|
||||
h1 := eh.Copy()
|
||||
// Add a bucket that is empty -10 offsets from the first bucket.
|
||||
h1.PositiveSpans = make([]histogram.Span, len(eh.PositiveSpans)+1)
|
||||
h1.PositiveSpans[0] = histogram.Span{Offset: eh.PositiveSpans[0].Offset - 10, Length: 1}
|
||||
h1.PositiveSpans[1] = histogram.Span{Offset: eh.PositiveSpans[0].Offset + 9, Length: eh.PositiveSpans[0].Length}
|
||||
for i, v := range eh.PositiveSpans[1:] {
|
||||
h1.PositiveSpans[i+2] = v
|
||||
}
|
||||
h1.PositiveBuckets = make([]float64, len(eh.PositiveBuckets)+1)
|
||||
h1.PositiveBuckets[0] = 0
|
||||
for i, v := range eh.PositiveBuckets {
|
||||
h1.PositiveBuckets[i+1] = v
|
||||
}
|
||||
|
||||
c, hApp, ts, _ := setup(h1)
|
||||
h2 := eh.Copy()
|
||||
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
require.Empty(t, posInterjections)
|
||||
require.Empty(t, negInterjections)
|
||||
require.NotEmpty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.True(t, ok)
|
||||
require.False(t, cr)
|
||||
|
||||
assertNoNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
|
||||
}
|
||||
|
||||
{ // Custom buckets, no change.
|
||||
c, hApp, ts, h1 := setup(cbh)
|
||||
h2 := h1.Copy()
|
||||
_, _, ok, _ := hApp.appendable(h2)
|
||||
_, _, _, _, ok, _ := hApp.appendable(h2)
|
||||
require.True(t, ok)
|
||||
|
||||
assertNoNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
|
||||
|
@ -538,7 +612,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
|
|||
h2 := h1.Copy()
|
||||
h2.Count++
|
||||
h2.PositiveBuckets = []float64{6, 3, 3, 2, 4, 5, 2}
|
||||
_, _, ok, _ := hApp.appendable(h2)
|
||||
_, _, _, _, ok, _ := hApp.appendable(h2)
|
||||
require.True(t, ok)
|
||||
|
||||
assertNoNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
|
||||
|
@ -549,7 +623,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
|
|||
h2 := h1.Copy()
|
||||
h2.Count--
|
||||
h2.PositiveBuckets = []float64{6, 3, 3, 2, 4, 5, 0}
|
||||
_, _, ok, _ := hApp.appendable(h2)
|
||||
_, _, _, _, ok, _ := hApp.appendable(h2)
|
||||
require.False(t, ok)
|
||||
|
||||
assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
|
||||
|
@ -559,7 +633,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
|
|||
c, hApp, ts, h1 := setup(cbh)
|
||||
h2 := h1.Copy()
|
||||
h2.CustomValues = []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21}
|
||||
_, _, ok, _ := hApp.appendable(h2)
|
||||
_, _, _, _, ok, _ := hApp.appendable(h2)
|
||||
require.False(t, ok)
|
||||
|
||||
assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
|
||||
|
@ -581,9 +655,11 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
|
|||
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
|
||||
h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 1} // (total 30)
|
||||
|
||||
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
require.NotEmpty(t, posInterjections)
|
||||
require.Empty(t, negInterjections)
|
||||
require.Empty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.True(t, ok) // Only new buckets came in.
|
||||
require.False(t, cr)
|
||||
|
||||
|
@ -839,9 +915,11 @@ func TestFloatHistogramChunkAppendableWithEmptySpan(t *testing.T) {
|
|||
require.Equal(t, 1, c.NumSamples())
|
||||
hApp, _ := app.(*FloatHistogramAppender)
|
||||
|
||||
pI, nI, okToAppend, counterReset := hApp.appendable(tc.h2)
|
||||
pI, nI, bpI, bnI, okToAppend, counterReset := hApp.appendable(tc.h2)
|
||||
require.Empty(t, pI)
|
||||
require.Empty(t, nI)
|
||||
require.Empty(t, bpI)
|
||||
require.Empty(t, bnI)
|
||||
require.True(t, okToAppend)
|
||||
require.False(t, counterReset)
|
||||
})
|
||||
|
|
|
@ -237,16 +237,23 @@ func (a *HistogramAppender) Append(int64, float64) {
|
|||
}
|
||||
|
||||
// appendable returns whether the chunk can be appended to, and if so whether
|
||||
// any recoding needs to happen using the provided inserts (in case of any new
|
||||
// buckets, positive or negative range, respectively). If the sample is a gauge
|
||||
// histogram, AppendableGauge must be used instead.
|
||||
// 1. Any recoding needs to happen to the chunk using the provided forward
|
||||
// inserts (in case of any new buckets, positive or negative range,
|
||||
// respectively).
|
||||
// 2. Any recoding needs to happen for the histogram being appended, using the
|
||||
// backward inserts (in case of any missing buckets, positive or negative
|
||||
// range, respectively).
|
||||
//
|
||||
// If the sample is a gauge histogram, AppendableGauge must be used instead.
|
||||
//
|
||||
// The chunk is not appendable in the following cases:
|
||||
//
|
||||
// - The schema has changed.
|
||||
// - The custom bounds have changed if the current schema is custom buckets.
|
||||
// - The threshold for the zero bucket has changed.
|
||||
// - Any buckets have disappeared.
|
||||
// - Any buckets have disappeared, unless the bucket count was 0, unused.
|
||||
// Empty bucket can happen if the chunk was recoded and we're merging a non
|
||||
// recoded histogram. In this case backward inserts will be provided.
|
||||
// - There was a counter reset in the count of observations or in any bucket,
|
||||
// including the zero bucket.
|
||||
// - The last sample in the chunk was stale while the current sample is not stale.
|
||||
|
@ -256,6 +263,7 @@ func (a *HistogramAppender) Append(int64, float64) {
|
|||
// append. If counterReset is true, okToAppend is always false.
|
||||
func (a *HistogramAppender) appendable(h *histogram.Histogram) (
|
||||
positiveInserts, negativeInserts []Insert,
|
||||
backwardPositiveInserts, backwardNegativeInserts []Insert,
|
||||
okToAppend, counterReset bool,
|
||||
) {
|
||||
if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType {
|
||||
|
@ -299,31 +307,219 @@ func (a *HistogramAppender) appendable(h *histogram.Histogram) (
|
|||
}
|
||||
|
||||
var ok bool
|
||||
positiveInserts, ok = expandSpansForward(a.pSpans, h.PositiveSpans)
|
||||
positiveInserts, backwardPositiveInserts, ok = expandIntSpansAndBuckets(a.pSpans, h.PositiveSpans, a.pBuckets, h.PositiveBuckets)
|
||||
if !ok {
|
||||
counterReset = true
|
||||
return
|
||||
}
|
||||
negativeInserts, ok = expandSpansForward(a.nSpans, h.NegativeSpans)
|
||||
negativeInserts, backwardNegativeInserts, ok = expandIntSpansAndBuckets(a.nSpans, h.NegativeSpans, a.nBuckets, h.NegativeBuckets)
|
||||
if !ok {
|
||||
counterReset = true
|
||||
return
|
||||
}
|
||||
|
||||
if counterResetInAnyBucket(a.pBuckets, h.PositiveBuckets, a.pSpans, h.PositiveSpans) ||
|
||||
counterResetInAnyBucket(a.nBuckets, h.NegativeBuckets, a.nSpans, h.NegativeSpans) {
|
||||
counterReset, positiveInserts, negativeInserts = true, nil, nil
|
||||
return
|
||||
}
|
||||
|
||||
okToAppend = true
|
||||
return
|
||||
}
|
||||
|
||||
// expandIntSpansAndBuckets returns the inserts to expand the bucket spans 'a' so that
|
||||
// they match the spans in 'b'. 'b' must cover the same or more buckets than
|
||||
// 'a', otherwise the function will return false.
|
||||
// The function also returns the inserts to expand 'b' to also cover all the
|
||||
// buckets that are missing in 'b', but are present with 0 counter value in 'a'.
|
||||
// The function also checks for counter resets between 'a' and 'b'.
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// Let's say the old buckets look like this:
|
||||
//
|
||||
// span syntax: [offset, length]
|
||||
// spans : [ 0 , 2 ] [2,1] [ 3 , 2 ] [3,1] [1,1]
|
||||
// bucket idx : [0] [1] 2 3 [4] 5 6 7 [8] [9] 10 11 12 [13] 14 [15]
|
||||
// raw values 6 3 3 2 4 5 1
|
||||
// deltas 6 -3 0 -1 2 1 -4
|
||||
//
|
||||
// But now we introduce a new bucket layout. (Carefully chosen example where we
|
||||
// have a span appended, one unchanged[*], one prepended, and two merge - in
|
||||
// that order.)
|
||||
//
|
||||
// [*] unchanged in terms of which bucket indices they represent. but to achieve
|
||||
// that, their offset needs to change if "disrupted" by spans changing ahead of
|
||||
// them
|
||||
//
|
||||
// \/ this one is "unchanged"
|
||||
// spans : [ 0 , 3 ] [1,1] [ 1 , 4 ] [ 3 , 3 ]
|
||||
// bucket idx : [0] [1] [2] 3 [4] 5 [6] [7] [8] [9] 10 11 12 [13] [14] [15]
|
||||
// raw values 6 3 0 3 0 0 2 4 5 0 1
|
||||
// deltas 6 -3 -3 3 -3 0 2 2 1 -5 1
|
||||
// delta mods: / \ / \ / \
|
||||
//
|
||||
// Note for histograms with delta-encoded buckets: Whenever any new buckets are
|
||||
// introduced, the subsequent "old" bucket needs to readjust its delta to the
|
||||
// new base of 0. Thus, for the caller who wants to transform the set of
|
||||
// original deltas to a new set of deltas to match a new span layout that adds
|
||||
// buckets, we simply need to generate a list of inserts.
|
||||
//
|
||||
// Note: Within expandSpansForward we don't have to worry about the changes to the
|
||||
// spans themselves, thanks to the iterators we get to work with the more useful
|
||||
// bucket indices (which of course directly correspond to the buckets we have to
|
||||
// adjust).
|
||||
func expandIntSpansAndBuckets(a, b []histogram.Span, aBuckets, bBuckets []int64) (forward, backward []Insert, ok bool) {
|
||||
ai := newBucketIterator(a)
|
||||
bi := newBucketIterator(b)
|
||||
|
||||
var aInserts []Insert // To insert into buckets of a, to make up for missing buckets in b.
|
||||
var bInserts []Insert // To insert into buckets of b, to make up for missing empty(!) buckets in a.
|
||||
|
||||
// When aInter.num or bInter.num becomes > 0, this becomes a valid insert that should
|
||||
// be yielded when we finish a streak of new buckets.
|
||||
var aInter Insert
|
||||
var bInter Insert
|
||||
|
||||
aIdx, aOK := ai.Next()
|
||||
bIdx, bOK := bi.Next()
|
||||
|
||||
// Bucket count. Initialize the absolute count and index into the
|
||||
// positive/negative counts or deltas array. The bucket count is
|
||||
// used to detect counter reset as well as unused buckets in a.
|
||||
var (
|
||||
aCount int64
|
||||
bCount int64
|
||||
aCountIdx int
|
||||
bCountIdx int
|
||||
)
|
||||
if aOK {
|
||||
aCount = aBuckets[aCountIdx]
|
||||
}
|
||||
if bOK {
|
||||
bCount = bBuckets[bCountIdx]
|
||||
}
|
||||
|
||||
loop:
|
||||
for {
|
||||
switch {
|
||||
case aOK && bOK:
|
||||
switch {
|
||||
case aIdx == bIdx: // Both have an identical bucket index.
|
||||
// Bucket count. Check bucket for reset from a to b.
|
||||
if aCount > bCount {
|
||||
return nil, nil, false
|
||||
}
|
||||
|
||||
// Finish WIP insert for a and reset.
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
aInter.num = 0
|
||||
}
|
||||
|
||||
// Finish WIP insert for b and reset.
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
bInter.num = 0
|
||||
}
|
||||
|
||||
aIdx, aOK = ai.Next()
|
||||
bIdx, bOK = bi.Next()
|
||||
aInter.pos++ // Advance potential insert position.
|
||||
aCountIdx++ // Advance absolute bucket count index for a.
|
||||
if aOK {
|
||||
aCount += aBuckets[aCountIdx]
|
||||
}
|
||||
bInter.pos++ // Advance potential insert position.
|
||||
bCountIdx++ // Advance absolute bucket count index for b.
|
||||
if bOK {
|
||||
bCount += bBuckets[bCountIdx]
|
||||
}
|
||||
|
||||
continue
|
||||
case aIdx < bIdx: // b misses a bucket index that is in a.
|
||||
// This is ok if the count in a is 0, in which case we make a note to
|
||||
// fill in the bucket in b and advance a.
|
||||
if aCount == 0 {
|
||||
bInter.num++ // Mark that we need to insert a bucket in b.
|
||||
// Advance a
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
aInter.num = 0
|
||||
}
|
||||
aIdx, aOK = ai.Next()
|
||||
aInter.pos++
|
||||
aCountIdx++
|
||||
if aOK {
|
||||
aCount += aBuckets[aCountIdx]
|
||||
}
|
||||
continue
|
||||
}
|
||||
// Otherwise we are missing a bucket that was in use in a, which is a reset.
|
||||
return nil, nil, false
|
||||
case aIdx > bIdx: // a misses a value that is in b. Forward b and recompare.
|
||||
aInter.num++
|
||||
// Advance b
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
bInter.num = 0
|
||||
}
|
||||
bIdx, bOK = bi.Next()
|
||||
bInter.pos++
|
||||
bCountIdx++
|
||||
if bOK {
|
||||
bCount += bBuckets[bCountIdx]
|
||||
}
|
||||
}
|
||||
case aOK && !bOK: // b misses a value that is in a.
|
||||
// This is ok if the count in a is 0, in which case we make a note to
|
||||
// fill in the bucket in b and advance a.
|
||||
if aCount == 0 {
|
||||
bInter.num++
|
||||
// Advance a
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
aInter.num = 0
|
||||
}
|
||||
aIdx, aOK = ai.Next()
|
||||
aInter.pos++ // Advance potential insert position.
|
||||
// Update absolute bucket counts for a.
|
||||
aCountIdx++
|
||||
if aOK {
|
||||
aCount += aBuckets[aCountIdx]
|
||||
}
|
||||
continue
|
||||
}
|
||||
// Otherwise we are missing a bucket that was in use in a, which is a reset.
|
||||
return nil, nil, false
|
||||
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
|
||||
aInter.num++
|
||||
// Advance b
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
bInter.num = 0
|
||||
}
|
||||
bIdx, bOK = bi.Next()
|
||||
bInter.pos++ // Advance potential insert position.
|
||||
// Update absolute bucket counts for b.
|
||||
bCountIdx++
|
||||
if bOK {
|
||||
bCount += bBuckets[bCountIdx]
|
||||
}
|
||||
default: // Both iterators ran out. We're done.
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
}
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
}
|
||||
break loop
|
||||
}
|
||||
}
|
||||
|
||||
return aInserts, bInserts, true
|
||||
}
|
||||
|
||||
// appendableGauge returns whether the chunk can be appended to, and if so
|
||||
// whether:
|
||||
// 1. Any recoding needs to happen to the chunk using the provided inserts
|
||||
// (in case of any new buckets, positive or negative range, respectively).
|
||||
// 1. Any recoding needs to happen to the chunk using the provided forward
|
||||
// inserts (in case of any new buckets, positive or negative range,
|
||||
// respectively).
|
||||
// 2. Any recoding needs to happen for the histogram being appended, using the
|
||||
// backward inserts (in case of any missing buckets, positive or negative
|
||||
// range, respectively).
|
||||
|
@ -369,76 +565,6 @@ func (a *HistogramAppender) appendableGauge(h *histogram.Histogram) (
|
|||
return
|
||||
}
|
||||
|
||||
// counterResetInAnyBucket returns true if there was a counter reset for any
|
||||
// bucket. This should be called only when the bucket layout is the same or new
|
||||
// buckets were added. It does not handle the case of buckets missing.
|
||||
func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans []histogram.Span) bool {
|
||||
if len(oldSpans) == 0 || len(oldBuckets) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
var (
|
||||
oldSpanSliceIdx, newSpanSliceIdx int = -1, -1 // Index for the span slices. Starts at -1 to indicate that the first non empty span is not yet found.
|
||||
oldInsideSpanIdx, newInsideSpanIdx uint32 // Index inside a span.
|
||||
oldIdx, newIdx int32 // Index inside a bucket slice.
|
||||
oldBucketSliceIdx, newBucketSliceIdx int // Index inside bucket slice.
|
||||
)
|
||||
|
||||
// Find first non empty spans.
|
||||
oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans)
|
||||
newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans)
|
||||
oldVal, newVal := oldBuckets[0], newBuckets[0]
|
||||
|
||||
// Since we assume that new spans won't have missing buckets, there will never be a case
|
||||
// where the old index will not find a matching new index.
|
||||
for {
|
||||
if oldIdx == newIdx {
|
||||
if newVal < oldVal {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
if oldIdx <= newIdx {
|
||||
// Moving ahead old bucket and span by 1 index.
|
||||
if oldInsideSpanIdx+1 >= oldSpans[oldSpanSliceIdx].Length {
|
||||
// Current span is over.
|
||||
oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans)
|
||||
oldInsideSpanIdx = 0
|
||||
if oldSpanSliceIdx >= len(oldSpans) {
|
||||
// All old spans are over.
|
||||
break
|
||||
}
|
||||
} else {
|
||||
oldInsideSpanIdx++
|
||||
oldIdx++
|
||||
}
|
||||
oldBucketSliceIdx++
|
||||
oldVal += oldBuckets[oldBucketSliceIdx]
|
||||
}
|
||||
|
||||
if oldIdx > newIdx {
|
||||
// Moving ahead new bucket and span by 1 index.
|
||||
if newInsideSpanIdx+1 >= newSpans[newSpanSliceIdx].Length {
|
||||
// Current span is over.
|
||||
newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans)
|
||||
newInsideSpanIdx = 0
|
||||
if newSpanSliceIdx >= len(newSpans) {
|
||||
// All new spans are over.
|
||||
// This should not happen, old spans above should catch this first.
|
||||
panic("new spans over before old spans in counterReset")
|
||||
}
|
||||
} else {
|
||||
newInsideSpanIdx++
|
||||
newIdx++
|
||||
}
|
||||
newBucketSliceIdx++
|
||||
newVal += newBuckets[newBucketSliceIdx]
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// appendHistogram appends a histogram to the chunk. The caller must ensure that
|
||||
// the histogram is properly structured, e.g. the number of buckets used
|
||||
// corresponds to the number conveyed by the span structures. First call
|
||||
|
@ -649,7 +775,7 @@ func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h
|
|||
a.setCounterResetHeader(CounterReset)
|
||||
case prev != nil:
|
||||
// This is a new chunk, but continued from a previous one. We need to calculate the reset header unless already set.
|
||||
_, _, _, counterReset := prev.appendable(h)
|
||||
_, _, _, _, _, counterReset := prev.appendable(h)
|
||||
if counterReset {
|
||||
a.setCounterResetHeader(CounterReset)
|
||||
} else {
|
||||
|
@ -661,7 +787,7 @@ func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h
|
|||
|
||||
// Adding counter-like histogram.
|
||||
if h.CounterResetHint != histogram.GaugeType {
|
||||
pForwardInserts, nForwardInserts, okToAppend, counterReset := a.appendable(h)
|
||||
pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, okToAppend, counterReset := a.appendable(h)
|
||||
if !okToAppend || counterReset {
|
||||
if appendOnly {
|
||||
if counterReset {
|
||||
|
@ -692,6 +818,13 @@ func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h
|
|||
app.(*HistogramAppender).appendHistogram(t, h)
|
||||
return chk, true, app, nil
|
||||
}
|
||||
if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 {
|
||||
// The histogram needs to be expanded to have the extra empty buckets
|
||||
// of the chunk.
|
||||
h.PositiveSpans = a.pSpans
|
||||
h.NegativeSpans = a.nSpans
|
||||
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
|
||||
}
|
||||
a.appendHistogram(t, h)
|
||||
return nil, false, a, nil
|
||||
}
|
||||
|
|
|
@ -280,6 +280,9 @@ type Insert struct {
|
|||
num int
|
||||
}
|
||||
|
||||
// Deprecated: expandSpansForward, use expandIntSpansAndBuckets or
|
||||
// expandFloatSpansAndBuckets instead.
|
||||
// expandSpansForward is left here for reference.
|
||||
// expandSpansForward returns the inserts to expand the bucket spans 'a' so that
|
||||
// they match the spans in 'b'. 'b' must cover the same or more buckets than
|
||||
// 'a', otherwise the function will return false.
|
||||
|
@ -574,15 +577,3 @@ func counterResetHint(crh CounterResetHeader, numRead uint16) histogram.CounterR
|
|||
return histogram.UnknownCounterReset
|
||||
}
|
||||
}
|
||||
|
||||
// Handle pathological case of empty span when advancing span idx.
|
||||
// Call it with idx==-1 to find the first non empty span.
|
||||
func nextNonEmptySpanSliceIdx(idx int, bucketIdx int32, spans []histogram.Span) (newIdx int, newBucketIdx int32) {
|
||||
for idx++; idx < len(spans); idx++ {
|
||||
if spans[idx].Length > 0 {
|
||||
return idx, bucketIdx + spans[idx].Offset + 1
|
||||
}
|
||||
bucketIdx += spans[idx].Offset
|
||||
}
|
||||
return idx, 0
|
||||
}
|
||||
|
|
|
@ -256,9 +256,11 @@ func TestHistogramChunkBucketChanges(t *testing.T) {
|
|||
h2.NegativeBuckets = []int64{2, -1} // 2 1 (total 3)
|
||||
// This is how span changes will be handled.
|
||||
hApp, _ := app.(*HistogramAppender)
|
||||
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
require.NotEmpty(t, posInterjections)
|
||||
require.NotEmpty(t, negInterjections)
|
||||
require.Empty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.True(t, ok) // Only new buckets came in.
|
||||
require.False(t, cr)
|
||||
c, app = hApp.recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans)
|
||||
|
@ -347,7 +349,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
|
|||
c, hApp, ts, h1 := setup(eh)
|
||||
h2 := h1.Copy()
|
||||
h2.Schema++
|
||||
_, _, ok, _ := hApp.appendable(h2)
|
||||
_, _, _, _, ok, _ := hApp.appendable(h2)
|
||||
require.False(t, ok)
|
||||
|
||||
assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
|
||||
|
@ -357,7 +359,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
|
|||
c, hApp, ts, h1 := setup(eh)
|
||||
h2 := h1.Copy()
|
||||
h2.ZeroThreshold += 0.1
|
||||
_, _, ok, _ := hApp.appendable(h2)
|
||||
_, _, _, _, ok, _ := hApp.appendable(h2)
|
||||
require.False(t, ok)
|
||||
|
||||
assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
|
||||
|
@ -380,9 +382,11 @@ func TestHistogramChunkAppendable(t *testing.T) {
|
|||
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
|
||||
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30)
|
||||
|
||||
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
require.NotEmpty(t, posInterjections)
|
||||
require.Empty(t, negInterjections)
|
||||
require.Empty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.True(t, ok) // Only new buckets came in.
|
||||
require.False(t, cr)
|
||||
|
||||
|
@ -401,24 +405,57 @@ func TestHistogramChunkAppendable(t *testing.T) {
|
|||
h2.Sum = 21
|
||||
h2.PositiveBuckets = []int64{6, -3, -1, 2, 1, -4} // counts: 6, 3, 2, 4, 5, 1 (total 21)
|
||||
|
||||
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
require.Empty(t, posInterjections)
|
||||
require.Empty(t, negInterjections)
|
||||
require.Empty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.False(t, ok) // Need to cut a new chunk.
|
||||
require.True(t, cr)
|
||||
|
||||
assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
|
||||
}
|
||||
|
||||
{ // New histogram that has buckets missing but the buckets missing were empty.
|
||||
emptyBucketH := eh.Copy()
|
||||
emptyBucketH.PositiveBuckets = []int64{6, -6, 1, 1, -2, 1, 1} // counts: 6, 0, 1, 2, 0, 1, 2 (total 12)
|
||||
c, hApp, ts, h1 := setup(emptyBucketH)
|
||||
h2 := h1.Copy()
|
||||
h2.PositiveSpans = []histogram.Span{ // Missing buckets at offset 1 and 9.
|
||||
{Offset: 0, Length: 1},
|
||||
{Offset: 3, Length: 1},
|
||||
{Offset: 3, Length: 1},
|
||||
{Offset: 4, Length: 1},
|
||||
{Offset: 1, Length: 1},
|
||||
}
|
||||
h2.PositiveBuckets = []int64{7, -5, 1, 0, 1} // counts: 7, 2, 3, 3, 4 (total 18)
|
||||
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
require.Empty(t, posInterjections)
|
||||
require.Empty(t, negInterjections)
|
||||
require.NotEmpty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.True(t, ok)
|
||||
require.False(t, cr)
|
||||
|
||||
assertNoNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
|
||||
|
||||
// Check that h2 was recoded.
|
||||
require.Equal(t, []int64{7, -7, 2, 1, -3, 3, 1}, h2.PositiveBuckets) // counts: 7, 0, 2, 3 , 0, 3, 4 (total 18)
|
||||
require.Equal(t, emptyBucketH.PositiveSpans, h2.PositiveSpans)
|
||||
}
|
||||
|
||||
{ // New histogram that has a counter reset while buckets are same.
|
||||
c, hApp, ts, h1 := setup(eh)
|
||||
h2 := h1.Copy()
|
||||
h2.Sum = 23
|
||||
h2.PositiveBuckets = []int64{6, -4, 1, -1, 2, 1, -4} // counts: 6, 2, 3, 2, 4, 5, 1 (total 23)
|
||||
|
||||
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
require.Empty(t, posInterjections)
|
||||
require.Empty(t, negInterjections)
|
||||
require.Empty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.False(t, ok) // Need to cut a new chunk.
|
||||
require.True(t, cr)
|
||||
|
||||
|
@ -440,9 +477,11 @@ func TestHistogramChunkAppendable(t *testing.T) {
|
|||
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
|
||||
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 0} // 7 5 1 3 1 0 2 5 5 0 0 (total 29)
|
||||
|
||||
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
require.Empty(t, posInterjections)
|
||||
require.Empty(t, negInterjections)
|
||||
require.Empty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.False(t, ok) // Need to cut a new chunk.
|
||||
require.True(t, cr)
|
||||
|
||||
|
@ -470,9 +509,11 @@ func TestHistogramChunkAppendable(t *testing.T) {
|
|||
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
|
||||
h2.PositiveBuckets = []int64{1, 1, 3, -2, 0, -1, 2, 1, -4} // counts: 1, 2, 5, 3, 3, 2, 4, 5, 1 (total 26)
|
||||
|
||||
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
require.Empty(t, posInterjections)
|
||||
require.Empty(t, negInterjections)
|
||||
require.Empty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.False(t, ok) // Need to cut a new chunk.
|
||||
require.True(t, cr)
|
||||
|
||||
|
@ -549,10 +590,44 @@ func TestHistogramChunkAppendable(t *testing.T) {
|
|||
require.Equal(t, NotCounterReset, nextChunk.GetCounterResetHeader())
|
||||
}
|
||||
|
||||
{
|
||||
// Start a new chunk with a histogram that has an empty bucket.
|
||||
// Add a histogram that has the same bucket missing.
|
||||
// This should be appendable and can happen if we are merging from chunks
|
||||
// where the first sample came from a recoded chunk that added the
|
||||
// empty bucket.
|
||||
h1 := eh.Copy()
|
||||
// Add a bucket that is empty -10 offsets from the first bucket.
|
||||
h1.PositiveSpans = make([]histogram.Span, len(eh.PositiveSpans)+1)
|
||||
h1.PositiveSpans[0] = histogram.Span{Offset: eh.PositiveSpans[0].Offset - 10, Length: 1}
|
||||
h1.PositiveSpans[1] = histogram.Span{Offset: eh.PositiveSpans[0].Offset + 9, Length: eh.PositiveSpans[0].Length}
|
||||
for i, v := range eh.PositiveSpans[1:] {
|
||||
h1.PositiveSpans[i+2] = v
|
||||
}
|
||||
h1.PositiveBuckets = make([]int64, len(eh.PositiveBuckets)+1)
|
||||
h1.PositiveBuckets[0] = 0
|
||||
for i, v := range eh.PositiveBuckets {
|
||||
h1.PositiveBuckets[i+1] = v
|
||||
}
|
||||
|
||||
c, hApp, ts, _ := setup(h1)
|
||||
h2 := eh.Copy()
|
||||
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
require.Empty(t, posInterjections)
|
||||
require.Empty(t, negInterjections)
|
||||
require.NotEmpty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.True(t, ok)
|
||||
require.False(t, cr)
|
||||
|
||||
assertNoNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
|
||||
}
|
||||
|
||||
{ // Custom buckets, no change.
|
||||
c, hApp, ts, h1 := setup(cbh)
|
||||
h2 := h1.Copy()
|
||||
_, _, ok, _ := hApp.appendable(h2)
|
||||
_, _, _, _, ok, _ := hApp.appendable(h2)
|
||||
require.True(t, ok)
|
||||
|
||||
assertNoNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
|
||||
|
@ -563,7 +638,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
|
|||
h2 := h1.Copy()
|
||||
h2.Count++
|
||||
h2.PositiveBuckets = []int64{6, -3, 0, -1, 2, 1, -3}
|
||||
_, _, ok, _ := hApp.appendable(h2)
|
||||
_, _, _, _, ok, _ := hApp.appendable(h2)
|
||||
require.True(t, ok)
|
||||
|
||||
assertNoNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
|
||||
|
@ -574,7 +649,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
|
|||
h2 := h1.Copy()
|
||||
h2.Count--
|
||||
h2.PositiveBuckets = []int64{6, -3, 0, -1, 2, 1, -5}
|
||||
_, _, ok, _ := hApp.appendable(h2)
|
||||
_, _, _, _, ok, _ := hApp.appendable(h2)
|
||||
require.False(t, ok)
|
||||
|
||||
assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
|
||||
|
@ -584,7 +659,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
|
|||
c, hApp, ts, h1 := setup(cbh)
|
||||
h2 := h1.Copy()
|
||||
h2.CustomValues = []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21}
|
||||
_, _, ok, _ := hApp.appendable(h2)
|
||||
_, _, _, _, ok, _ := hApp.appendable(h2)
|
||||
require.False(t, ok)
|
||||
|
||||
assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
|
||||
|
@ -606,9 +681,11 @@ func TestHistogramChunkAppendable(t *testing.T) {
|
|||
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
|
||||
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30)
|
||||
|
||||
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
require.NotEmpty(t, posInterjections)
|
||||
require.Empty(t, negInterjections)
|
||||
require.Empty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.True(t, ok) // Only new buckets came in.
|
||||
require.False(t, cr)
|
||||
|
||||
|
@ -875,9 +952,11 @@ func TestHistogramChunkAppendableWithEmptySpan(t *testing.T) {
|
|||
require.Equal(t, 1, c.NumSamples())
|
||||
hApp, _ := app.(*HistogramAppender)
|
||||
|
||||
pI, nI, okToAppend, counterReset := hApp.appendable(tc.h2)
|
||||
pI, nI, bpI, bnI, okToAppend, counterReset := hApp.appendable(tc.h2)
|
||||
require.Empty(t, pI)
|
||||
require.Empty(t, nI)
|
||||
require.Empty(t, bpI)
|
||||
require.Empty(t, bnI)
|
||||
require.True(t, okToAppend)
|
||||
require.False(t, counterReset)
|
||||
})
|
||||
|
@ -1368,3 +1447,50 @@ func TestHistogramAppendOnlyErrors(t *testing.T) {
|
|||
require.EqualError(t, err, "histogram counter reset")
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkAppendable(b *testing.B) {
|
||||
// Create a histogram with a bunch of spans and buckets.
|
||||
const (
|
||||
numSpans = 1000
|
||||
spanLength = 10
|
||||
)
|
||||
h := &histogram.Histogram{
|
||||
Schema: 0,
|
||||
Count: 100,
|
||||
Sum: 1000,
|
||||
ZeroThreshold: 0.001,
|
||||
ZeroCount: 5,
|
||||
}
|
||||
for i := 0; i < numSpans; i++ {
|
||||
h.PositiveSpans = append(h.PositiveSpans, histogram.Span{Offset: 5, Length: spanLength})
|
||||
h.NegativeSpans = append(h.NegativeSpans, histogram.Span{Offset: 5, Length: spanLength})
|
||||
for j := 0; j < spanLength; j++ {
|
||||
h.PositiveBuckets = append(h.PositiveBuckets, int64(j))
|
||||
h.NegativeBuckets = append(h.NegativeBuckets, int64(j))
|
||||
}
|
||||
}
|
||||
|
||||
c := Chunk(NewHistogramChunk())
|
||||
|
||||
// Create fresh appender and add the first histogram.
|
||||
app, err := c.Appender()
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
_, _, _, err = app.AppendHistogram(nil, 1, h, true)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
hApp := app.(*HistogramAppender)
|
||||
|
||||
isAppendable := true
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, _, _, _, ok, _ := hApp.appendable(h)
|
||||
isAppendable = isAppendable && ok
|
||||
}
|
||||
if !isAppendable {
|
||||
b.Fail()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue