Revert "Native histograms: fix spurios counter reset when merging recoded chu…"

This reverts commit 00ab05c3b9.
This commit is contained in:
George Krajcsovits 2024-08-06 14:55:06 +02:00 committed by GitHub
parent 44e5231ea4
commit fb1eaee509
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 215 additions and 852 deletions

View file

@ -126,7 +126,6 @@ func TestChunkSeriesSetToSeriesSet(t *testing.T) {
type histogramTest struct {
samples []chunks.Sample
expectedSamples []chunks.Sample
expectedCounterResetHeaders []chunkenc.CounterResetHeader
}
@ -142,32 +141,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
},
PositiveBuckets: []int64{2, 1}, // Abs: 2, 3
}
// h1 but with an extra empty bucket at offset -10.
// This can happen if h1 is from a recoded chunk, where a later histogram had a bucket at offset -10.
h1ExtraBuckets := &histogram.Histogram{
Count: 7,
ZeroCount: 2,
ZeroThreshold: 0.001,
Sum: 100,
Schema: 0,
PositiveSpans: []histogram.Span{
{Offset: -10, Length: 1},
{Offset: 9, Length: 2},
},
PositiveBuckets: []int64{0, 2, 1}, // Abs: 0, 2, 3
}
h1Recoded := &histogram.Histogram{
Count: 7,
ZeroCount: 2,
ZeroThreshold: 0.001,
Sum: 100,
Schema: 0,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{2, 1, -3, 0}, // Abs: 2, 3, 0, 0
}
// Appendable to h1.
h2 := &histogram.Histogram{
Count: 12,
@ -206,32 +179,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
},
PositiveBuckets: []float64{3, 1},
}
// fh1 but with an extra empty bucket at offset -10.
// This can happen if fh1 is from a recoded chunk, where a later histogram had a bucket at offset -10.
fh1ExtraBuckets := &histogram.FloatHistogram{
Count: 6,
ZeroCount: 2,
ZeroThreshold: 0.001,
Sum: 100,
Schema: 0,
PositiveSpans: []histogram.Span{
{Offset: -10, Length: 1},
{Offset: 9, Length: 2},
},
PositiveBuckets: []float64{0, 3, 1},
}
fh1Recoded := &histogram.FloatHistogram{
Count: 6,
ZeroCount: 2,
ZeroThreshold: 0.001,
Sum: 100,
Schema: 0,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []float64{3, 1, 0, 0},
}
// Appendable to fh1.
fh2 := &histogram.FloatHistogram{
Count: 17,
@ -272,20 +219,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
},
PositiveBuckets: []int64{2, 1}, // Abs: 2, 3
}
// gh1 recoded to add extra empty buckets at end.
gh1Recoded := &histogram.Histogram{
CounterResetHint: histogram.GaugeType,
Count: 7,
ZeroCount: 2,
ZeroThreshold: 0.001,
Sum: 100,
Schema: 0,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{2, 1, -3, 0}, // Abs: 2, 3, 0, 0
}
gh2 := &histogram.Histogram{
CounterResetHint: histogram.GaugeType,
Count: 12,
@ -313,20 +246,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
},
PositiveBuckets: []float64{3, 1},
}
// gfh1 recoded to add an extra empty buckets at end.
gfh1Recoded := &histogram.FloatHistogram{
CounterResetHint: histogram.GaugeType,
Count: 6,
ZeroCount: 2,
ZeroThreshold: 0.001,
Sum: 100,
Schema: 0,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []float64{3, 1, 0, 0},
}
gfh2 := &histogram.FloatHistogram{
CounterResetHint: histogram.GaugeType,
Count: 17,
@ -353,9 +272,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
samples: []chunks.Sample{
hSample{t: 1, h: h1},
},
expectedSamples: []chunks.Sample{
hSample{t: 1, h: h1},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset},
},
"two histograms encoded to a single chunk": {
@ -363,10 +279,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
hSample{t: 1, h: h1},
hSample{t: 2, h: h2},
},
expectedSamples: []chunks.Sample{
hSample{t: 1, h: h1Recoded},
hSample{t: 2, h: h2},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset},
},
"two histograms encoded to two chunks": {
@ -374,10 +286,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
hSample{t: 1, h: h2},
hSample{t: 2, h: h1},
},
expectedSamples: []chunks.Sample{
hSample{t: 1, h: h2},
hSample{t: 2, h: h1},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
},
"histogram and stale sample encoded to two chunks": {
@ -385,10 +293,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
hSample{t: 1, h: staleHistogram},
hSample{t: 2, h: h1},
},
expectedSamples: []chunks.Sample{
hSample{t: 1, h: staleHistogram},
hSample{t: 2, h: h1},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
},
"histogram and reduction in bucket encoded to two chunks": {
@ -396,10 +300,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
hSample{t: 1, h: h1},
hSample{t: 2, h: h2down},
},
expectedSamples: []chunks.Sample{
hSample{t: 1, h: h1},
hSample{t: 2, h: h2down},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
},
// Float histograms.
@ -407,9 +307,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
samples: []chunks.Sample{
fhSample{t: 1, fh: fh1},
},
expectedSamples: []chunks.Sample{
fhSample{t: 1, fh: fh1},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset},
},
"two float histograms encoded to a single chunk": {
@ -417,10 +314,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
fhSample{t: 1, fh: fh1},
fhSample{t: 2, fh: fh2},
},
expectedSamples: []chunks.Sample{
fhSample{t: 1, fh: fh1Recoded},
fhSample{t: 2, fh: fh2},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset},
},
"two float histograms encoded to two chunks": {
@ -428,10 +321,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
fhSample{t: 1, fh: fh2},
fhSample{t: 2, fh: fh1},
},
expectedSamples: []chunks.Sample{
fhSample{t: 1, fh: fh2},
fhSample{t: 2, fh: fh1},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
},
"float histogram and stale sample encoded to two chunks": {
@ -439,10 +328,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
fhSample{t: 1, fh: staleFloatHistogram},
fhSample{t: 2, fh: fh1},
},
expectedSamples: []chunks.Sample{
fhSample{t: 1, fh: staleFloatHistogram},
fhSample{t: 2, fh: fh1},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
},
"float histogram and reduction in bucket encoded to two chunks": {
@ -450,10 +335,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
fhSample{t: 1, fh: fh1},
fhSample{t: 2, fh: fh2down},
},
expectedSamples: []chunks.Sample{
fhSample{t: 1, fh: fh1},
fhSample{t: 2, fh: fh2down},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
},
// Mixed.
@ -462,10 +343,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
hSample{t: 1, h: h1},
fhSample{t: 2, fh: fh2},
},
expectedSamples: []chunks.Sample{
hSample{t: 1, h: h1},
fhSample{t: 2, fh: fh2},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
},
"float histogram and histogram encoded to two chunks": {
@ -473,10 +350,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
fhSample{t: 1, fh: fh1},
hSample{t: 2, h: h2},
},
expectedSamples: []chunks.Sample{
fhSample{t: 1, fh: fh1},
hSample{t: 2, h: h2},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
},
"histogram and stale float histogram encoded to two chunks": {
@ -484,19 +357,12 @@ func TestHistogramSeriesToChunks(t *testing.T) {
hSample{t: 1, h: h1},
fhSample{t: 2, fh: staleFloatHistogram},
},
expectedSamples: []chunks.Sample{
hSample{t: 1, h: h1},
fhSample{t: 2, fh: staleFloatHistogram},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
},
"single gauge histogram encoded to one chunk": {
samples: []chunks.Sample{
hSample{t: 1, h: gh1},
},
expectedSamples: []chunks.Sample{
hSample{t: 1, h: gh1},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType},
},
"two gauge histograms encoded to one chunk when counter increases": {
@ -504,10 +370,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
hSample{t: 1, h: gh1},
hSample{t: 2, h: gh2},
},
expectedSamples: []chunks.Sample{
hSample{t: 1, h: gh1Recoded},
hSample{t: 2, h: gh2},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType},
},
"two gauge histograms encoded to one chunk when counter decreases": {
@ -515,19 +377,12 @@ func TestHistogramSeriesToChunks(t *testing.T) {
hSample{t: 1, h: gh2},
hSample{t: 2, h: gh1},
},
expectedSamples: []chunks.Sample{
hSample{t: 1, h: gh2},
hSample{t: 2, h: gh1Recoded},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType},
},
"single gauge float histogram encoded to one chunk": {
samples: []chunks.Sample{
fhSample{t: 1, fh: gfh1},
},
expectedSamples: []chunks.Sample{
fhSample{t: 1, fh: gfh1},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType},
},
"two float gauge histograms encoded to one chunk when counter increases": {
@ -535,10 +390,6 @@ func TestHistogramSeriesToChunks(t *testing.T) {
fhSample{t: 1, fh: gfh1},
fhSample{t: 2, fh: gfh2},
},
expectedSamples: []chunks.Sample{
fhSample{t: 1, fh: gfh1Recoded},
fhSample{t: 2, fh: gfh2},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType},
},
"two float gauge histograms encoded to one chunk when counter decreases": {
@ -546,34 +397,8 @@ func TestHistogramSeriesToChunks(t *testing.T) {
fhSample{t: 1, fh: gfh2},
fhSample{t: 2, fh: gfh1},
},
expectedSamples: []chunks.Sample{
fhSample{t: 1, fh: gfh2},
fhSample{t: 2, fh: gfh1Recoded},
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType},
},
"histogram with extra empty bucket followed by histogram encodes to one chunk": {
samples: []chunks.Sample{
hSample{t: 1, h: h1ExtraBuckets},
hSample{t: 2, h: h1},
},
expectedSamples: []chunks.Sample{
hSample{t: 1, h: h1ExtraBuckets},
hSample{t: 2, h: h1ExtraBuckets}, // Recoded to add the missing buckets.
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset},
},
"float histogram with extra empty bucket followed by float histogram encodes to one chunk": {
samples: []chunks.Sample{
fhSample{t: 1, fh: fh1ExtraBuckets},
fhSample{t: 2, fh: fh1},
},
expectedSamples: []chunks.Sample{
fhSample{t: 1, fh: fh1ExtraBuckets},
fhSample{t: 2, fh: fh1ExtraBuckets}, // Recoded to add the missing buckets.
},
expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset},
},
}
for testName, test := range tests {
@ -606,9 +431,9 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
// Decode all encoded samples and assert they are equal to the original ones.
encodedSamples := chunks.ChunkMetasToSamples(chks)
require.Equal(t, len(test.expectedSamples), len(encodedSamples))
require.Equal(t, len(test.samples), len(encodedSamples))
for i, s := range test.expectedSamples {
for i, s := range test.samples {
encodedSample := encodedSamples[i]
switch expectedSample := s.(type) {
case hSample:
@ -622,7 +447,7 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
require.True(t, value.IsStaleNaN(h.Sum), fmt.Sprintf("at idx %d", i))
continue
}
require.Equal(t, *expectedSample.h, *h, fmt.Sprintf("at idx %d", i))
require.Equal(t, *expectedSample.h, *h.Compact(0), fmt.Sprintf("at idx %d", i))
case fhSample:
require.Equal(t, chunkenc.ValFloatHistogram, encodedSample.Type(), "expect float histogram", fmt.Sprintf("at idx %d", i))
fh := encodedSample.FH()
@ -634,7 +459,7 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
require.True(t, value.IsStaleNaN(fh.Sum), fmt.Sprintf("at idx %d", i))
continue
}
require.Equal(t, *expectedSample.fh, *fh, fmt.Sprintf("at idx %d", i))
require.Equal(t, *expectedSample.fh, *fh.Compact(0), fmt.Sprintf("at idx %d", i))
default:
t.Error("internal error, unexpected type")
}

View file

@ -219,25 +219,16 @@ func (a *FloatHistogramAppender) Append(int64, float64) {
}
// appendable returns whether the chunk can be appended to, and if so whether
// 1. Any recoding needs to happen to the chunk using the provided forward
// inserts (in case of any new buckets, positive or negative range,
// respectively).
// 2. Any recoding needs to happen for the histogram being appended, using the
// backward inserts (in case of any missing buckets, positive or negative
// range, respectively).
//
// If the sample is a gauge histogram, AppendableGauge must be used instead.
// any recoding needs to happen using the provided inserts (in case of any new
// buckets, positive or negative range, respectively). If the sample is a gauge
// histogram, AppendableGauge must be used instead.
//
// The chunk is not appendable in the following cases:
//
// - The schema has changed.
// - The custom bounds have changed if the current schema is custom buckets.
// - The threshold for the zero bucket has changed.
// - Any buckets have disappeared, unless the bucket count was 0, unused.
// Empty bucket can happen if the chunk was recoded and we're merging a non
// recoded histogram. In this case backward inserts will be provided.
// - There was a counter reset in the count of observations or in any bucket,
// including the zero bucket.
// - Any buckets have disappeared.
// - There was a counter reset in the count of observations or in any bucket, including the zero bucket.
// - The last sample in the chunk was stale while the current sample is not stale.
//
// The method returns an additional boolean set to true if it is not appendable
@ -245,7 +236,6 @@ func (a *FloatHistogramAppender) Append(int64, float64) {
// append. If counterReset is true, okToAppend is always false.
func (a *FloatHistogramAppender) appendable(h *histogram.FloatHistogram) (
positiveInserts, negativeInserts []Insert,
backwardPositiveInserts, backwardNegativeInserts []Insert,
okToAppend, counterReset bool,
) {
if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType {
@ -289,214 +279,27 @@ func (a *FloatHistogramAppender) appendable(h *histogram.FloatHistogram) (
}
var ok bool
positiveInserts, backwardPositiveInserts, ok = expandFloatSpansAndBuckets(a.pSpans, h.PositiveSpans, a.pBuckets, h.PositiveBuckets)
positiveInserts, ok = expandSpansForward(a.pSpans, h.PositiveSpans)
if !ok {
counterReset = true
return
}
negativeInserts, backwardNegativeInserts, ok = expandFloatSpansAndBuckets(a.nSpans, h.NegativeSpans, a.nBuckets, h.NegativeBuckets)
negativeInserts, ok = expandSpansForward(a.nSpans, h.NegativeSpans)
if !ok {
counterReset = true
return
}
if counterResetInAnyFloatBucket(a.pBuckets, h.PositiveBuckets, a.pSpans, h.PositiveSpans) ||
counterResetInAnyFloatBucket(a.nBuckets, h.NegativeBuckets, a.nSpans, h.NegativeSpans) {
counterReset, positiveInserts, negativeInserts = true, nil, nil
return
}
okToAppend = true
return
}
// expandFloatSpansAndBuckets returns the inserts to expand the bucket spans 'a' so that
// they match the spans in 'b'. 'b' must cover the same or more buckets than
// 'a', otherwise the function will return false.
// The function also returns the inserts to expand 'b' to also cover all the
// buckets that are missing in 'b', but are present with 0 counter value in 'a'.
// The function also checks for counter resets between 'a' and 'b'.
//
// Example:
//
// Let's say the old buckets look like this:
//
// span syntax: [offset, length]
// spans : [ 0 , 2 ] [2,1] [ 3 , 2 ] [3,1] [1,1]
// bucket idx : [0] [1] 2 3 [4] 5 6 7 [8] [9] 10 11 12 [13] 14 [15]
// raw values 6 3 3 2 4 5 1
// deltas 6 -3 0 -1 2 1 -4
//
// But now we introduce a new bucket layout. (Carefully chosen example where we
// have a span appended, one unchanged[*], one prepended, and two merge - in
// that order.)
//
// [*] unchanged in terms of which bucket indices they represent. but to achieve
// that, their offset needs to change if "disrupted" by spans changing ahead of
// them
//
// \/ this one is "unchanged"
// spans : [ 0 , 3 ] [1,1] [ 1 , 4 ] [ 3 , 3 ]
// bucket idx : [0] [1] [2] 3 [4] 5 [6] [7] [8] [9] 10 11 12 [13] [14] [15]
// raw values 6 3 0 3 0 0 2 4 5 0 1
// deltas 6 -3 -3 3 -3 0 2 2 1 -5 1
// delta mods: / \ / \ / \
//
// Note for histograms with delta-encoded buckets: Whenever any new buckets are
// introduced, the subsequent "old" bucket needs to readjust its delta to the
// new base of 0. Thus, for the caller who wants to transform the set of
// original deltas to a new set of deltas to match a new span layout that adds
// buckets, we simply need to generate a list of inserts.
//
// Note: Within expandSpansForward we don't have to worry about the changes to the
// spans themselves, thanks to the iterators we get to work with the more useful
// bucket indices (which of course directly correspond to the buckets we have to
// adjust).
func expandFloatSpansAndBuckets(a, b []histogram.Span, aBuckets []xorValue, bBuckets []float64) (forward, backward []Insert, ok bool) {
ai := newBucketIterator(a)
bi := newBucketIterator(b)
var aInserts []Insert // To insert into buckets of a, to make up for missing buckets in b.
var bInserts []Insert // To insert into buckets of b, to make up for missing empty(!) buckets in a.
// When aInter.num or bInter.num becomes > 0, this becomes a valid insert that should
// be yielded when we finish a streak of new buckets.
var aInter Insert
var bInter Insert
aIdx, aOK := ai.Next()
bIdx, bOK := bi.Next()
// Bucket count. Initialize the absolute count and index into the
// positive/negative counts or deltas array. The bucket count is
// used to detect counter reset as well as unused buckets in a.
var (
aCount float64
bCount float64
aCountIdx int
bCountIdx int
)
if aOK {
aCount = aBuckets[aCountIdx].value
}
if bOK {
bCount = bBuckets[bCountIdx]
}
loop:
for {
switch {
case aOK && bOK:
switch {
case aIdx == bIdx: // Both have an identical bucket index.
// Bucket count. Check bucket for reset from a to b.
if aCount > bCount {
return nil, nil, false
}
// Finish WIP insert for a and reset.
if aInter.num > 0 {
aInserts = append(aInserts, aInter)
aInter.num = 0
}
// Finish WIP insert for b and reset.
if bInter.num > 0 {
bInserts = append(bInserts, bInter)
bInter.num = 0
}
aIdx, aOK = ai.Next()
bIdx, bOK = bi.Next()
aInter.pos++ // Advance potential insert position.
aCountIdx++ // Advance absolute bucket count index for a.
if aOK {
aCount = aBuckets[aCountIdx].value
}
bInter.pos++ // Advance potential insert position.
bCountIdx++ // Advance absolute bucket count index for b.
if bOK {
bCount = bBuckets[bCountIdx]
}
continue
case aIdx < bIdx: // b misses a bucket index that is in a.
// This is ok if the count in a is 0, in which case we make a note to
// fill in the bucket in b and advance a.
if aCount == 0 {
bInter.num++ // Mark that we need to insert a bucket in b.
// Advance a
if aInter.num > 0 {
aInserts = append(aInserts, aInter)
aInter.num = 0
}
aIdx, aOK = ai.Next()
aInter.pos++
aCountIdx++
if aOK {
aCount = aBuckets[aCountIdx].value
}
continue
}
// Otherwise we are missing a bucket that was in use in a, which is a reset.
return nil, nil, false
case aIdx > bIdx: // a misses a value that is in b. Forward b and recompare.
aInter.num++
// Advance b
if bInter.num > 0 {
bInserts = append(bInserts, bInter)
bInter.num = 0
}
bIdx, bOK = bi.Next()
bInter.pos++
bCountIdx++
if bOK {
bCount = bBuckets[bCountIdx]
}
}
case aOK && !bOK: // b misses a value that is in a.
// This is ok if the count in a is 0, in which case we make a note to
// fill in the bucket in b and advance a.
if aCount == 0 {
bInter.num++
// Advance a
if aInter.num > 0 {
aInserts = append(aInserts, aInter)
aInter.num = 0
}
aIdx, aOK = ai.Next()
aInter.pos++ // Advance potential insert position.
// Update absolute bucket counts for a.
aCountIdx++
if aOK {
aCount = aBuckets[aCountIdx].value
}
continue
}
// Otherwise we are missing a bucket that was in use in a, which is a reset.
return nil, nil, false
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
aInter.num++
// Advance b
if bInter.num > 0 {
bInserts = append(bInserts, bInter)
bInter.num = 0
}
bIdx, bOK = bi.Next()
bInter.pos++ // Advance potential insert position.
// Update absolute bucket counts for b.
bCountIdx++
if bOK {
bCount = bBuckets[bCountIdx]
}
default: // Both iterators ran out. We're done.
if aInter.num > 0 {
aInserts = append(aInserts, aInter)
}
if bInter.num > 0 {
bInserts = append(bInserts, bInter)
}
break loop
}
}
return aInserts, bInserts, true
}
// appendableGauge returns whether the chunk can be appended to, and if so
// whether:
// 1. Any recoding needs to happen to the chunk using the provided inserts
@ -546,6 +349,76 @@ func (a *FloatHistogramAppender) appendableGauge(h *histogram.FloatHistogram) (
return
}
// counterResetInAnyFloatBucket returns true if there was a counter reset for any
// bucket. This should be called only when the bucket layout is the same or new
// buckets were added. It does not handle the case of buckets missing.
func counterResetInAnyFloatBucket(oldBuckets []xorValue, newBuckets []float64, oldSpans, newSpans []histogram.Span) bool {
if len(oldSpans) == 0 || len(oldBuckets) == 0 {
return false
}
var (
oldSpanSliceIdx, newSpanSliceIdx int = -1, -1 // Index for the span slices. Starts at -1 to indicate that the first non empty span is not yet found.
oldInsideSpanIdx, newInsideSpanIdx uint32 // Index inside a span.
oldIdx, newIdx int32 // Index inside a bucket slice.
oldBucketSliceIdx, newBucketSliceIdx int // Index inside bucket slice.
)
// Find first non empty spans.
oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans)
newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans)
oldVal, newVal := oldBuckets[0].value, newBuckets[0]
// Since we assume that new spans won't have missing buckets, there will never be a case
// where the old index will not find a matching new index.
for {
if oldIdx == newIdx {
if newVal < oldVal {
return true
}
}
if oldIdx <= newIdx {
// Moving ahead old bucket and span by 1 index.
if oldInsideSpanIdx+1 >= oldSpans[oldSpanSliceIdx].Length {
// Current span is over.
oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans)
oldInsideSpanIdx = 0
if oldSpanSliceIdx >= len(oldSpans) {
// All old spans are over.
break
}
} else {
oldInsideSpanIdx++
oldIdx++
}
oldBucketSliceIdx++
oldVal = oldBuckets[oldBucketSliceIdx].value
}
if oldIdx > newIdx {
// Moving ahead new bucket and span by 1 index.
if newInsideSpanIdx+1 >= newSpans[newSpanSliceIdx].Length {
// Current span is over.
newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans)
newInsideSpanIdx = 0
if newSpanSliceIdx >= len(newSpans) {
// All new spans are over.
// This should not happen, old spans above should catch this first.
panic("new spans over before old spans in counterReset")
}
} else {
newInsideSpanIdx++
newIdx++
}
newBucketSliceIdx++
newVal = newBuckets[newBucketSliceIdx]
}
}
return false
}
// appendFloatHistogram appends a float histogram to the chunk. The caller must ensure that
// the histogram is properly structured, e.g. the number of buckets used
// corresponds to the number conveyed by the span structures. First call
@ -741,7 +614,7 @@ func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppend
a.setCounterResetHeader(CounterReset)
case prev != nil:
// This is a new chunk, but continued from a previous one. We need to calculate the reset header unless already set.
_, _, _, _, _, counterReset := prev.appendable(h)
_, _, _, counterReset := prev.appendable(h)
if counterReset {
a.setCounterResetHeader(CounterReset)
} else {
@ -753,7 +626,7 @@ func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppend
// Adding counter-like histogram.
if h.CounterResetHint != histogram.GaugeType {
pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, okToAppend, counterReset := a.appendable(h)
pForwardInserts, nForwardInserts, okToAppend, counterReset := a.appendable(h)
if !okToAppend || counterReset {
if appendOnly {
if counterReset {
@ -784,13 +657,6 @@ func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppend
app.(*FloatHistogramAppender).appendFloatHistogram(t, h)
return chk, true, app, nil
}
if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 {
// The histogram needs to be expanded to have the extra empty buckets
// of the chunk.
h.PositiveSpans = a.pSpans
h.NegativeSpans = a.nSpans
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
}
a.appendFloatHistogram(t, h)
return nil, false, a, nil
}

View file

@ -245,11 +245,9 @@ func TestFloatHistogramChunkBucketChanges(t *testing.T) {
h2.NegativeBuckets = []int64{2, -1} // 2 1 (total 3)
// This is how span changes will be handled.
hApp, _ := app.(*FloatHistogramAppender)
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2.ToFloat(nil))
posInterjections, negInterjections, ok, cr := hApp.appendable(h2.ToFloat(nil))
require.NotEmpty(t, posInterjections)
require.NotEmpty(t, negInterjections)
require.Empty(t, backwardPositiveInserts)
require.Empty(t, backwardNegativeInserts)
require.True(t, ok) // Only new buckets came in.
require.False(t, cr)
c, app = hApp.recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans)
@ -335,7 +333,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
c, hApp, ts, h1 := setup(eh)
h2 := h1.Copy()
h2.Schema++
_, _, _, _, ok, _ := hApp.appendable(h2)
_, _, ok, _ := hApp.appendable(h2)
require.False(t, ok)
assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
@ -345,7 +343,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
c, hApp, ts, h1 := setup(eh)
h2 := h1.Copy()
h2.ZeroThreshold += 0.1
_, _, _, _, ok, _ := hApp.appendable(h2)
_, _, ok, _ := hApp.appendable(h2)
require.False(t, ok)
assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
@ -365,11 +363,9 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
h2.Sum = 30
h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 1}
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.NotEmpty(t, posInterjections)
require.Empty(t, negInterjections)
require.Empty(t, backwardPositiveInserts)
require.Empty(t, backwardNegativeInserts)
require.True(t, ok) // Only new buckets came in.
require.False(t, cr)
@ -389,56 +385,24 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
h2.Sum = 21
h2.PositiveBuckets = []float64{6, 3, 2, 4, 5, 1}
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.Empty(t, posInterjections)
require.Empty(t, negInterjections)
require.Empty(t, backwardPositiveInserts)
require.Empty(t, backwardNegativeInserts)
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
}
{ // New histogram that has buckets missing but the buckets missing were empty.
emptyBucketH := eh.Copy()
emptyBucketH.PositiveBuckets = []float64{6, 0, 3, 2, 4, 0, 1}
c, hApp, ts, h1 := setup(emptyBucketH)
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 1},
{Offset: 3, Length: 1},
{Offset: 3, Length: 2},
{Offset: 5, Length: 1},
}
h2.PositiveBuckets = []float64{7, 4, 3, 5, 2}
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
require.Empty(t, posInterjections)
require.Empty(t, negInterjections)
require.NotEmpty(t, backwardPositiveInserts)
require.Empty(t, backwardNegativeInserts)
require.True(t, ok)
require.False(t, cr)
assertNoNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
// Check that h2 was recoded.
require.Equal(t, []float64{7, 0, 4, 3, 5, 0, 2}, h2.PositiveBuckets)
require.Equal(t, emptyBucketH.PositiveSpans, h2.PositiveSpans)
}
{ // New histogram that has a counter reset while buckets are same.
c, hApp, ts, h1 := setup(eh)
h2 := h1.Copy()
h2.Sum = 23
h2.PositiveBuckets = []float64{6, 2, 3, 2, 4, 5, 1}
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.Empty(t, posInterjections)
require.Empty(t, negInterjections)
require.Empty(t, backwardPositiveInserts)
require.Empty(t, backwardNegativeInserts)
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
@ -457,11 +421,9 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
h2.Sum = 29
h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 0}
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.Empty(t, posInterjections)
require.Empty(t, negInterjections)
require.Empty(t, backwardPositiveInserts)
require.Empty(t, backwardNegativeInserts)
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
@ -486,11 +448,9 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
h2.Sum = 26
h2.PositiveBuckets = []float64{1, 2, 5, 3, 3, 2, 4, 5, 1}
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.Empty(t, posInterjections)
require.Empty(t, negInterjections)
require.Empty(t, backwardPositiveInserts)
require.Empty(t, backwardNegativeInserts)
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
@ -564,44 +524,10 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
require.Equal(t, NotCounterReset, nextChunk.GetCounterResetHeader())
}
{
// Start a new chunk with a histogram that has an empty bucket.
// Add a histogram that has the same bucket missing.
// This should be appendable and can happen if we are merging from chunks
// where the first sample came from a recoded chunk that added the
// empty bucket.
h1 := eh.Copy()
// Add a bucket that is empty -10 offsets from the first bucket.
h1.PositiveSpans = make([]histogram.Span, len(eh.PositiveSpans)+1)
h1.PositiveSpans[0] = histogram.Span{Offset: eh.PositiveSpans[0].Offset - 10, Length: 1}
h1.PositiveSpans[1] = histogram.Span{Offset: eh.PositiveSpans[0].Offset + 9, Length: eh.PositiveSpans[0].Length}
for i, v := range eh.PositiveSpans[1:] {
h1.PositiveSpans[i+2] = v
}
h1.PositiveBuckets = make([]float64, len(eh.PositiveBuckets)+1)
h1.PositiveBuckets[0] = 0
for i, v := range eh.PositiveBuckets {
h1.PositiveBuckets[i+1] = v
}
c, hApp, ts, _ := setup(h1)
h2 := eh.Copy()
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
require.Empty(t, posInterjections)
require.Empty(t, negInterjections)
require.NotEmpty(t, backwardPositiveInserts)
require.Empty(t, backwardNegativeInserts)
require.True(t, ok)
require.False(t, cr)
assertNoNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
}
{ // Custom buckets, no change.
c, hApp, ts, h1 := setup(cbh)
h2 := h1.Copy()
_, _, _, _, ok, _ := hApp.appendable(h2)
_, _, ok, _ := hApp.appendable(h2)
require.True(t, ok)
assertNoNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
@ -612,7 +538,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
h2 := h1.Copy()
h2.Count++
h2.PositiveBuckets = []float64{6, 3, 3, 2, 4, 5, 2}
_, _, _, _, ok, _ := hApp.appendable(h2)
_, _, ok, _ := hApp.appendable(h2)
require.True(t, ok)
assertNoNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
@ -623,7 +549,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
h2 := h1.Copy()
h2.Count--
h2.PositiveBuckets = []float64{6, 3, 3, 2, 4, 5, 0}
_, _, _, _, ok, _ := hApp.appendable(h2)
_, _, ok, _ := hApp.appendable(h2)
require.False(t, ok)
assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
@ -633,7 +559,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
c, hApp, ts, h1 := setup(cbh)
h2 := h1.Copy()
h2.CustomValues = []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21}
_, _, _, _, ok, _ := hApp.appendable(h2)
_, _, ok, _ := hApp.appendable(h2)
require.False(t, ok)
assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
@ -655,11 +581,9 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 1} // (total 30)
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.NotEmpty(t, posInterjections)
require.Empty(t, negInterjections)
require.Empty(t, backwardPositiveInserts)
require.Empty(t, backwardNegativeInserts)
require.True(t, ok) // Only new buckets came in.
require.False(t, cr)
@ -915,11 +839,9 @@ func TestFloatHistogramChunkAppendableWithEmptySpan(t *testing.T) {
require.Equal(t, 1, c.NumSamples())
hApp, _ := app.(*FloatHistogramAppender)
pI, nI, bpI, bnI, okToAppend, counterReset := hApp.appendable(tc.h2)
pI, nI, okToAppend, counterReset := hApp.appendable(tc.h2)
require.Empty(t, pI)
require.Empty(t, nI)
require.Empty(t, bpI)
require.Empty(t, bnI)
require.True(t, okToAppend)
require.False(t, counterReset)
})

