diff --git a/storage/series_test.go b/storage/series_test.go index 6995468b44..51886f409b 100644 --- a/storage/series_test.go +++ b/storage/series_test.go @@ -126,6 +126,7 @@ func TestChunkSeriesSetToSeriesSet(t *testing.T) { type histogramTest struct { samples []chunks.Sample + expectedSamples []chunks.Sample expectedCounterResetHeaders []chunkenc.CounterResetHeader } @@ -141,6 +142,32 @@ func TestHistogramSeriesToChunks(t *testing.T) { }, PositiveBuckets: []int64{2, 1}, // Abs: 2, 3 } + // h1 but with an extra empty bucket at offset -10. + // This can happen if h1 is from a recoded chunk, where a later histogram had a bucket at offset -10. + h1ExtraBuckets := &histogram.Histogram{ + Count: 7, + ZeroCount: 2, + ZeroThreshold: 0.001, + Sum: 100, + Schema: 0, + PositiveSpans: []histogram.Span{ + {Offset: -10, Length: 1}, + {Offset: 9, Length: 2}, + }, + PositiveBuckets: []int64{0, 2, 1}, // Abs: 0, 2, 3 + } + h1Recoded := &histogram.Histogram{ + Count: 7, + ZeroCount: 2, + ZeroThreshold: 0.001, + Sum: 100, + Schema: 0, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{2, 1, -3, 0}, // Abs: 2, 3, 0, 0 + } // Appendable to h1. h2 := &histogram.Histogram{ Count: 12, @@ -179,6 +206,32 @@ func TestHistogramSeriesToChunks(t *testing.T) { }, PositiveBuckets: []float64{3, 1}, } + // fh1 but with an extra empty bucket at offset -10. + // This can happen if fh1 is from a recoded chunk, where a later histogram had a bucket at offset -10. + fh1ExtraBuckets := &histogram.FloatHistogram{ + Count: 6, + ZeroCount: 2, + ZeroThreshold: 0.001, + Sum: 100, + Schema: 0, + PositiveSpans: []histogram.Span{ + {Offset: -10, Length: 1}, + {Offset: 9, Length: 2}, + }, + PositiveBuckets: []float64{0, 3, 1}, + } + fh1Recoded := &histogram.FloatHistogram{ + Count: 6, + ZeroCount: 2, + ZeroThreshold: 0.001, + Sum: 100, + Schema: 0, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []float64{3, 1, 0, 0}, + } // Appendable to fh1. fh2 := &histogram.FloatHistogram{ Count: 17, @@ -219,6 +272,20 @@ func TestHistogramSeriesToChunks(t *testing.T) { }, PositiveBuckets: []int64{2, 1}, // Abs: 2, 3 } + // gh1 recoded to add extra empty buckets at end. + gh1Recoded := &histogram.Histogram{ + CounterResetHint: histogram.GaugeType, + Count: 7, + ZeroCount: 2, + ZeroThreshold: 0.001, + Sum: 100, + Schema: 0, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{2, 1, -3, 0}, // Abs: 2, 3, 0, 0 + } gh2 := &histogram.Histogram{ CounterResetHint: histogram.GaugeType, Count: 12, @@ -246,6 +313,20 @@ func TestHistogramSeriesToChunks(t *testing.T) { }, PositiveBuckets: []float64{3, 1}, } + // gfh1 recoded to add an extra empty buckets at end. + gfh1Recoded := &histogram.FloatHistogram{ + CounterResetHint: histogram.GaugeType, + Count: 6, + ZeroCount: 2, + ZeroThreshold: 0.001, + Sum: 100, + Schema: 0, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []float64{3, 1, 0, 0}, + } gfh2 := &histogram.FloatHistogram{ CounterResetHint: histogram.GaugeType, Count: 17, @@ -272,6 +353,9 @@ func TestHistogramSeriesToChunks(t *testing.T) { samples: []chunks.Sample{ hSample{t: 1, h: h1}, }, + expectedSamples: []chunks.Sample{ + hSample{t: 1, h: h1}, + }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset}, }, "two histograms encoded to a single chunk": { @@ -279,6 +363,10 @@ func TestHistogramSeriesToChunks(t *testing.T) { hSample{t: 1, h: h1}, hSample{t: 2, h: h2}, }, + expectedSamples: []chunks.Sample{ + hSample{t: 1, h: h1Recoded}, + hSample{t: 2, h: h2}, + }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset}, }, "two histograms encoded to two chunks": { @@ -286,6 +374,10 @@ func TestHistogramSeriesToChunks(t *testing.T) { hSample{t: 1, h: h2}, hSample{t: 2, h: h1}, }, + expectedSamples: []chunks.Sample{ + hSample{t: 1, h: h2}, + hSample{t: 2, h: h1}, + }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset}, }, "histogram and stale sample encoded to two chunks": { @@ -293,6 +385,10 @@ func TestHistogramSeriesToChunks(t *testing.T) { hSample{t: 1, h: staleHistogram}, hSample{t: 2, h: h1}, }, + expectedSamples: []chunks.Sample{ + hSample{t: 1, h: staleHistogram}, + hSample{t: 2, h: h1}, + }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset}, }, "histogram and reduction in bucket encoded to two chunks": { @@ -300,6 +396,10 @@ func TestHistogramSeriesToChunks(t *testing.T) { hSample{t: 1, h: h1}, hSample{t: 2, h: h2down}, }, + expectedSamples: []chunks.Sample{ + hSample{t: 1, h: h1}, + hSample{t: 2, h: h2down}, + }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset}, }, // Float histograms. @@ -307,6 +407,9 @@ func TestHistogramSeriesToChunks(t *testing.T) { samples: []chunks.Sample{ fhSample{t: 1, fh: fh1}, }, + expectedSamples: []chunks.Sample{ + fhSample{t: 1, fh: fh1}, + }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset}, }, "two float histograms encoded to a single chunk": { @@ -314,6 +417,10 @@ func TestHistogramSeriesToChunks(t *testing.T) { fhSample{t: 1, fh: fh1}, fhSample{t: 2, fh: fh2}, }, + expectedSamples: []chunks.Sample{ + fhSample{t: 1, fh: fh1Recoded}, + fhSample{t: 2, fh: fh2}, + }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset}, }, "two float histograms encoded to two chunks": { @@ -321,6 +428,10 @@ func TestHistogramSeriesToChunks(t *testing.T) { fhSample{t: 1, fh: fh2}, fhSample{t: 2, fh: fh1}, }, + expectedSamples: []chunks.Sample{ + fhSample{t: 1, fh: fh2}, + fhSample{t: 2, fh: fh1}, + }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset}, }, "float histogram and stale sample encoded to two chunks": { @@ -328,6 +439,10 @@ func TestHistogramSeriesToChunks(t *testing.T) { fhSample{t: 1, fh: staleFloatHistogram}, fhSample{t: 2, fh: fh1}, }, + expectedSamples: []chunks.Sample{ + fhSample{t: 1, fh: staleFloatHistogram}, + fhSample{t: 2, fh: fh1}, + }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset}, }, "float histogram and reduction in bucket encoded to two chunks": { @@ -335,6 +450,10 @@ func TestHistogramSeriesToChunks(t *testing.T) { fhSample{t: 1, fh: fh1}, fhSample{t: 2, fh: fh2down}, }, + expectedSamples: []chunks.Sample{ + fhSample{t: 1, fh: fh1}, + fhSample{t: 2, fh: fh2down}, + }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset}, }, // Mixed. @@ -343,6 +462,10 @@ func TestHistogramSeriesToChunks(t *testing.T) { hSample{t: 1, h: h1}, fhSample{t: 2, fh: fh2}, }, + expectedSamples: []chunks.Sample{ + hSample{t: 1, h: h1}, + fhSample{t: 2, fh: fh2}, + }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset}, }, "float histogram and histogram encoded to two chunks": { @@ -350,6 +473,10 @@ func TestHistogramSeriesToChunks(t *testing.T) { fhSample{t: 1, fh: fh1}, hSample{t: 2, h: h2}, }, + expectedSamples: []chunks.Sample{ + fhSample{t: 1, fh: fh1}, + hSample{t: 2, h: h2}, + }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset}, }, "histogram and stale float histogram encoded to two chunks": { @@ -357,12 +484,19 @@ func TestHistogramSeriesToChunks(t *testing.T) { hSample{t: 1, h: h1}, fhSample{t: 2, fh: staleFloatHistogram}, }, + expectedSamples: []chunks.Sample{ + hSample{t: 1, h: h1}, + fhSample{t: 2, fh: staleFloatHistogram}, + }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset}, }, "single gauge histogram encoded to one chunk": { samples: []chunks.Sample{ hSample{t: 1, h: gh1}, }, + expectedSamples: []chunks.Sample{ + hSample{t: 1, h: gh1}, + }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType}, }, "two gauge histograms encoded to one chunk when counter increases": { @@ -370,6 +504,10 @@ func TestHistogramSeriesToChunks(t *testing.T) { hSample{t: 1, h: gh1}, hSample{t: 2, h: gh2}, }, + expectedSamples: []chunks.Sample{ + hSample{t: 1, h: gh1Recoded}, + hSample{t: 2, h: gh2}, + }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType}, }, "two gauge histograms encoded to one chunk when counter decreases": { @@ -377,12 +515,19 @@ func TestHistogramSeriesToChunks(t *testing.T) { hSample{t: 1, h: gh2}, hSample{t: 2, h: gh1}, }, + expectedSamples: []chunks.Sample{ + hSample{t: 1, h: gh2}, + hSample{t: 2, h: gh1Recoded}, + }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType}, }, "single gauge float histogram encoded to one chunk": { samples: []chunks.Sample{ fhSample{t: 1, fh: gfh1}, }, + expectedSamples: []chunks.Sample{ + fhSample{t: 1, fh: gfh1}, + }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType}, }, "two float gauge histograms encoded to one chunk when counter increases": { @@ -390,6 +535,10 @@ func TestHistogramSeriesToChunks(t *testing.T) { fhSample{t: 1, fh: gfh1}, fhSample{t: 2, fh: gfh2}, }, + expectedSamples: []chunks.Sample{ + fhSample{t: 1, fh: gfh1Recoded}, + fhSample{t: 2, fh: gfh2}, + }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType}, }, "two float gauge histograms encoded to one chunk when counter decreases": { @@ -397,8 +546,34 @@ func TestHistogramSeriesToChunks(t *testing.T) { fhSample{t: 1, fh: gfh2}, fhSample{t: 2, fh: gfh1}, }, + expectedSamples: []chunks.Sample{ + fhSample{t: 1, fh: gfh2}, + fhSample{t: 2, fh: gfh1Recoded}, + }, expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType}, }, + "histogram with extra empty bucket followed by histogram encodes to one chunk": { + samples: []chunks.Sample{ + hSample{t: 1, h: h1ExtraBuckets}, + hSample{t: 2, h: h1}, + }, + expectedSamples: []chunks.Sample{ + hSample{t: 1, h: h1ExtraBuckets}, + hSample{t: 2, h: h1ExtraBuckets}, // Recoded to add the missing buckets. + }, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset}, + }, + "float histogram with extra empty bucket followed by float histogram encodes to one chunk": { + samples: []chunks.Sample{ + fhSample{t: 1, fh: fh1ExtraBuckets}, + fhSample{t: 2, fh: fh1}, + }, + expectedSamples: []chunks.Sample{ + fhSample{t: 1, fh: fh1ExtraBuckets}, + fhSample{t: 2, fh: fh1ExtraBuckets}, // Recoded to add the missing buckets. + }, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset}, + }, } for testName, test := range tests { @@ -431,9 +606,9 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) { // Decode all encoded samples and assert they are equal to the original ones. encodedSamples := chunks.ChunkMetasToSamples(chks) - require.Equal(t, len(test.samples), len(encodedSamples)) + require.Equal(t, len(test.expectedSamples), len(encodedSamples)) - for i, s := range test.samples { + for i, s := range test.expectedSamples { encodedSample := encodedSamples[i] switch expectedSample := s.(type) { case hSample: @@ -447,7 +622,7 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) { require.True(t, value.IsStaleNaN(h.Sum), fmt.Sprintf("at idx %d", i)) continue } - require.Equal(t, *expectedSample.h, *h.Compact(0), fmt.Sprintf("at idx %d", i)) + require.Equal(t, *expectedSample.h, *h, fmt.Sprintf("at idx %d", i)) case fhSample: require.Equal(t, chunkenc.ValFloatHistogram, encodedSample.Type(), "expect float histogram", fmt.Sprintf("at idx %d", i)) fh := encodedSample.FH() @@ -459,7 +634,7 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) { require.True(t, value.IsStaleNaN(fh.Sum), fmt.Sprintf("at idx %d", i)) continue } - require.Equal(t, *expectedSample.fh, *fh.Compact(0), fmt.Sprintf("at idx %d", i)) + require.Equal(t, *expectedSample.fh, *fh, fmt.Sprintf("at idx %d", i)) default: t.Error("internal error, unexpected type") } diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index a7c1fffb1e..cc35df5bae 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -219,16 +219,25 @@ func (a *FloatHistogramAppender) Append(int64, float64) { } // appendable returns whether the chunk can be appended to, and if so whether -// any recoding needs to happen using the provided inserts (in case of any new -// buckets, positive or negative range, respectively). If the sample is a gauge -// histogram, AppendableGauge must be used instead. +// 1. Any recoding needs to happen to the chunk using the provided forward +// inserts (in case of any new buckets, positive or negative range, +// respectively). +// 2. Any recoding needs to happen for the histogram being appended, using the +// backward inserts (in case of any missing buckets, positive or negative +// range, respectively). +// +// If the sample is a gauge histogram, AppendableGauge must be used instead. // // The chunk is not appendable in the following cases: +// // - The schema has changed. // - The custom bounds have changed if the current schema is custom buckets. // - The threshold for the zero bucket has changed. -// - Any buckets have disappeared. -// - There was a counter reset in the count of observations or in any bucket, including the zero bucket. +// - Any buckets have disappeared, unless the bucket count was 0, unused. +// Empty bucket can happen if the chunk was recoded and we're merging a non +// recoded histogram. In this case backward inserts will be provided. +// - There was a counter reset in the count of observations or in any bucket, +// including the zero bucket. // - The last sample in the chunk was stale while the current sample is not stale. // // The method returns an additional boolean set to true if it is not appendable @@ -236,6 +245,7 @@ func (a *FloatHistogramAppender) Append(int64, float64) { // append. If counterReset is true, okToAppend is always false. func (a *FloatHistogramAppender) appendable(h *histogram.FloatHistogram) ( positiveInserts, negativeInserts []Insert, + backwardPositiveInserts, backwardNegativeInserts []Insert, okToAppend, counterReset bool, ) { if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType { @@ -279,27 +289,214 @@ func (a *FloatHistogramAppender) appendable(h *histogram.FloatHistogram) ( } var ok bool - positiveInserts, ok = expandSpansForward(a.pSpans, h.PositiveSpans) + positiveInserts, backwardPositiveInserts, ok = expandFloatSpansAndBuckets(a.pSpans, h.PositiveSpans, a.pBuckets, h.PositiveBuckets) if !ok { counterReset = true return } - negativeInserts, ok = expandSpansForward(a.nSpans, h.NegativeSpans) + negativeInserts, backwardNegativeInserts, ok = expandFloatSpansAndBuckets(a.nSpans, h.NegativeSpans, a.nBuckets, h.NegativeBuckets) if !ok { counterReset = true return } - if counterResetInAnyFloatBucket(a.pBuckets, h.PositiveBuckets, a.pSpans, h.PositiveSpans) || - counterResetInAnyFloatBucket(a.nBuckets, h.NegativeBuckets, a.nSpans, h.NegativeSpans) { - counterReset, positiveInserts, negativeInserts = true, nil, nil - return - } - okToAppend = true return } +// expandFloatSpansAndBuckets returns the inserts to expand the bucket spans 'a' so that +// they match the spans in 'b'. 'b' must cover the same or more buckets than +// 'a', otherwise the function will return false. +// The function also returns the inserts to expand 'b' to also cover all the +// buckets that are missing in 'b', but are present with 0 counter value in 'a'. +// The function also checks for counter resets between 'a' and 'b'. +// +// Example: +// +// Let's say the old buckets look like this: +// +// span syntax: [offset, length] +// spans : [ 0 , 2 ] [2,1] [ 3 , 2 ] [3,1] [1,1] +// bucket idx : [0] [1] 2 3 [4] 5 6 7 [8] [9] 10 11 12 [13] 14 [15] +// raw values 6 3 3 2 4 5 1 +// deltas 6 -3 0 -1 2 1 -4 +// +// But now we introduce a new bucket layout. (Carefully chosen example where we +// have a span appended, one unchanged[*], one prepended, and two merge - in +// that order.) +// +// [*] unchanged in terms of which bucket indices they represent. but to achieve +// that, their offset needs to change if "disrupted" by spans changing ahead of +// them +// +// \/ this one is "unchanged" +// spans : [ 0 , 3 ] [1,1] [ 1 , 4 ] [ 3 , 3 ] +// bucket idx : [0] [1] [2] 3 [4] 5 [6] [7] [8] [9] 10 11 12 [13] [14] [15] +// raw values 6 3 0 3 0 0 2 4 5 0 1 +// deltas 6 -3 -3 3 -3 0 2 2 1 -5 1 +// delta mods: / \ / \ / \ +// +// Note for histograms with delta-encoded buckets: Whenever any new buckets are +// introduced, the subsequent "old" bucket needs to readjust its delta to the +// new base of 0. Thus, for the caller who wants to transform the set of +// original deltas to a new set of deltas to match a new span layout that adds +// buckets, we simply need to generate a list of inserts. +// +// Note: Within expandSpansForward we don't have to worry about the changes to the +// spans themselves, thanks to the iterators we get to work with the more useful +// bucket indices (which of course directly correspond to the buckets we have to +// adjust). +func expandFloatSpansAndBuckets(a, b []histogram.Span, aBuckets []xorValue, bBuckets []float64) (forward, backward []Insert, ok bool) { + ai := newBucketIterator(a) + bi := newBucketIterator(b) + + var aInserts []Insert // To insert into buckets of a, to make up for missing buckets in b. + var bInserts []Insert // To insert into buckets of b, to make up for missing empty(!) buckets in a. + + // When aInter.num or bInter.num becomes > 0, this becomes a valid insert that should + // be yielded when we finish a streak of new buckets. + var aInter Insert + var bInter Insert + + aIdx, aOK := ai.Next() + bIdx, bOK := bi.Next() + + // Bucket count. Initialize the absolute count and index into the + // positive/negative counts or deltas array. The bucket count is + // used to detect counter reset as well as unused buckets in a. + var ( + aCount float64 + bCount float64 + aCountIdx int + bCountIdx int + ) + if aOK { + aCount = aBuckets[aCountIdx].value + } + if bOK { + bCount = bBuckets[bCountIdx] + } + +loop: + for { + switch { + case aOK && bOK: + switch { + case aIdx == bIdx: // Both have an identical bucket index. + // Bucket count. Check bucket for reset from a to b. + if aCount > bCount { + return nil, nil, false + } + + // Finish WIP insert for a and reset. + if aInter.num > 0 { + aInserts = append(aInserts, aInter) + aInter.num = 0 + } + + // Finish WIP insert for b and reset. + if bInter.num > 0 { + bInserts = append(bInserts, bInter) + bInter.num = 0 + } + + aIdx, aOK = ai.Next() + bIdx, bOK = bi.Next() + aInter.pos++ // Advance potential insert position. + aCountIdx++ // Advance absolute bucket count index for a. + if aOK { + aCount = aBuckets[aCountIdx].value + } + bInter.pos++ // Advance potential insert position. + bCountIdx++ // Advance absolute bucket count index for b. + if bOK { + bCount = bBuckets[bCountIdx] + } + + continue + case aIdx < bIdx: // b misses a bucket index that is in a. + // This is ok if the count in a is 0, in which case we make a note to + // fill in the bucket in b and advance a. + if aCount == 0 { + bInter.num++ // Mark that we need to insert a bucket in b. + // Advance a + if aInter.num > 0 { + aInserts = append(aInserts, aInter) + aInter.num = 0 + } + aIdx, aOK = ai.Next() + aInter.pos++ + aCountIdx++ + if aOK { + aCount = aBuckets[aCountIdx].value + } + continue + } + // Otherwise we are missing a bucket that was in use in a, which is a reset. + return nil, nil, false + case aIdx > bIdx: // a misses a value that is in b. Forward b and recompare. + aInter.num++ + // Advance b + if bInter.num > 0 { + bInserts = append(bInserts, bInter) + bInter.num = 0 + } + bIdx, bOK = bi.Next() + bInter.pos++ + bCountIdx++ + if bOK { + bCount = bBuckets[bCountIdx] + } + } + case aOK && !bOK: // b misses a value that is in a. + // This is ok if the count in a is 0, in which case we make a note to + // fill in the bucket in b and advance a. + if aCount == 0 { + bInter.num++ + // Advance a + if aInter.num > 0 { + aInserts = append(aInserts, aInter) + aInter.num = 0 + } + aIdx, aOK = ai.Next() + aInter.pos++ // Advance potential insert position. + // Update absolute bucket counts for a. + aCountIdx++ + if aOK { + aCount = aBuckets[aCountIdx].value + } + continue + } + // Otherwise we are missing a bucket that was in use in a, which is a reset. + return nil, nil, false + case !aOK && bOK: // a misses a value that is in b. Forward b and recompare. + aInter.num++ + // Advance b + if bInter.num > 0 { + bInserts = append(bInserts, bInter) + bInter.num = 0 + } + bIdx, bOK = bi.Next() + bInter.pos++ // Advance potential insert position. + // Update absolute bucket counts for b. + bCountIdx++ + if bOK { + bCount = bBuckets[bCountIdx] + } + default: // Both iterators ran out. We're done. + if aInter.num > 0 { + aInserts = append(aInserts, aInter) + } + if bInter.num > 0 { + bInserts = append(bInserts, bInter) + } + break loop + } + } + + return aInserts, bInserts, true +} + // appendableGauge returns whether the chunk can be appended to, and if so // whether: // 1. Any recoding needs to happen to the chunk using the provided inserts @@ -349,76 +546,6 @@ func (a *FloatHistogramAppender) appendableGauge(h *histogram.FloatHistogram) ( return } -// counterResetInAnyFloatBucket returns true if there was a counter reset for any -// bucket. This should be called only when the bucket layout is the same or new -// buckets were added. It does not handle the case of buckets missing. -func counterResetInAnyFloatBucket(oldBuckets []xorValue, newBuckets []float64, oldSpans, newSpans []histogram.Span) bool { - if len(oldSpans) == 0 || len(oldBuckets) == 0 { - return false - } - - var ( - oldSpanSliceIdx, newSpanSliceIdx int = -1, -1 // Index for the span slices. Starts at -1 to indicate that the first non empty span is not yet found. - oldInsideSpanIdx, newInsideSpanIdx uint32 // Index inside a span. - oldIdx, newIdx int32 // Index inside a bucket slice. - oldBucketSliceIdx, newBucketSliceIdx int // Index inside bucket slice. - ) - - // Find first non empty spans. - oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans) - newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans) - oldVal, newVal := oldBuckets[0].value, newBuckets[0] - - // Since we assume that new spans won't have missing buckets, there will never be a case - // where the old index will not find a matching new index. - for { - if oldIdx == newIdx { - if newVal < oldVal { - return true - } - } - - if oldIdx <= newIdx { - // Moving ahead old bucket and span by 1 index. - if oldInsideSpanIdx+1 >= oldSpans[oldSpanSliceIdx].Length { - // Current span is over. - oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans) - oldInsideSpanIdx = 0 - if oldSpanSliceIdx >= len(oldSpans) { - // All old spans are over. - break - } - } else { - oldInsideSpanIdx++ - oldIdx++ - } - oldBucketSliceIdx++ - oldVal = oldBuckets[oldBucketSliceIdx].value - } - - if oldIdx > newIdx { - // Moving ahead new bucket and span by 1 index. - if newInsideSpanIdx+1 >= newSpans[newSpanSliceIdx].Length { - // Current span is over. - newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans) - newInsideSpanIdx = 0 - if newSpanSliceIdx >= len(newSpans) { - // All new spans are over. - // This should not happen, old spans above should catch this first. - panic("new spans over before old spans in counterReset") - } - } else { - newInsideSpanIdx++ - newIdx++ - } - newBucketSliceIdx++ - newVal = newBuckets[newBucketSliceIdx] - } - } - - return false -} - // appendFloatHistogram appends a float histogram to the chunk. The caller must ensure that // the histogram is properly structured, e.g. the number of buckets used // corresponds to the number conveyed by the span structures. First call @@ -614,7 +741,7 @@ func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppend a.setCounterResetHeader(CounterReset) case prev != nil: // This is a new chunk, but continued from a previous one. We need to calculate the reset header unless already set. - _, _, _, counterReset := prev.appendable(h) + _, _, _, _, _, counterReset := prev.appendable(h) if counterReset { a.setCounterResetHeader(CounterReset) } else { @@ -626,7 +753,7 @@ func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppend // Adding counter-like histogram. if h.CounterResetHint != histogram.GaugeType { - pForwardInserts, nForwardInserts, okToAppend, counterReset := a.appendable(h) + pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, okToAppend, counterReset := a.appendable(h) if !okToAppend || counterReset { if appendOnly { if counterReset { @@ -657,6 +784,13 @@ func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppend app.(*FloatHistogramAppender).appendFloatHistogram(t, h) return chk, true, app, nil } + if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 { + // The histogram needs to be expanded to have the extra empty buckets + // of the chunk. + h.PositiveSpans = a.pSpans + h.NegativeSpans = a.nSpans + a.recodeHistogram(h, pBackwardInserts, nBackwardInserts) + } a.appendFloatHistogram(t, h) return nil, false, a, nil } diff --git a/tsdb/chunkenc/float_histogram_test.go b/tsdb/chunkenc/float_histogram_test.go index 2ee4422b91..87bf61c2f4 100644 --- a/tsdb/chunkenc/float_histogram_test.go +++ b/tsdb/chunkenc/float_histogram_test.go @@ -245,9 +245,11 @@ func TestFloatHistogramChunkBucketChanges(t *testing.T) { h2.NegativeBuckets = []int64{2, -1} // 2 1 (total 3) // This is how span changes will be handled. hApp, _ := app.(*FloatHistogramAppender) - posInterjections, negInterjections, ok, cr := hApp.appendable(h2.ToFloat(nil)) + posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2.ToFloat(nil)) require.NotEmpty(t, posInterjections) require.NotEmpty(t, negInterjections) + require.Empty(t, backwardPositiveInserts) + require.Empty(t, backwardNegativeInserts) require.True(t, ok) // Only new buckets came in. require.False(t, cr) c, app = hApp.recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans) @@ -333,7 +335,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) { c, hApp, ts, h1 := setup(eh) h2 := h1.Copy() h2.Schema++ - _, _, ok, _ := hApp.appendable(h2) + _, _, _, _, ok, _ := hApp.appendable(h2) require.False(t, ok) assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset) @@ -343,7 +345,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) { c, hApp, ts, h1 := setup(eh) h2 := h1.Copy() h2.ZeroThreshold += 0.1 - _, _, ok, _ := hApp.appendable(h2) + _, _, _, _, ok, _ := hApp.appendable(h2) require.False(t, ok) assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset) @@ -363,9 +365,11 @@ func TestFloatHistogramChunkAppendable(t *testing.T) { h2.Sum = 30 h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 1} - posInterjections, negInterjections, ok, cr := hApp.appendable(h2) + posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) require.NotEmpty(t, posInterjections) require.Empty(t, negInterjections) + require.Empty(t, backwardPositiveInserts) + require.Empty(t, backwardNegativeInserts) require.True(t, ok) // Only new buckets came in. require.False(t, cr) @@ -385,24 +389,56 @@ func TestFloatHistogramChunkAppendable(t *testing.T) { h2.Sum = 21 h2.PositiveBuckets = []float64{6, 3, 2, 4, 5, 1} - posInterjections, negInterjections, ok, cr := hApp.appendable(h2) + posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) require.Empty(t, posInterjections) require.Empty(t, negInterjections) + require.Empty(t, backwardPositiveInserts) + require.Empty(t, backwardNegativeInserts) require.False(t, ok) // Need to cut a new chunk. require.True(t, cr) assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset) } + { // New histogram that has buckets missing but the buckets missing were empty. + emptyBucketH := eh.Copy() + emptyBucketH.PositiveBuckets = []float64{6, 0, 3, 2, 4, 0, 1} + c, hApp, ts, h1 := setup(emptyBucketH) + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 1}, + {Offset: 3, Length: 1}, + {Offset: 3, Length: 2}, + {Offset: 5, Length: 1}, + } + h2.PositiveBuckets = []float64{7, 4, 3, 5, 2} + + posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) + require.Empty(t, posInterjections) + require.Empty(t, negInterjections) + require.NotEmpty(t, backwardPositiveInserts) + require.Empty(t, backwardNegativeInserts) + require.True(t, ok) + require.False(t, cr) + + assertNoNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset) + + // Check that h2 was recoded. + require.Equal(t, []float64{7, 0, 4, 3, 5, 0, 2}, h2.PositiveBuckets) + require.Equal(t, emptyBucketH.PositiveSpans, h2.PositiveSpans) + } + { // New histogram that has a counter reset while buckets are same. c, hApp, ts, h1 := setup(eh) h2 := h1.Copy() h2.Sum = 23 h2.PositiveBuckets = []float64{6, 2, 3, 2, 4, 5, 1} - posInterjections, negInterjections, ok, cr := hApp.appendable(h2) + posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) require.Empty(t, posInterjections) require.Empty(t, negInterjections) + require.Empty(t, backwardPositiveInserts) + require.Empty(t, backwardNegativeInserts) require.False(t, ok) // Need to cut a new chunk. require.True(t, cr) @@ -421,9 +457,11 @@ func TestFloatHistogramChunkAppendable(t *testing.T) { h2.Sum = 29 h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 0} - posInterjections, negInterjections, ok, cr := hApp.appendable(h2) + posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) require.Empty(t, posInterjections) require.Empty(t, negInterjections) + require.Empty(t, backwardPositiveInserts) + require.Empty(t, backwardNegativeInserts) require.False(t, ok) // Need to cut a new chunk. require.True(t, cr) @@ -448,9 +486,11 @@ func TestFloatHistogramChunkAppendable(t *testing.T) { h2.Sum = 26 h2.PositiveBuckets = []float64{1, 2, 5, 3, 3, 2, 4, 5, 1} - posInterjections, negInterjections, ok, cr := hApp.appendable(h2) + posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) require.Empty(t, posInterjections) require.Empty(t, negInterjections) + require.Empty(t, backwardPositiveInserts) + require.Empty(t, backwardNegativeInserts) require.False(t, ok) // Need to cut a new chunk. require.True(t, cr) @@ -524,10 +564,44 @@ func TestFloatHistogramChunkAppendable(t *testing.T) { require.Equal(t, NotCounterReset, nextChunk.GetCounterResetHeader()) } + { + // Start a new chunk with a histogram that has an empty bucket. + // Add a histogram that has the same bucket missing. + // This should be appendable and can happen if we are merging from chunks + // where the first sample came from a recoded chunk that added the + // empty bucket. + h1 := eh.Copy() + // Add a bucket that is empty -10 offsets from the first bucket. + h1.PositiveSpans = make([]histogram.Span, len(eh.PositiveSpans)+1) + h1.PositiveSpans[0] = histogram.Span{Offset: eh.PositiveSpans[0].Offset - 10, Length: 1} + h1.PositiveSpans[1] = histogram.Span{Offset: eh.PositiveSpans[0].Offset + 9, Length: eh.PositiveSpans[0].Length} + for i, v := range eh.PositiveSpans[1:] { + h1.PositiveSpans[i+2] = v + } + h1.PositiveBuckets = make([]float64, len(eh.PositiveBuckets)+1) + h1.PositiveBuckets[0] = 0 + for i, v := range eh.PositiveBuckets { + h1.PositiveBuckets[i+1] = v + } + + c, hApp, ts, _ := setup(h1) + h2 := eh.Copy() + + posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) + require.Empty(t, posInterjections) + require.Empty(t, negInterjections) + require.NotEmpty(t, backwardPositiveInserts) + require.Empty(t, backwardNegativeInserts) + require.True(t, ok) + require.False(t, cr) + + assertNoNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset) + } + { // Custom buckets, no change. c, hApp, ts, h1 := setup(cbh) h2 := h1.Copy() - _, _, ok, _ := hApp.appendable(h2) + _, _, _, _, ok, _ := hApp.appendable(h2) require.True(t, ok) assertNoNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset) @@ -538,7 +612,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) { h2 := h1.Copy() h2.Count++ h2.PositiveBuckets = []float64{6, 3, 3, 2, 4, 5, 2} - _, _, ok, _ := hApp.appendable(h2) + _, _, _, _, ok, _ := hApp.appendable(h2) require.True(t, ok) assertNoNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset) @@ -549,7 +623,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) { h2 := h1.Copy() h2.Count-- h2.PositiveBuckets = []float64{6, 3, 3, 2, 4, 5, 0} - _, _, ok, _ := hApp.appendable(h2) + _, _, _, _, ok, _ := hApp.appendable(h2) require.False(t, ok) assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset) @@ -559,7 +633,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) { c, hApp, ts, h1 := setup(cbh) h2 := h1.Copy() h2.CustomValues = []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21} - _, _, ok, _ := hApp.appendable(h2) + _, _, _, _, ok, _ := hApp.appendable(h2) require.False(t, ok) assertNewFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset) @@ -581,9 +655,11 @@ func TestFloatHistogramChunkAppendable(t *testing.T) { // so the new histogram should have new counts >= these per-bucket counts, e.g.: h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 1} // (total 30) - posInterjections, negInterjections, ok, cr := hApp.appendable(h2) + posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) require.NotEmpty(t, posInterjections) require.Empty(t, negInterjections) + require.Empty(t, backwardPositiveInserts) + require.Empty(t, backwardNegativeInserts) require.True(t, ok) // Only new buckets came in. require.False(t, cr) @@ -839,9 +915,11 @@ func TestFloatHistogramChunkAppendableWithEmptySpan(t *testing.T) { require.Equal(t, 1, c.NumSamples()) hApp, _ := app.(*FloatHistogramAppender) - pI, nI, okToAppend, counterReset := hApp.appendable(tc.h2) + pI, nI, bpI, bnI, okToAppend, counterReset := hApp.appendable(tc.h2) require.Empty(t, pI) require.Empty(t, nI) + require.Empty(t, bpI) + require.Empty(t, bnI) require.True(t, okToAppend) require.False(t, counterReset) }) diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index aa74badd10..a957d7b22d 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -237,16 +237,23 @@ func (a *HistogramAppender) Append(int64, float64) { } // appendable returns whether the chunk can be appended to, and if so whether -// any recoding needs to happen using the provided inserts (in case of any new -// buckets, positive or negative range, respectively). If the sample is a gauge -// histogram, AppendableGauge must be used instead. +// 1. Any recoding needs to happen to the chunk using the provided forward +// inserts (in case of any new buckets, positive or negative range, +// respectively). +// 2. Any recoding needs to happen for the histogram being appended, using the +// backward inserts (in case of any missing buckets, positive or negative +// range, respectively). +// +// If the sample is a gauge histogram, AppendableGauge must be used instead. // // The chunk is not appendable in the following cases: // // - The schema has changed. // - The custom bounds have changed if the current schema is custom buckets. // - The threshold for the zero bucket has changed. -// - Any buckets have disappeared. +// - Any buckets have disappeared, unless the bucket count was 0, unused. +// Empty bucket can happen if the chunk was recoded and we're merging a non +// recoded histogram. In this case backward inserts will be provided. // - There was a counter reset in the count of observations or in any bucket, // including the zero bucket. // - The last sample in the chunk was stale while the current sample is not stale. @@ -256,6 +263,7 @@ func (a *HistogramAppender) Append(int64, float64) { // append. If counterReset is true, okToAppend is always false. func (a *HistogramAppender) appendable(h *histogram.Histogram) ( positiveInserts, negativeInserts []Insert, + backwardPositiveInserts, backwardNegativeInserts []Insert, okToAppend, counterReset bool, ) { if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType { @@ -299,31 +307,219 @@ func (a *HistogramAppender) appendable(h *histogram.Histogram) ( } var ok bool - positiveInserts, ok = expandSpansForward(a.pSpans, h.PositiveSpans) + positiveInserts, backwardPositiveInserts, ok = expandIntSpansAndBuckets(a.pSpans, h.PositiveSpans, a.pBuckets, h.PositiveBuckets) if !ok { counterReset = true return } - negativeInserts, ok = expandSpansForward(a.nSpans, h.NegativeSpans) + negativeInserts, backwardNegativeInserts, ok = expandIntSpansAndBuckets(a.nSpans, h.NegativeSpans, a.nBuckets, h.NegativeBuckets) if !ok { counterReset = true return } - if counterResetInAnyBucket(a.pBuckets, h.PositiveBuckets, a.pSpans, h.PositiveSpans) || - counterResetInAnyBucket(a.nBuckets, h.NegativeBuckets, a.nSpans, h.NegativeSpans) { - counterReset, positiveInserts, negativeInserts = true, nil, nil - return - } - okToAppend = true return } +// expandIntSpansAndBuckets returns the inserts to expand the bucket spans 'a' so that +// they match the spans in 'b'. 'b' must cover the same or more buckets than +// 'a', otherwise the function will return false. +// The function also returns the inserts to expand 'b' to also cover all the +// buckets that are missing in 'b', but are present with 0 counter value in 'a'. +// The function also checks for counter resets between 'a' and 'b'. +// +// Example: +// +// Let's say the old buckets look like this: +// +// span syntax: [offset, length] +// spans : [ 0 , 2 ] [2,1] [ 3 , 2 ] [3,1] [1,1] +// bucket idx : [0] [1] 2 3 [4] 5 6 7 [8] [9] 10 11 12 [13] 14 [15] +// raw values 6 3 3 2 4 5 1 +// deltas 6 -3 0 -1 2 1 -4 +// +// But now we introduce a new bucket layout. (Carefully chosen example where we +// have a span appended, one unchanged[*], one prepended, and two merge - in +// that order.) +// +// [*] unchanged in terms of which bucket indices they represent. but to achieve +// that, their offset needs to change if "disrupted" by spans changing ahead of +// them +// +// \/ this one is "unchanged" +// spans : [ 0 , 3 ] [1,1] [ 1 , 4 ] [ 3 , 3 ] +// bucket idx : [0] [1] [2] 3 [4] 5 [6] [7] [8] [9] 10 11 12 [13] [14] [15] +// raw values 6 3 0 3 0 0 2 4 5 0 1 +// deltas 6 -3 -3 3 -3 0 2 2 1 -5 1 +// delta mods: / \ / \ / \ +// +// Note for histograms with delta-encoded buckets: Whenever any new buckets are +// introduced, the subsequent "old" bucket needs to readjust its delta to the +// new base of 0. Thus, for the caller who wants to transform the set of +// original deltas to a new set of deltas to match a new span layout that adds +// buckets, we simply need to generate a list of inserts. +// +// Note: Within expandSpansForward we don't have to worry about the changes to the +// spans themselves, thanks to the iterators we get to work with the more useful +// bucket indices (which of course directly correspond to the buckets we have to +// adjust). +func expandIntSpansAndBuckets(a, b []histogram.Span, aBuckets, bBuckets []int64) (forward, backward []Insert, ok bool) { + ai := newBucketIterator(a) + bi := newBucketIterator(b) + + var aInserts []Insert // To insert into buckets of a, to make up for missing buckets in b. + var bInserts []Insert // To insert into buckets of b, to make up for missing empty(!) buckets in a. + + // When aInter.num or bInter.num becomes > 0, this becomes a valid insert that should + // be yielded when we finish a streak of new buckets. + var aInter Insert + var bInter Insert + + aIdx, aOK := ai.Next() + bIdx, bOK := bi.Next() + + // Bucket count. Initialize the absolute count and index into the + // positive/negative counts or deltas array. The bucket count is + // used to detect counter reset as well as unused buckets in a. + var ( + aCount int64 + bCount int64 + aCountIdx int + bCountIdx int + ) + if aOK { + aCount = aBuckets[aCountIdx] + } + if bOK { + bCount = bBuckets[bCountIdx] + } + +loop: + for { + switch { + case aOK && bOK: + switch { + case aIdx == bIdx: // Both have an identical bucket index. + // Bucket count. Check bucket for reset from a to b. + if aCount > bCount { + return nil, nil, false + } + + // Finish WIP insert for a and reset. + if aInter.num > 0 { + aInserts = append(aInserts, aInter) + aInter.num = 0 + } + + // Finish WIP insert for b and reset. + if bInter.num > 0 { + bInserts = append(bInserts, bInter) + bInter.num = 0 + } + + aIdx, aOK = ai.Next() + bIdx, bOK = bi.Next() + aInter.pos++ // Advance potential insert position. + aCountIdx++ // Advance absolute bucket count index for a. + if aOK { + aCount += aBuckets[aCountIdx] + } + bInter.pos++ // Advance potential insert position. + bCountIdx++ // Advance absolute bucket count index for b. + if bOK { + bCount += bBuckets[bCountIdx] + } + + continue + case aIdx < bIdx: // b misses a bucket index that is in a. + // This is ok if the count in a is 0, in which case we make a note to + // fill in the bucket in b and advance a. + if aCount == 0 { + bInter.num++ // Mark that we need to insert a bucket in b. + // Advance a + if aInter.num > 0 { + aInserts = append(aInserts, aInter) + aInter.num = 0 + } + aIdx, aOK = ai.Next() + aInter.pos++ + aCountIdx++ + if aOK { + aCount += aBuckets[aCountIdx] + } + continue + } + // Otherwise we are missing a bucket that was in use in a, which is a reset. + return nil, nil, false + case aIdx > bIdx: // a misses a value that is in b. Forward b and recompare. + aInter.num++ + // Advance b + if bInter.num > 0 { + bInserts = append(bInserts, bInter) + bInter.num = 0 + } + bIdx, bOK = bi.Next() + bInter.pos++ + bCountIdx++ + if bOK { + bCount += bBuckets[bCountIdx] + } + } + case aOK && !bOK: // b misses a value that is in a. + // This is ok if the count in a is 0, in which case we make a note to + // fill in the bucket in b and advance a. + if aCount == 0 { + bInter.num++ + // Advance a + if aInter.num > 0 { + aInserts = append(aInserts, aInter) + aInter.num = 0 + } + aIdx, aOK = ai.Next() + aInter.pos++ // Advance potential insert position. + // Update absolute bucket counts for a. + aCountIdx++ + if aOK { + aCount += aBuckets[aCountIdx] + } + continue + } + // Otherwise we are missing a bucket that was in use in a, which is a reset. + return nil, nil, false + case !aOK && bOK: // a misses a value that is in b. Forward b and recompare. + aInter.num++ + // Advance b + if bInter.num > 0 { + bInserts = append(bInserts, bInter) + bInter.num = 0 + } + bIdx, bOK = bi.Next() + bInter.pos++ // Advance potential insert position. + // Update absolute bucket counts for b. + bCountIdx++ + if bOK { + bCount += bBuckets[bCountIdx] + } + default: // Both iterators ran out. We're done. + if aInter.num > 0 { + aInserts = append(aInserts, aInter) + } + if bInter.num > 0 { + bInserts = append(bInserts, bInter) + } + break loop + } + } + + return aInserts, bInserts, true +} + // appendableGauge returns whether the chunk can be appended to, and if so // whether: -// 1. Any recoding needs to happen to the chunk using the provided inserts -// (in case of any new buckets, positive or negative range, respectively). +// 1. Any recoding needs to happen to the chunk using the provided forward +// inserts (in case of any new buckets, positive or negative range, +// respectively). // 2. Any recoding needs to happen for the histogram being appended, using the // backward inserts (in case of any missing buckets, positive or negative // range, respectively). @@ -369,76 +565,6 @@ func (a *HistogramAppender) appendableGauge(h *histogram.Histogram) ( return } -// counterResetInAnyBucket returns true if there was a counter reset for any -// bucket. This should be called only when the bucket layout is the same or new -// buckets were added. It does not handle the case of buckets missing. -func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans []histogram.Span) bool { - if len(oldSpans) == 0 || len(oldBuckets) == 0 { - return false - } - - var ( - oldSpanSliceIdx, newSpanSliceIdx int = -1, -1 // Index for the span slices. Starts at -1 to indicate that the first non empty span is not yet found. - oldInsideSpanIdx, newInsideSpanIdx uint32 // Index inside a span. - oldIdx, newIdx int32 // Index inside a bucket slice. - oldBucketSliceIdx, newBucketSliceIdx int // Index inside bucket slice. - ) - - // Find first non empty spans. - oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans) - newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans) - oldVal, newVal := oldBuckets[0], newBuckets[0] - - // Since we assume that new spans won't have missing buckets, there will never be a case - // where the old index will not find a matching new index. - for { - if oldIdx == newIdx { - if newVal < oldVal { - return true - } - } - - if oldIdx <= newIdx { - // Moving ahead old bucket and span by 1 index. - if oldInsideSpanIdx+1 >= oldSpans[oldSpanSliceIdx].Length { - // Current span is over. - oldSpanSliceIdx, oldIdx = nextNonEmptySpanSliceIdx(oldSpanSliceIdx, oldIdx, oldSpans) - oldInsideSpanIdx = 0 - if oldSpanSliceIdx >= len(oldSpans) { - // All old spans are over. - break - } - } else { - oldInsideSpanIdx++ - oldIdx++ - } - oldBucketSliceIdx++ - oldVal += oldBuckets[oldBucketSliceIdx] - } - - if oldIdx > newIdx { - // Moving ahead new bucket and span by 1 index. - if newInsideSpanIdx+1 >= newSpans[newSpanSliceIdx].Length { - // Current span is over. - newSpanSliceIdx, newIdx = nextNonEmptySpanSliceIdx(newSpanSliceIdx, newIdx, newSpans) - newInsideSpanIdx = 0 - if newSpanSliceIdx >= len(newSpans) { - // All new spans are over. - // This should not happen, old spans above should catch this first. - panic("new spans over before old spans in counterReset") - } - } else { - newInsideSpanIdx++ - newIdx++ - } - newBucketSliceIdx++ - newVal += newBuckets[newBucketSliceIdx] - } - } - - return false -} - // appendHistogram appends a histogram to the chunk. The caller must ensure that // the histogram is properly structured, e.g. the number of buckets used // corresponds to the number conveyed by the span structures. First call @@ -649,7 +775,7 @@ func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h a.setCounterResetHeader(CounterReset) case prev != nil: // This is a new chunk, but continued from a previous one. We need to calculate the reset header unless already set. - _, _, _, counterReset := prev.appendable(h) + _, _, _, _, _, counterReset := prev.appendable(h) if counterReset { a.setCounterResetHeader(CounterReset) } else { @@ -661,7 +787,7 @@ func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h // Adding counter-like histogram. if h.CounterResetHint != histogram.GaugeType { - pForwardInserts, nForwardInserts, okToAppend, counterReset := a.appendable(h) + pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, okToAppend, counterReset := a.appendable(h) if !okToAppend || counterReset { if appendOnly { if counterReset { @@ -692,6 +818,13 @@ func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h app.(*HistogramAppender).appendHistogram(t, h) return chk, true, app, nil } + if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 { + // The histogram needs to be expanded to have the extra empty buckets + // of the chunk. + h.PositiveSpans = a.pSpans + h.NegativeSpans = a.nSpans + a.recodeHistogram(h, pBackwardInserts, nBackwardInserts) + } a.appendHistogram(t, h) return nil, false, a, nil } diff --git a/tsdb/chunkenc/histogram_meta.go b/tsdb/chunkenc/histogram_meta.go index c5381ba2fb..59e2e10fc4 100644 --- a/tsdb/chunkenc/histogram_meta.go +++ b/tsdb/chunkenc/histogram_meta.go @@ -280,6 +280,9 @@ type Insert struct { num int } +// Deprecated: expandSpansForward, use expandIntSpansAndBuckets or +// expandFloatSpansAndBuckets instead. +// expandSpansForward is left here for reference. // expandSpansForward returns the inserts to expand the bucket spans 'a' so that // they match the spans in 'b'. 'b' must cover the same or more buckets than // 'a', otherwise the function will return false. @@ -574,15 +577,3 @@ func counterResetHint(crh CounterResetHeader, numRead uint16) histogram.CounterR return histogram.UnknownCounterReset } } - -// Handle pathological case of empty span when advancing span idx. -// Call it with idx==-1 to find the first non empty span. -func nextNonEmptySpanSliceIdx(idx int, bucketIdx int32, spans []histogram.Span) (newIdx int, newBucketIdx int32) { - for idx++; idx < len(spans); idx++ { - if spans[idx].Length > 0 { - return idx, bucketIdx + spans[idx].Offset + 1 - } - bucketIdx += spans[idx].Offset - } - return idx, 0 -} diff --git a/tsdb/chunkenc/histogram_test.go b/tsdb/chunkenc/histogram_test.go index d029aaefcb..939edd4403 100644 --- a/tsdb/chunkenc/histogram_test.go +++ b/tsdb/chunkenc/histogram_test.go @@ -256,9 +256,11 @@ func TestHistogramChunkBucketChanges(t *testing.T) { h2.NegativeBuckets = []int64{2, -1} // 2 1 (total 3) // This is how span changes will be handled. hApp, _ := app.(*HistogramAppender) - posInterjections, negInterjections, ok, cr := hApp.appendable(h2) + posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) require.NotEmpty(t, posInterjections) require.NotEmpty(t, negInterjections) + require.Empty(t, backwardPositiveInserts) + require.Empty(t, backwardNegativeInserts) require.True(t, ok) // Only new buckets came in. require.False(t, cr) c, app = hApp.recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans) @@ -347,7 +349,7 @@ func TestHistogramChunkAppendable(t *testing.T) { c, hApp, ts, h1 := setup(eh) h2 := h1.Copy() h2.Schema++ - _, _, ok, _ := hApp.appendable(h2) + _, _, _, _, ok, _ := hApp.appendable(h2) require.False(t, ok) assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset) @@ -357,7 +359,7 @@ func TestHistogramChunkAppendable(t *testing.T) { c, hApp, ts, h1 := setup(eh) h2 := h1.Copy() h2.ZeroThreshold += 0.1 - _, _, ok, _ := hApp.appendable(h2) + _, _, _, _, ok, _ := hApp.appendable(h2) require.False(t, ok) assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset) @@ -380,9 +382,11 @@ func TestHistogramChunkAppendable(t *testing.T) { // so the new histogram should have new counts >= these per-bucket counts, e.g.: h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30) - posInterjections, negInterjections, ok, cr := hApp.appendable(h2) + posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) require.NotEmpty(t, posInterjections) require.Empty(t, negInterjections) + require.Empty(t, backwardPositiveInserts) + require.Empty(t, backwardNegativeInserts) require.True(t, ok) // Only new buckets came in. require.False(t, cr) @@ -401,24 +405,57 @@ func TestHistogramChunkAppendable(t *testing.T) { h2.Sum = 21 h2.PositiveBuckets = []int64{6, -3, -1, 2, 1, -4} // counts: 6, 3, 2, 4, 5, 1 (total 21) - posInterjections, negInterjections, ok, cr := hApp.appendable(h2) + posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) require.Empty(t, posInterjections) require.Empty(t, negInterjections) + require.Empty(t, backwardPositiveInserts) + require.Empty(t, backwardNegativeInserts) require.False(t, ok) // Need to cut a new chunk. require.True(t, cr) assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset) } + { // New histogram that has buckets missing but the buckets missing were empty. + emptyBucketH := eh.Copy() + emptyBucketH.PositiveBuckets = []int64{6, -6, 1, 1, -2, 1, 1} // counts: 6, 0, 1, 2, 0, 1, 2 (total 12) + c, hApp, ts, h1 := setup(emptyBucketH) + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ // Missing buckets at offset 1 and 9. + {Offset: 0, Length: 1}, + {Offset: 3, Length: 1}, + {Offset: 3, Length: 1}, + {Offset: 4, Length: 1}, + {Offset: 1, Length: 1}, + } + h2.PositiveBuckets = []int64{7, -5, 1, 0, 1} // counts: 7, 2, 3, 3, 4 (total 18) + + posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) + require.Empty(t, posInterjections) + require.Empty(t, negInterjections) + require.NotEmpty(t, backwardPositiveInserts) + require.Empty(t, backwardNegativeInserts) + require.True(t, ok) + require.False(t, cr) + + assertNoNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset) + + // Check that h2 was recoded. + require.Equal(t, []int64{7, -7, 2, 1, -3, 3, 1}, h2.PositiveBuckets) // counts: 7, 0, 2, 3 , 0, 3, 4 (total 18) + require.Equal(t, emptyBucketH.PositiveSpans, h2.PositiveSpans) + } + { // New histogram that has a counter reset while buckets are same. c, hApp, ts, h1 := setup(eh) h2 := h1.Copy() h2.Sum = 23 h2.PositiveBuckets = []int64{6, -4, 1, -1, 2, 1, -4} // counts: 6, 2, 3, 2, 4, 5, 1 (total 23) - posInterjections, negInterjections, ok, cr := hApp.appendable(h2) + posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) require.Empty(t, posInterjections) require.Empty(t, negInterjections) + require.Empty(t, backwardPositiveInserts) + require.Empty(t, backwardNegativeInserts) require.False(t, ok) // Need to cut a new chunk. require.True(t, cr) @@ -440,9 +477,11 @@ func TestHistogramChunkAppendable(t *testing.T) { // so the new histogram should have new counts >= these per-bucket counts, e.g.: h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 0} // 7 5 1 3 1 0 2 5 5 0 0 (total 29) - posInterjections, negInterjections, ok, cr := hApp.appendable(h2) + posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) require.Empty(t, posInterjections) require.Empty(t, negInterjections) + require.Empty(t, backwardPositiveInserts) + require.Empty(t, backwardNegativeInserts) require.False(t, ok) // Need to cut a new chunk. require.True(t, cr) @@ -470,9 +509,11 @@ func TestHistogramChunkAppendable(t *testing.T) { // so the new histogram should have new counts >= these per-bucket counts, e.g.: h2.PositiveBuckets = []int64{1, 1, 3, -2, 0, -1, 2, 1, -4} // counts: 1, 2, 5, 3, 3, 2, 4, 5, 1 (total 26) - posInterjections, negInterjections, ok, cr := hApp.appendable(h2) + posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) require.Empty(t, posInterjections) require.Empty(t, negInterjections) + require.Empty(t, backwardPositiveInserts) + require.Empty(t, backwardNegativeInserts) require.False(t, ok) // Need to cut a new chunk. require.True(t, cr) @@ -549,10 +590,44 @@ func TestHistogramChunkAppendable(t *testing.T) { require.Equal(t, NotCounterReset, nextChunk.GetCounterResetHeader()) } + { + // Start a new chunk with a histogram that has an empty bucket. + // Add a histogram that has the same bucket missing. + // This should be appendable and can happen if we are merging from chunks + // where the first sample came from a recoded chunk that added the + // empty bucket. + h1 := eh.Copy() + // Add a bucket that is empty -10 offsets from the first bucket. + h1.PositiveSpans = make([]histogram.Span, len(eh.PositiveSpans)+1) + h1.PositiveSpans[0] = histogram.Span{Offset: eh.PositiveSpans[0].Offset - 10, Length: 1} + h1.PositiveSpans[1] = histogram.Span{Offset: eh.PositiveSpans[0].Offset + 9, Length: eh.PositiveSpans[0].Length} + for i, v := range eh.PositiveSpans[1:] { + h1.PositiveSpans[i+2] = v + } + h1.PositiveBuckets = make([]int64, len(eh.PositiveBuckets)+1) + h1.PositiveBuckets[0] = 0 + for i, v := range eh.PositiveBuckets { + h1.PositiveBuckets[i+1] = v + } + + c, hApp, ts, _ := setup(h1) + h2 := eh.Copy() + + posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) + require.Empty(t, posInterjections) + require.Empty(t, negInterjections) + require.NotEmpty(t, backwardPositiveInserts) + require.Empty(t, backwardNegativeInserts) + require.True(t, ok) + require.False(t, cr) + + assertNoNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset) + } + { // Custom buckets, no change. c, hApp, ts, h1 := setup(cbh) h2 := h1.Copy() - _, _, ok, _ := hApp.appendable(h2) + _, _, _, _, ok, _ := hApp.appendable(h2) require.True(t, ok) assertNoNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset) @@ -563,7 +638,7 @@ func TestHistogramChunkAppendable(t *testing.T) { h2 := h1.Copy() h2.Count++ h2.PositiveBuckets = []int64{6, -3, 0, -1, 2, 1, -3} - _, _, ok, _ := hApp.appendable(h2) + _, _, _, _, ok, _ := hApp.appendable(h2) require.True(t, ok) assertNoNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset) @@ -574,7 +649,7 @@ func TestHistogramChunkAppendable(t *testing.T) { h2 := h1.Copy() h2.Count-- h2.PositiveBuckets = []int64{6, -3, 0, -1, 2, 1, -5} - _, _, ok, _ := hApp.appendable(h2) + _, _, _, _, ok, _ := hApp.appendable(h2) require.False(t, ok) assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset) @@ -584,7 +659,7 @@ func TestHistogramChunkAppendable(t *testing.T) { c, hApp, ts, h1 := setup(cbh) h2 := h1.Copy() h2.CustomValues = []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21} - _, _, ok, _ := hApp.appendable(h2) + _, _, _, _, ok, _ := hApp.appendable(h2) require.False(t, ok) assertNewHistogramChunkOnAppend(t, c, hApp, ts+1, h2, CounterReset) @@ -606,9 +681,11 @@ func TestHistogramChunkAppendable(t *testing.T) { // so the new histogram should have new counts >= these per-bucket counts, e.g.: h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30) - posInterjections, negInterjections, ok, cr := hApp.appendable(h2) + posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) require.NotEmpty(t, posInterjections) require.Empty(t, negInterjections) + require.Empty(t, backwardPositiveInserts) + require.Empty(t, backwardNegativeInserts) require.True(t, ok) // Only new buckets came in. require.False(t, cr) @@ -875,9 +952,11 @@ func TestHistogramChunkAppendableWithEmptySpan(t *testing.T) { require.Equal(t, 1, c.NumSamples()) hApp, _ := app.(*HistogramAppender) - pI, nI, okToAppend, counterReset := hApp.appendable(tc.h2) + pI, nI, bpI, bnI, okToAppend, counterReset := hApp.appendable(tc.h2) require.Empty(t, pI) require.Empty(t, nI) + require.Empty(t, bpI) + require.Empty(t, bnI) require.True(t, okToAppend) require.False(t, counterReset) }) @@ -1368,3 +1447,50 @@ func TestHistogramAppendOnlyErrors(t *testing.T) { require.EqualError(t, err, "histogram counter reset") }) } + +func BenchmarkAppendable(b *testing.B) { + // Create a histogram with a bunch of spans and buckets. + const ( + numSpans = 1000 + spanLength = 10 + ) + h := &histogram.Histogram{ + Schema: 0, + Count: 100, + Sum: 1000, + ZeroThreshold: 0.001, + ZeroCount: 5, + } + for i := 0; i < numSpans; i++ { + h.PositiveSpans = append(h.PositiveSpans, histogram.Span{Offset: 5, Length: spanLength}) + h.NegativeSpans = append(h.NegativeSpans, histogram.Span{Offset: 5, Length: spanLength}) + for j := 0; j < spanLength; j++ { + h.PositiveBuckets = append(h.PositiveBuckets, int64(j)) + h.NegativeBuckets = append(h.NegativeBuckets, int64(j)) + } + } + + c := Chunk(NewHistogramChunk()) + + // Create fresh appender and add the first histogram. + app, err := c.Appender() + if err != nil { + b.Fatal(err) + } + + _, _, _, err = app.AppendHistogram(nil, 1, h, true) + if err != nil { + b.Fatal(err) + } + + hApp := app.(*HistogramAppender) + + isAppendable := true + for i := 0; i < b.N; i++ { + _, _, _, _, ok, _ := hApp.appendable(h) + isAppendable = isAppendable && ok + } + if !isAppendable { + b.Fail() + } +}