View file

@ -237,23 +237,16 @@ func (a *HistogramAppender) Append(int64, float64) {
}
// appendable returns whether the chunk can be appended to, and if so whether
// 1. Any recoding needs to happen to the chunk using the provided forward
// inserts (in case of any new buckets, positive or negative range,
// respectively).
// 2. Any recoding needs to happen for the histogram being appended, using the
// backward inserts (in case of any missing buckets, positive or negative
// range, respectively).
//
// If the sample is a gauge histogram, AppendableGauge must be used instead.
// any recoding needs to happen using the provided inserts (in case of any new
// buckets, positive or negative range, respectively). If the sample is a gauge
// histogram, AppendableGauge must be used instead.
//
// The chunk is not appendable in the following cases:
//
// - The schema has changed.
// - The custom bounds have changed if the current schema is custom buckets.
// - The threshold for the zero bucket has changed.
// - Any buckets have disappeared, unless the bucket count was 0, unused.
// Empty bucket can happen if the chunk was recoded and we're merging a non
// recoded histogram. In this case backward inserts will be provided.
// - Any buckets have disappeared.
// - There was a counter reset in the count of observations or in any bucket,
// including the zero bucket.
// - The last sample in the chunk was stale while the current sample is not stale.
@ -263,7 +256,6 @@ func (a *HistogramAppender) Append(int64, float64) {
// append. If counterReset is true, okToAppend is always false.
func (a *HistogramAppender) appendable(h *histogram.Histogram) (
positiveInserts, negativeInserts []Insert,
backwardPositiveInserts, backwardNegativeInserts []Insert,
okToAppend, counterReset bool,
) {
if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType {
@ -307,219 +299,31 @@ func (a *HistogramAppender) appendable(h *histogram.Histogram) (
}
var ok bool
positiveInserts, backwardPositiveInserts, ok = expandIntSpansAndBuckets(a.pSpans, h.PositiveSpans, a.pBuckets, h.PositiveBuckets)
positiveInserts, ok = expandSpansForward(a.pSpans, h.PositiveSpans)
if !ok {
counterReset = true
return
}
negativeInserts, backwardNegativeInserts, ok = expandIntSpansAndBuckets(a.nSpans, h.NegativeSpans, a.nBuckets, h.NegativeBuckets)
negativeInserts, ok = expandSpansForward(a.nSpans, h.NegativeSpans)
if !ok {
counterReset = true
return
}
if counterResetInAnyBucket(a.pBuckets, h.PositiveBuckets, a.pSpans, h.PositiveSpans) ||
counterResetInAnyBucket(a.nBuckets, h.NegativeBuckets, a.nSpans, h.NegativeSpans) {
counterReset, positiveInserts, negativeInserts = true, nil, nil
return
}
okToAppend = true
return
}
// expandIntSpansAndBuckets returns the inserts to expand the bucket spans 'a' so that
// they match the spans in 'b'. 'b' must cover the same or more buckets than
// 'a', otherwise the function will return false.
// The function also returns the inserts to expand 'b' to also cover all the
// buckets that are missing in 'b', but are present with 0 counter value in 'a'.
// The function also checks for counter resets between 'a' and 'b'.
//
// Example:
//
// Let's say the old buckets look like this:
//
// span syntax: [offset, length]
// spans : [ 0 , 2 ] [2,1] [ 3 , 2 ] [3,1] [1,1]
// bucket idx : [0] [1] 2 3 [4] 5 6 7 [8] [9] 10 11 12 [13] 14 [15]
// raw values 6 3 3 2 4 5 1
// deltas 6 -3 0 -1 2 1 -4
//
// But now we introduce a new bucket layout. (Carefully chosen example where we
// have a span appended, one unchanged[*], one prepended, and two merge - in
// that order.)
//
// [*] unchanged in terms of which bucket indices they represent. but to achieve
// that, their offset needs to change if "disrupted" by spans changing ahead of
// them
//
// \/ this one is "unchanged"
// spans : [ 0 , 3 ] [1,1] [ 1 , 4 ] [ 3 , 3 ]
// bucket idx : [0] [1] [2] 3 [4] 5 [6] [7] [8] [9] 10 11 12 [13] [14] [15]
// raw values 6 3 0 3 0 0 2 4 5 0 1
// deltas 6 -3 -3 3 -3 0 2 2 1 -5 1
// delta mods: / \ / \ / \
//
// Note for histograms with delta-encoded buckets: Whenever any new buckets are
// introduced, the subsequent "old" bucket needs to readjust its delta to the
// new base of 0. Thus, for the caller who wants to transform the set of
// original deltas to a new set of deltas to match a new span layout that adds
// buckets, we simply need to generate a list of inserts.
//
// Note: Within expandSpansForward we don't have to worry about the changes to the
// spans themselves, thanks to the iterators we get to work with the more useful
// bucket indices (which of course directly correspond to the buckets we have to
// adjust).
func expandIntSpansAndBuckets(a, b []histogram.Span, aBuckets, bBuckets []int64) (forward, backward []Insert, ok bool) {
ai := newBucketIterator(a)
bi := newBucketIterator(b)
var aInserts []Insert // To insert into buckets of a, to make up for missing buckets in b.
var bInserts []Insert // To insert into buckets of b, to make up for missing empty(!) buckets in a.
// When aInter.num or bInter.num becomes > 0, this becomes a valid insert that should
// be yielded when we finish a streak of new buckets.
var aInter Insert
var bInter Insert
aIdx, aOK := ai.Next()
bIdx, bOK := bi.Next()
// Bucket count. Initialize the absolute count and index into the
// positive/negative counts or deltas array. The bucket count is
// used to detect counter reset as well as unused buckets in a.
var (
aCount int64
bCount int64
aCountIdx int
bCountIdx int
)
if aOK {
aCount = aBuckets[aCountIdx]
}
if bOK {
bCount = bBuckets[bCountIdx]
}
loop:
for {
switch {
case aOK && bOK:
switch {
case aIdx == bIdx: // Both have an identical bucket index.
// Bucket count. Check bucket for reset from a to b.
if aCount > bCount {
return nil, nil, false
}
// Finish WIP insert for a and reset.
if aInter.num > 0 {
aInserts = append(aInserts, aInter)
aInter.num = 0
}
// Finish WIP insert for b and reset.
if bInter.num > 0 {
bInserts = append(bInserts, bInter)
bInter.num = 0
}
aIdx, aOK = ai.Next()
bIdx, bOK = bi.Next()
aInter.pos++ // Advance potential insert position.
aCountIdx++ // Advance absolute bucket count index for a.
if aOK {
aCount += aBuckets[aCountIdx]
}
bInter.pos++ // Advance potential insert position.
bCountIdx++ // Advance absolute bucket count index for b.
if bOK {
bCount += bBuckets[bCountIdx]
}
continue
case aIdx < bIdx: // b misses a bucket index that is in a.
// This is ok if the count in a is 0, in which case we make a note to
// fill in the bucket in b and advance a.
if aCount == 0 {
bInter.num++ // Mark that we need to insert a bucket in b.
// Advance a
if aInter.num > 0 {
aInserts = append(aInserts, aInter)
aInter.num = 0
}
aIdx, aOK = ai.Next()
aInter.pos++
aCountIdx++
if aOK {
aCount += aBuckets[aCountIdx]
}
continue
}
// Otherwise we are missing a bucket that was in use in a, which is a reset.
return nil, nil, false
case aIdx > bIdx: // a misses a value that is in b. Forward b and recompare.
aInter.num++
// Advance b
if bInter.num > 0 {
bInserts = append(bInserts, bInter)
bInter.num = 0
}
bIdx, bOK = bi.Next()
bInter.pos++
bCountIdx++
if bOK {
bCount += bBuckets[bCountIdx]
}
}
case aOK && !bOK: // b misses a value that is in a.
// This is ok if the count in a is 0, in which case we make a note to
// fill in the bucket in b and advance a.
if aCount == 0 {
bInter.num++
// Advance a
if aInter.num > 0 {
aInserts = append(aInserts, aInter)
aInter.num = 0
}
aIdx, aOK = ai.Next()
aInter.pos++ // Advance potential insert position.
// Update absolute bucket counts for a.
aCountIdx++
if aOK {
aCount += aBuckets[aCountIdx]
}
continue
}
// Otherwise we are missing a bucket that was in use in a, which is a reset.
return nil, nil, false
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
aInter.num++
// Advance b
if bInter.num > 0 {
bInserts = append(bInserts, bInter)
bInter.num = 0
}
bIdx, bOK = bi.Next()
bInter.pos++ // Advance potential insert position.
// Update absolute bucket counts for b.
bCountIdx++
if bOK {
bCount += bBuckets[bCountIdx]
}
default: // Both iterators ran out. We're done.
if aInter.num > 0 {
aInserts = append(aInserts, aInter)
}
if bInter.num > 0 {
bInserts = append(bInserts, bInter)
}
break loop
}
}
return aInserts, bInserts, true
}
// appendableGauge returns whether the chunk can be appended to, and if so
// whether:
// 1. Any recoding needs to happen to the chunk using the provided forward
// inserts (in case of any new buckets, positive or negative range,
// respectively).
// 1. Any recoding needs to happen to the chunk using the provided inserts
// (in case of any new buckets, positive or negative range, respectively).
// 2. Any recoding needs to happen for the histogram being appended, using the
// backward inserts (in case of any missing buckets, positive or negative
// range, respectively).
@ -565,6 +369,76 @@ func (a *HistogramAppender) appendableGauge(h *histogram.Histogram) (
return
}
// counterResetInAnyBucket returns true if there was a counter reset for any
// bucket. This should be called only when the bucket layout is the same or new
// buckets were added. It does not handle the case of buckets missing.
func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans []histogram.Span) bool {
if len(oldSpans) == 0 || len(oldBuckets) == 0 {
return false
}
var (
oldSpanSliceIdx, newSpanSliceIdx int = -1, -1 // Index for the span slices. Starts at -1 to indicate that the first non empty span is not yet found.
oldInsideSpanIdx, newInsideSpanIdx uint32 // Index inside a span.
oldIdx, newIdx int32 // Index inside a bucket slice.
oldBucketSliceIdx, newBucketSliceIdx int // Index inside bucket slice.
)
// Find first non empty spans.
oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans)
newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans)
oldVal, newVal := oldBuckets[0], newBuckets[0]
// Since we assume that new spans won't have missing buckets, there will never be a case
// where the old index will not find a matching new index.
for {
if oldIdx == newIdx {
if newVal < oldVal {
return true
}
}
if oldIdx <= newIdx {
// Moving ahead old bucket and span by 1 index.
if oldInsideSpanIdx+1 >= oldSpans[oldSpanSliceIdx].Length {
// Current span is over.
oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans)
oldInsideSpanIdx = 0
if oldSpanSliceIdx >= len(oldSpans) {
// All old spans are over.
break
}
} else {
oldInsideSpanIdx++
oldIdx++
}
oldBucketSliceIdx++
oldVal += oldBuckets[oldBucketSliceIdx]
}
if oldIdx > newIdx {
// Moving ahead new bucket and span by 1 index.
if newInsideSpanIdx+1 >= newSpans[newSpanSliceIdx].Length {
// Current span is over.
newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans)
newInsideSpanIdx = 0
if newSpanSliceIdx >= len(newSpans) {
// All new spans are over.
// This should not happen, old spans above should catch this first.
panic("new spans over before old spans in counterReset")
}
} else {
newInsideSpanIdx++
newIdx++
}
newBucketSliceIdx++
newVal += newBuckets[newBucketSliceIdx]
}
}
return false
}
// appendHistogram appends a histogram to the chunk. The caller must ensure that
// the histogram is properly structured, e.g. the number of buckets used
// corresponds to the number conveyed by the span structures. First call
@ -775,7 +649,7 @@ func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h
a.setCounterResetHeader(CounterReset)
case prev != nil:
// This is a new chunk, but continued from a previous one. We need to calculate the reset header unless already set.
_, _, _, _, _, counterReset := prev.appendable(h)
_, _, _, counterReset := prev.appendable(h)
if counterReset {
a.setCounterResetHeader(CounterReset)
} else {
@ -787,7 +661,7 @@ func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h
// Adding counter-like histogram.
if h.CounterResetHint != histogram.GaugeType {
pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, okToAppend, counterReset := a.appendable(h)
pForwardInserts, nForwardInserts, okToAppend, counterReset := a.appendable(h)
if !okToAppend || counterReset {
if appendOnly {
if counterReset {
@ -818,13 +692,6 @@ func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h
app.(*HistogramAppender).appendHistogram(t, h)
return chk, true, app, nil
}
if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 {
// The histogram needs to be expanded to have the extra empty buckets
// of the chunk.
h.PositiveSpans = a.pSpans
h.NegativeSpans = a.nSpans
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
}
a.appendHistogram(t, h)
return nil, false, a, nil
}

View file

@ -280,9 +280,6 @@ type Insert struct {
num int
}
// Deprecated: expandSpansForward, use expandIntSpansAndBuckets or
// expandFloatSpansAndBuckets instead.
// expandSpansForward is left here for reference.
// expandSpansForward returns the inserts to expand the bucket spans 'a' so that
// they match the spans in 'b'. 'b' must cover the same or more buckets than
// 'a', otherwise the function will return false.
@ -577,3 +574,15 @@ func counterResetHint(crh CounterResetHeader, numRead uint16) histogram.CounterR
return histogram.UnknownCounterReset
}
}
// Handle pathological case of empty span when advancing span idx.
// Call it with idx==-1 to find the first non empty span.
func nextNonEmptySpanSliceIdx(idx int, bucketIdx int32, spans []histogram.Span) (newIdx int, newBucketIdx int32) {
for idx++; idx < len(spans); idx++ {
if spans[idx].Length > 0 {
return idx, bucketIdx + spans[idx].Offset + 1
}
bucketIdx += spans[idx].Offset
}
return idx, 0
}

View file

@ -256,11 +256,9 @@ func TestHistogramChunkBucketChanges(t *testing.T) {
h2.NegativeBuckets = []int64{2, -1} // 2 1 (total 3)
// This is how span changes will be handled.
hApp, _ := app.(*HistogramAppender)
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.NotEmpty(t, posInterjections)
require.NotEmpty(t, negInterjections)
require.Empty(t, backwardPositiveInserts)
require.Empty(t, backwardNegativeInserts)
require.True(t, ok) // Only new buckets came in.
require.False(t, cr)
c, app = hApp.recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans)
@ -349,7 +347,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
c, hApp, ts, h1 := setup(eh)
h2 := h1.Copy()
h2.Schema++
_, _, _, _, ok, _ := hApp.appendable(h2)
_, _, ok, _ := hApp.appendable(h2)
require.False(t, ok)
assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
@ -359,7 +357,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
c, hApp, ts, h1 := setup(eh)
h2 := h1.Copy()
h2.ZeroThreshold += 0.1
_, _, _, _, ok, _ := hApp.appendable(h2)
_, _, ok, _ := hApp.appendable(h2)
require.False(t, ok)
assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
@ -382,11 +380,9 @@ func TestHistogramChunkAppendable(t *testing.T) {
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30)
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.NotEmpty(t, posInterjections)
require.Empty(t, negInterjections)
require.Empty(t, backwardPositiveInserts)
require.Empty(t, backwardNegativeInserts)
require.True(t, ok) // Only new buckets came in.
require.False(t, cr)
@ -405,57 +401,24 @@ func TestHistogramChunkAppendable(t *testing.T) {
h2.Sum = 21
h2.PositiveBuckets = []int64{6, -3, -1, 2, 1, -4} // counts: 6, 3, 2, 4, 5, 1 (total 21)
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.Empty(t, posInterjections)
require.Empty(t, negInterjections)
require.Empty(t, backwardPositiveInserts)
require.Empty(t, backwardNegativeInserts)
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
}
{ // New histogram that has buckets missing but the buckets missing were empty.
emptyBucketH := eh.Copy()
emptyBucketH.PositiveBuckets = []int64{6, -6, 1, 1, -2, 1, 1} // counts: 6, 0, 1, 2, 0, 1, 2 (total 12)
c, hApp, ts, h1 := setup(emptyBucketH)
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{ // Missing buckets at offset 1 and 9.
{Offset: 0, Length: 1},
{Offset: 3, Length: 1},
{Offset: 3, Length: 1},
{Offset: 4, Length: 1},
{Offset: 1, Length: 1},
}
h2.PositiveBuckets = []int64{7, -5, 1, 0, 1} // counts: 7, 2, 3, 3, 4 (total 18)
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
require.Empty(t, posInterjections)
require.Empty(t, negInterjections)
require.NotEmpty(t, backwardPositiveInserts)
require.Empty(t, backwardNegativeInserts)
require.True(t, ok)
require.False(t, cr)
assertNoNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
// Check that h2 was recoded.
require.Equal(t, []int64{7, -7, 2, 1, -3, 3, 1}, h2.PositiveBuckets) // counts: 7, 0, 2, 3 , 0, 3, 4 (total 18)
require.Equal(t, emptyBucketH.PositiveSpans, h2.PositiveSpans)
}
{ // New histogram that has a counter reset while buckets are same.
c, hApp, ts, h1 := setup(eh)
h2 := h1.Copy()
h2.Sum = 23
h2.PositiveBuckets = []int64{6, -4, 1, -1, 2, 1, -4} // counts: 6, 2, 3, 2, 4, 5, 1 (total 23)
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.Empty(t, posInterjections)
require.Empty(t, negInterjections)
require.Empty(t, backwardPositiveInserts)
require.Empty(t, backwardNegativeInserts)
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
@ -477,11 +440,9 @@ func TestHistogramChunkAppendable(t *testing.T) {
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 0} // 7 5 1 3 1 0 2 5 5 0 0 (total 29)
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.Empty(t, posInterjections)
require.Empty(t, negInterjections)
require.Empty(t, backwardPositiveInserts)
require.Empty(t, backwardNegativeInserts)
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
@ -509,11 +470,9 @@ func TestHistogramChunkAppendable(t *testing.T) {
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
h2.PositiveBuckets = []int64{1, 1, 3, -2, 0, -1, 2, 1, -4} // counts: 1, 2, 5, 3, 3, 2, 4, 5, 1 (total 26)
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.Empty(t, posInterjections)
require.Empty(t, negInterjections)
require.Empty(t, backwardPositiveInserts)
require.Empty(t, backwardNegativeInserts)
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
@ -590,44 +549,10 @@ func TestHistogramChunkAppendable(t *testing.T) {
require.Equal(t, NotCounterReset, nextChunk.GetCounterResetHeader())
}
{
// Start a new chunk with a histogram that has an empty bucket.
// Add a histogram that has the same bucket missing.
// This should be appendable and can happen if we are merging from chunks
// where the first sample came from a recoded chunk that added the
// empty bucket.
h1 := eh.Copy()
// Add a bucket that is empty -10 offsets from the first bucket.
h1.PositiveSpans = make([]histogram.Span, len(eh.PositiveSpans)+1)
h1.PositiveSpans[0] = histogram.Span{Offset: eh.PositiveSpans[0].Offset - 10, Length: 1}
h1.PositiveSpans[1] = histogram.Span{Offset: eh.PositiveSpans[0].Offset + 9, Length: eh.PositiveSpans[0].Length}
for i, v := range eh.PositiveSpans[1:] {
h1.PositiveSpans[i+2] = v
}
h1.PositiveBuckets = make([]int64, len(eh.PositiveBuckets)+1)
h1.PositiveBuckets[0] = 0
for i, v := range eh.PositiveBuckets {
h1.PositiveBuckets[i+1] = v
}
c, hApp, ts, _ := setup(h1)
h2 := eh.Copy()
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
require.Empty(t, posInterjections)
require.Empty(t, negInterjections)
require.NotEmpty(t, backwardPositiveInserts)
require.Empty(t, backwardNegativeInserts)
require.True(t, ok)
require.False(t, cr)
assertNoNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
}
{ // Custom buckets, no change.
c, hApp, ts, h1 := setup(cbh)
h2 := h1.Copy()
_, _, _, _, ok, _ := hApp.appendable(h2)
_, _, ok, _ := hApp.appendable(h2)
require.True(t, ok)
assertNoNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
@ -638,7 +563,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
h2 := h1.Copy()
h2.Count++
h2.PositiveBuckets = []int64{6, -3, 0, -1, 2, 1, -3}
_, _, _, _, ok, _ := hApp.appendable(h2)
_, _, ok, _ := hApp.appendable(h2)
require.True(t, ok)
assertNoNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
@ -649,7 +574,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
h2 := h1.Copy()
h2.Count--
h2.PositiveBuckets = []int64{6, -3, 0, -1, 2, 1, -5}
_, _, _, _, ok, _ := hApp.appendable(h2)
_, _, ok, _ := hApp.appendable(h2)
require.False(t, ok)
assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
@ -659,7 +584,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
c, hApp, ts, h1 := setup(cbh)
h2 := h1.Copy()
h2.CustomValues = []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21}
_, _, _, _, ok, _ := hApp.appendable(h2)
_, _, ok, _ := hApp.appendable(h2)
require.False(t, ok)
assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset)
@ -681,11 +606,9 @@ func TestHistogramChunkAppendable(t *testing.T) {
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30)
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
posInterjections, negInterjections, ok, cr := hApp.appendable(h2)
require.NotEmpty(t, posInterjections)
require.Empty(t, negInterjections)
require.Empty(t, backwardPositiveInserts)
require.Empty(t, backwardNegativeInserts)
require.True(t, ok) // Only new buckets came in.
require.False(t, cr)
@ -952,11 +875,9 @@ func TestHistogramChunkAppendableWithEmptySpan(t *testing.T) {
require.Equal(t, 1, c.NumSamples())
hApp, _ := app.(*HistogramAppender)
pI, nI, bpI, bnI, okToAppend, counterReset := hApp.appendable(tc.h2)
pI, nI, okToAppend, counterReset := hApp.appendable(tc.h2)
require.Empty(t, pI)
require.Empty(t, nI)
require.Empty(t, bpI)
require.Empty(t, bnI)
require.True(t, okToAppend)
require.False(t, counterReset)
})
@ -1447,50 +1368,3 @@ func TestHistogramAppendOnlyErrors(t *testing.T) {
require.EqualError(t, err, "histogram counter reset")
})
}
func BenchmarkAppendable(b *testing.B) {
// Create a histogram with a bunch of spans and buckets.
const (
numSpans = 1000
spanLength = 10
)
h := &histogram.Histogram{
Schema: 0,
Count: 100,
Sum: 1000,
ZeroThreshold: 0.001,
ZeroCount: 5,
}
for i := 0; i < numSpans; i++ {
h.PositiveSpans = append(h.PositiveSpans, histogram.Span{Offset: 5, Length: spanLength})
h.NegativeSpans = append(h.NegativeSpans, histogram.Span{Offset: 5, Length: spanLength})
for j := 0; j < spanLength; j++ {
h.PositiveBuckets = append(h.PositiveBuckets, int64(j))
h.NegativeBuckets = append(h.NegativeBuckets, int64(j))
}
}
c := Chunk(NewHistogramChunk())
// Create fresh appender and add the first histogram.
app, err := c.Appender()
if err != nil {
b.Fatal(err)
}
_, _, _, err = app.AppendHistogram(nil, 1, h, true)
if err != nil {
b.Fatal(err)
}
hApp := app.(*HistogramAppender)
isAppendable := true
for i := 0; i < b.N; i++ {
_, _, _, _, ok, _ := hApp.appendable(h)
isAppendable = isAppendable && ok
}
if !isAppendable {
b.Fail()
}
}