diff --git a/model/histogram/float_histogram.go b/model/histogram/float_histogram.go index 84d8a2912..256679a8c 100644 --- a/model/histogram/float_histogram.go +++ b/model/histogram/float_histogram.go @@ -27,6 +27,8 @@ import ( // used to represent a histogram with integer counts and thus serves as a more // generalized representation. type FloatHistogram struct { + // Counter reset information. + CounterResetHint CounterResetHint // Currently valid schema numbers are -4 <= n <= 8. They are all for // base-2 bucket schemas, where 1 is a bucket boundary in each case, and // then each power of two is divided into 2^n logarithmic buckets. Or diff --git a/model/histogram/histogram.go b/model/histogram/histogram.go index 4f63cc17d..6d425307c 100644 --- a/model/histogram/histogram.go +++ b/model/histogram/histogram.go @@ -19,6 +19,17 @@ import ( "strings" ) +// CounterResetHint contains the known information about a counter reset, +// or alternatively that we are dealing with a gauge histogram, where counter resets do not apply. +type CounterResetHint byte + +const ( + UnknownCounterReset CounterResetHint = iota // UnknownCounterReset means we cannot say if this histogram signals a counter reset or not. + CounterReset // CounterReset means there was definitely a counter reset starting from this histogram. + NotCounterReset // NotCounterReset means there was definitely no counter reset with this histogram. + GaugeType // GaugeType means this is a gauge histogram, where counter resets do not happen. +) + // Histogram encodes a sparse, high-resolution histogram. See the design // document for full details: // https://docs.google.com/document/d/1cLNv3aufPZb3fNfaJgdaRBZsInZKKIHo9E6HinJVbpM/edit# @@ -35,6 +46,8 @@ import ( // // Which bucket indices are actually used is determined by the spans. type Histogram struct { + // Counter reset information. + CounterResetHint CounterResetHint // Currently valid schema numbers are -4 <= n <= 8. They are all for // base-2 bucket schemas, where 1 is a bucket boundary in each case, and // then each power of two is divided into 2^n logarithmic buckets. Or @@ -295,15 +308,16 @@ func (h *Histogram) ToFloat() *FloatHistogram { } return &FloatHistogram{ - Schema: h.Schema, - ZeroThreshold: h.ZeroThreshold, - ZeroCount: float64(h.ZeroCount), - Count: float64(h.Count), - Sum: h.Sum, - PositiveSpans: positiveSpans, - NegativeSpans: negativeSpans, - PositiveBuckets: positiveBuckets, - NegativeBuckets: negativeBuckets, + CounterResetHint: h.CounterResetHint, + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + ZeroCount: float64(h.ZeroCount), + Count: float64(h.Count), + Sum: h.Sum, + PositiveSpans: positiveSpans, + NegativeSpans: negativeSpans, + PositiveBuckets: positiveBuckets, + NegativeBuckets: negativeBuckets, } } diff --git a/promql/engine_test.go b/promql/engine_test.go index 28db8af19..0582fd5a4 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -3158,10 +3158,12 @@ func TestSparseHistogramRate(t *testing.T) { Schema: 1, ZeroThreshold: 0.001, ZeroCount: 1. / 15., - Count: 4. / 15., + Count: 8. / 15., Sum: 1.226666666666667, PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, + NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, + NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, } require.Equal(t, expectedHistogram, actualHistogram) } @@ -3199,10 +3201,12 @@ func TestSparseFloatHistogramRate(t *testing.T) { Schema: 1, ZeroThreshold: 0.001, ZeroCount: 1. / 15., - Count: 4. / 15., + Count: 8. / 15., Sum: 1.226666666666667, PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, + NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, + NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, } require.Equal(t, expectedHistogram, actualHistogram) } diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 142dc4203..538af364a 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -174,6 +174,7 @@ func newFloatHistogramIterator(b []byte) *floatHistogramIterator { // The first 3 bytes contain chunk headers. // We skip that for actual samples. _, _ = it.br.readBits(24) + it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000) return it } @@ -196,6 +197,14 @@ type FloatHistogramAppender struct { pBuckets, nBuckets []xorValue } +func (a *FloatHistogramAppender) GetCounterResetHeader() CounterResetHeader { + return CounterResetHeader(a.b.bytes()[2] & 0b11000000) +} + +func (a *FloatHistogramAppender) NumSamples() int { + return int(binary.BigEndian.Uint16(a.b.bytes())) +} + // Append implements Appender. This implementation panics because normal float // samples must never be appended to a histogram chunk. func (a *FloatHistogramAppender) Append(int64, float64) { @@ -211,19 +220,14 @@ func (a *FloatHistogramAppender) AppendHistogram(int64, *histogram.Histogram) { // Appendable returns whether the chunk can be appended to, and if so // whether any recoding needs to happen using the provided interjections // (in case of any new buckets, positive or negative range, respectively). +// If the sample is a gauge histogram, AppendableGauge must be used instead. // // The chunk is not appendable in the following cases: -// -// • The schema has changed. -// -// • The threshold for the zero bucket has changed. -// -// • Any buckets have disappeared. -// -// • There was a counter reset in the count of observations or in any bucket, -// including the zero bucket. -// -// • The last sample in the chunk was stale while the current sample is not stale. +// - The schema has changed. +// - The threshold for the zero bucket has changed. +// - Any buckets have disappeared. +// - There was a counter reset in the count of observations or in any bucket, including the zero bucket. +// - The last sample in the chunk was stale while the current sample is not stale. // // The method returns an additional boolean set to true if it is not appendable // because of a counter reset. If the given sample is stale, it is always ok to @@ -232,6 +236,9 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) ( positiveInterjections, negativeInterjections []Interjection, okToAppend, counterReset bool, ) { + if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType { + return + } if value.IsStaleNaN(h.Sum) { // This is a stale sample whose buckets and spans don't matter. okToAppend = true @@ -260,12 +267,12 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) ( } var ok bool - positiveInterjections, ok = compareSpans(a.pSpans, h.PositiveSpans) + positiveInterjections, ok = forwardCompareSpans(a.pSpans, h.PositiveSpans) if !ok { counterReset = true return } - negativeInterjections, ok = compareSpans(a.nSpans, h.NegativeSpans) + negativeInterjections, ok = forwardCompareSpans(a.nSpans, h.NegativeSpans) if !ok { counterReset = true return @@ -281,6 +288,49 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) ( return } +// AppendableGauge returns whether the chunk can be appended to, and if so +// whether: +// 1. Any recoding needs to happen to the chunk using the provided interjections +// (in case of any new buckets, positive or negative range, respectively). +// 2. Any recoding needs to happen for the histogram being appended, using the backward interjections +// (in case of any missing buckets, positive or negative range, respectively). +// +// This method must be only used for gauge histograms. +// +// The chunk is not appendable in the following cases: +// - The schema has changed. +// - The threshold for the zero bucket has changed. +// - The last sample in the chunk was stale while the current sample is not stale. +func (a *FloatHistogramAppender) AppendableGauge(h *histogram.FloatHistogram) ( + positiveInterjections, negativeInterjections []Interjection, + backwardPositiveInterjections, backwardNegativeInterjections []Interjection, + positiveSpans, negativeSpans []histogram.Span, + okToAppend bool, +) { + if a.NumSamples() > 0 && a.GetCounterResetHeader() != GaugeType { + return + } + if value.IsStaleNaN(h.Sum) { + // This is a stale sample whose buckets and spans don't matter. + okToAppend = true + return + } + if value.IsStaleNaN(a.sum.value) { + // If the last sample was stale, then we can only accept stale + // samples in this chunk. + return + } + + if h.Schema != a.schema || h.ZeroThreshold != a.zThreshold { + return + } + + positiveInterjections, backwardPositiveInterjections, positiveSpans = bidirectionalCompareSpans(a.pSpans, h.PositiveSpans) + negativeInterjections, backwardNegativeInterjections, negativeSpans = bidirectionalCompareSpans(a.nSpans, h.NegativeSpans) + okToAppend = true + return +} + // counterResetInAnyFloatBucket returns true if there was a counter reset for any // bucket. This should be called only when the bucket layout is the same or new // buckets were added. It does not handle the case of buckets missing. @@ -502,11 +552,29 @@ func (a *FloatHistogramAppender) Recode( return hc, app } +// RecodeHistogramm converts the current histogram (in-place) to accommodate an expansion of the set of +// (positive and/or negative) buckets used. +func (a *FloatHistogramAppender) RecodeHistogramm( + fh *histogram.FloatHistogram, + pBackwardInter, nBackwardInter []Interjection, +) { + if len(pBackwardInter) > 0 { + numPositiveBuckets := countSpans(fh.PositiveSpans) + fh.PositiveBuckets = interject(fh.PositiveBuckets, make([]float64, numPositiveBuckets), pBackwardInter, false) + } + if len(nBackwardInter) > 0 { + numNegativeBuckets := countSpans(fh.NegativeSpans) + fh.NegativeBuckets = interject(fh.NegativeBuckets, make([]float64, numNegativeBuckets), nBackwardInter, false) + } +} + type floatHistogramIterator struct { br bstreamReader numTotal uint16 numRead uint16 + counterResetHeader CounterResetHeader + // Layout: schema int32 zThreshold float64 @@ -559,16 +627,21 @@ func (it *floatHistogramIterator) AtFloatHistogram() (int64, *histogram.FloatHis return it.t, &histogram.FloatHistogram{Sum: it.sum.value} } it.atFloatHistogramCalled = true + crHint := histogram.UnknownCounterReset + if it.counterResetHeader == GaugeType { + crHint = histogram.GaugeType + } return it.t, &histogram.FloatHistogram{ - Count: it.cnt.value, - ZeroCount: it.zCnt.value, - Sum: it.sum.value, - ZeroThreshold: it.zThreshold, - Schema: it.schema, - PositiveSpans: it.pSpans, - NegativeSpans: it.nSpans, - PositiveBuckets: it.pBuckets, - NegativeBuckets: it.nBuckets, + CounterResetHint: crHint, + Count: it.cnt.value, + ZeroCount: it.zCnt.value, + Sum: it.sum.value, + ZeroThreshold: it.zThreshold, + Schema: it.schema, + PositiveSpans: it.pSpans, + NegativeSpans: it.nSpans, + PositiveBuckets: it.pBuckets, + NegativeBuckets: it.nBuckets, } } @@ -587,6 +660,8 @@ func (it *floatHistogramIterator) Reset(b []byte) { it.numTotal = binary.BigEndian.Uint16(b) it.numRead = 0 + it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000) + it.t, it.tDelta = 0, 0 it.cnt, it.zCnt, it.sum = xorValue{}, xorValue{}, xorValue{} diff --git a/tsdb/chunkenc/float_histogram_test.go b/tsdb/chunkenc/float_histogram_test.go index ed9af9c60..b080fe676 100644 --- a/tsdb/chunkenc/float_histogram_test.go +++ b/tsdb/chunkenc/float_histogram_test.go @@ -358,3 +358,171 @@ func TestFloatHistogramChunkAppendable(t *testing.T) { require.True(t, cr) } } + +func TestFloatHistogramChunkAppendableGauge(t *testing.T) { + c := Chunk(NewFloatHistogramChunk()) + + // Create fresh appender and add the first histogram. + app, err := c.Appender() + require.NoError(t, err) + require.Equal(t, 0, c.NumSamples()) + + ts := int64(1234567890) + h1 := &histogram.FloatHistogram{ + Count: 5, + ZeroCount: 2, + Sum: 18.4, + ZeroThreshold: 1e-125, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 1}, + {Offset: 3, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + }, + PositiveBuckets: []float64{6, 3, 3, 2, 4, 5, 1}, + } + + app.AppendFloatHistogram(ts, h1.Copy()) + require.Equal(t, 1, c.NumSamples()) + c.(*FloatHistogramChunk).SetCounterResetHeader(GaugeType) + + { // Schema change. + h2 := h1.Copy() + h2.Schema++ + hApp, _ := app.(*FloatHistogramAppender) + _, _, _, _, _, _, ok := hApp.AppendableGauge(h2) + require.False(t, ok) + } + + { // Zero threshold change. + h2 := h1.Copy() + h2.ZeroThreshold += 0.1 + hApp, _ := app.(*FloatHistogramAppender) + _, _, _, _, _, _, ok := hApp.AppendableGauge(h2) + require.False(t, ok) + } + + { // New histogram that has more buckets. + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 1}, + {Offset: 1, Length: 4}, + {Offset: 3, Length: 3}, + } + h2.Count += 9 + h2.ZeroCount++ + h2.Sum = 30 + h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 1} + + hApp, _ := app.(*FloatHistogramAppender) + pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2) + require.Greater(t, len(pI), 0) + require.Len(t, nI, 0) + require.Len(t, pBackwardI, 0) + require.Len(t, nBackwardI, 0) + require.True(t, ok) + } + + { // New histogram that has buckets missing. + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 1}, + {Offset: 3, Length: 1}, + {Offset: 4, Length: 1}, + {Offset: 1, Length: 1}, + } + h2.Count -= 4 + h2.Sum-- + h2.PositiveBuckets = []float64{6, 3, 3, 2, 5, 1} + + hApp, _ := app.(*FloatHistogramAppender) + pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2) + require.Len(t, pI, 0) + require.Len(t, nI, 0) + require.Greater(t, len(pBackwardI), 0) + require.Len(t, nBackwardI, 0) + require.True(t, ok) + } + + { // New histogram that has a bucket missing and new buckets. + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 5, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + } + h2.Sum = 21 + h2.PositiveBuckets = []float64{6, 3, 2, 4, 5, 1} + + hApp, _ := app.(*FloatHistogramAppender) + pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2) + require.Greater(t, len(pI), 0) + require.Greater(t, len(pBackwardI), 0) + require.Len(t, nI, 0) + require.Len(t, nBackwardI, 0) + require.True(t, ok) + } + + { // New histogram that has a counter reset while buckets are same. + h2 := h1.Copy() + h2.Sum = 23 + h2.PositiveBuckets = []float64{6, 2, 3, 2, 4, 5, 1} + + hApp, _ := app.(*FloatHistogramAppender) + pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2) + require.Len(t, pI, 0) + require.Len(t, nI, 0) + require.Len(t, pBackwardI, 0) + require.Len(t, nBackwardI, 0) + require.True(t, ok) + } + + { // New histogram that has a counter reset while new buckets were added. + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 1}, + {Offset: 1, Length: 4}, + {Offset: 3, Length: 3}, + } + h2.Sum = 29 + h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 0} + + hApp, _ := app.(*FloatHistogramAppender) + pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2) + require.Greater(t, len(pI), 0) + require.Len(t, nI, 0) + require.Len(t, pBackwardI, 0) + require.Len(t, nBackwardI, 0) + require.True(t, ok) + } + + { + // New histogram that has a counter reset while new buckets were + // added before the first bucket and reset on first bucket. + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ + {Offset: -3, Length: 2}, + {Offset: 1, Length: 2}, + {Offset: 2, Length: 1}, + {Offset: 3, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + } + h2.Sum = 26 + h2.PositiveBuckets = []float64{1, 2, 5, 3, 3, 2, 4, 5, 1} + + hApp, _ := app.(*FloatHistogramAppender) + pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2) + require.Greater(t, len(pI), 0) + require.Len(t, nI, 0) + require.Len(t, pBackwardI, 0) + require.Len(t, nBackwardI, 0) + require.True(t, ok) + } +} diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index c633c1420..9b26e5472 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -286,12 +286,12 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) ( } var ok bool - positiveInterjections, ok = compareSpans(a.pSpans, h.PositiveSpans) + positiveInterjections, ok = forwardCompareSpans(a.pSpans, h.PositiveSpans) if !ok { counterReset = true return } - negativeInterjections, ok = compareSpans(a.nSpans, h.NegativeSpans) + negativeInterjections, ok = forwardCompareSpans(a.nSpans, h.NegativeSpans) if !ok { counterReset = true return diff --git a/tsdb/chunkenc/histogram_meta.go b/tsdb/chunkenc/histogram_meta.go index 34768afb2..f923dcb91 100644 --- a/tsdb/chunkenc/histogram_meta.go +++ b/tsdb/chunkenc/histogram_meta.go @@ -165,21 +165,23 @@ func (b *bucketIterator) Next() (int, bool) { if b.span >= len(b.spans) { return 0, false } -try: - if b.bucket < int(b.spans[b.span].Length-1) { // Try to move within same span. + if b.bucket < int(b.spans[b.span].Length)-1 { // Try to move within same span. b.bucket++ b.idx++ return b.idx, true - } else if b.span < len(b.spans)-1 { // Try to move from one span to the next. + } + + for b.span < len(b.spans)-1 { // Try to move from one span to the next. b.span++ b.idx += int(b.spans[b.span].Offset + 1) b.bucket = 0 if b.spans[b.span].Length == 0 { - // Pathological case that should never happen. We can't use this span, let's try again. - goto try + b.idx-- + continue } return b.idx, true } + // We're out of options. return 0, false } @@ -191,7 +193,7 @@ type Interjection struct { num int } -// compareSpans returns the interjections to convert a slice of deltas to a new +// forwardCompareSpans returns the interjections to convert a slice of deltas to a new // slice representing an expanded set of buckets, or false if incompatible // (e.g. if buckets were removed). // @@ -226,11 +228,11 @@ type Interjection struct { // match a new span layout that adds buckets, we simply need to generate a list // of interjections. // -// Note: Within compareSpans we don't have to worry about the changes to the +// Note: Within forwardCompareSpans we don't have to worry about the changes to the // spans themselves, thanks to the iterators we get to work with the more useful // bucket indices (which of course directly correspond to the buckets we have to // adjust). -func compareSpans(a, b []histogram.Span) ([]Interjection, bool) { +func forwardCompareSpans(a, b []histogram.Span) (forward []Interjection, ok bool) { ai := newBucketIterator(a) bi := newBucketIterator(b) @@ -278,6 +280,102 @@ loop: return interjections, true } +// bidirectionalCompareSpans does everything that forwardCompareSpans does and +// also returns interjections in the other direction (i.e. buckets missing in b that are missing in a). +func bidirectionalCompareSpans(a, b []histogram.Span) (forward, backward []Interjection, mergedSpans []histogram.Span) { + ai := newBucketIterator(a) + bi := newBucketIterator(b) + + var interjections, bInterjections []Interjection + var lastBucket int + addBucket := func(b int) { + offset := b - lastBucket - 1 + if offset == 0 && len(mergedSpans) > 0 { + mergedSpans[len(mergedSpans)-1].Length++ + } else { + if len(mergedSpans) == 0 { + offset++ + } + mergedSpans = append(mergedSpans, histogram.Span{ + Offset: int32(offset), + Length: 1, + }) + } + + lastBucket = b + } + + // When inter.num becomes > 0, this becomes a valid interjection that + // should be yielded when we finish a streak of new buckets. + var inter, bInter Interjection + + av, aOK := ai.Next() + bv, bOK := bi.Next() +loop: + for { + switch { + case aOK && bOK: + switch { + case av == bv: // Both have an identical value. move on! + // Finish WIP interjection and reset. + if inter.num > 0 { + interjections = append(interjections, inter) + inter.num = 0 + } + if bInter.num > 0 { + bInterjections = append(bInterjections, bInter) + bInter.num = 0 + } + addBucket(av) + av, aOK = ai.Next() + bv, bOK = bi.Next() + inter.pos++ + bInter.pos++ + case av < bv: // b misses a value that is in a. + bInter.num++ + // Collect the forward interjection before advancing the + // position of 'a'. + if inter.num > 0 { + interjections = append(interjections, inter) + inter.num = 0 + } + addBucket(av) + inter.pos++ + av, aOK = ai.Next() + case av > bv: // a misses a value that is in b. Forward b and recompare. + inter.num++ + // Collect the backward interjection before advancing the + // position of 'b'. + if bInter.num > 0 { + bInterjections = append(bInterjections, bInter) + bInter.num = 0 + } + addBucket(bv) + bInter.pos++ + bv, bOK = bi.Next() + } + case aOK && !bOK: // b misses a value that is in a. + bInter.num++ + addBucket(av) + av, aOK = ai.Next() + case !aOK && bOK: // a misses a value that is in b. Forward b and recompare. + inter.num++ + addBucket(bv) + bv, bOK = bi.Next() + default: // Both iterators ran out. We're done. + if inter.num > 0 { + interjections = append(interjections, inter) + } + if bInter.num > 0 { + bInterjections = append(bInterjections, bInter) + } + break loop + } + } + + return interjections, bInterjections, mergedSpans +} + // interject merges 'in' with the provided interjections and writes them into // 'out', which must already have the appropriate length. func interject[BV bucketValue](in, out []BV, interjections []Interjection, deltas bool) []BV { diff --git a/tsdb/chunkenc/histogram_meta_test.go b/tsdb/chunkenc/histogram_meta_test.go index 30d2eef3a..a4ce62f3b 100644 --- a/tsdb/chunkenc/histogram_meta_test.go +++ b/tsdb/chunkenc/histogram_meta_test.go @@ -111,13 +111,12 @@ func TestBucketIterator(t *testing.T) { } } -func TestInterjection(t *testing.T) { +func TestCompareSpansAndInterject(t *testing.T) { scenarios := []struct { - description string - spansA, spansB []histogram.Span - valid bool - interjections []Interjection - bucketsIn, bucketsOut []int64 + description string + spansA, spansB []histogram.Span + interjections, backwardInterjections []Interjection + bucketsIn, bucketsOut []int64 }{ { description: "single prepend at the beginning", @@ -127,7 +126,6 @@ func TestInterjection(t *testing.T) { spansB: []histogram.Span{ {Offset: -11, Length: 4}, }, - valid: true, interjections: []Interjection{ { pos: 0, @@ -145,7 +143,6 @@ func TestInterjection(t *testing.T) { spansB: []histogram.Span{ {Offset: -10, Length: 4}, }, - valid: true, interjections: []Interjection{ { pos: 3, @@ -163,7 +160,6 @@ func TestInterjection(t *testing.T) { spansB: []histogram.Span{ {Offset: -12, Length: 5}, }, - valid: true, interjections: []Interjection{ { pos: 0, @@ -181,7 +177,6 @@ func TestInterjection(t *testing.T) { spansB: []histogram.Span{ {Offset: -10, Length: 5}, }, - valid: true, interjections: []Interjection{ { pos: 3, @@ -199,7 +194,6 @@ func TestInterjection(t *testing.T) { spansB: []histogram.Span{ {Offset: -12, Length: 7}, }, - valid: true, interjections: []Interjection{ { pos: 0, @@ -221,7 +215,9 @@ func TestInterjection(t *testing.T) { spansB: []histogram.Span{ {Offset: -9, Length: 3}, }, - valid: false, + backwardInterjections: []Interjection{ + {pos: 0, num: 1}, + }, }, { description: "single removal of bucket in the middle", @@ -232,7 +228,9 @@ func TestInterjection(t *testing.T) { {Offset: -10, Length: 2}, {Offset: 1, Length: 1}, }, - valid: false, + backwardInterjections: []Interjection{ + {pos: 2, num: 1}, + }, }, { description: "single removal of bucket at the end", @@ -242,7 +240,9 @@ func TestInterjection(t *testing.T) { spansB: []histogram.Span{ {Offset: -10, Length: 3}, }, - valid: false, + backwardInterjections: []Interjection{ + {pos: 3, num: 1}, + }, }, { description: "as described in doc comment", @@ -259,7 +259,6 @@ func TestInterjection(t *testing.T) { {Offset: 1, Length: 4}, {Offset: 3, Length: 3}, }, - valid: true, interjections: []Interjection{ { pos: 2, @@ -277,12 +276,67 @@ func TestInterjection(t *testing.T) { bucketsIn: []int64{6, -3, 0, -1, 2, 1, -4}, bucketsOut: []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1}, }, + { + description: "both forward and backward interjections, complex case", + spansA: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 1}, + {Offset: 3, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + }, + spansB: []histogram.Span{ + {Offset: 1, Length: 2}, + {Offset: 1, Length: 1}, + {Offset: 1, Length: 2}, + {Offset: 1, Length: 1}, + {Offset: 4, Length: 1}, + }, + interjections: []Interjection{ + { + pos: 2, + num: 1, + }, + { + pos: 3, + num: 2, + }, + { + pos: 6, + num: 1, + }, + }, + backwardInterjections: []Interjection{ + { + pos: 0, + num: 1, + }, + { + pos: 5, + num: 1, + }, + { + pos: 6, + num: 1, + }, + { + pos: 7, + num: 1, + }, + }, + }, } for _, s := range scenarios { t.Run(s.description, func(t *testing.T) { - interjections, valid := compareSpans(s.spansA, s.spansB) - if !s.valid { + if len(s.backwardInterjections) > 0 { + interjections, bInterjections, _ := bidirectionalCompareSpans(s.spansA, s.spansB) + require.Equal(t, s.interjections, interjections) + require.Equal(t, s.backwardInterjections, bInterjections) + } + + interjections, valid := forwardCompareSpans(s.spansA, s.spansB) + if len(s.backwardInterjections) > 0 { require.False(t, valid, "compareScan unexpectedly returned true") return } @@ -292,6 +346,24 @@ func TestInterjection(t *testing.T) { gotBuckets := make([]int64, len(s.bucketsOut)) interject(s.bucketsIn, gotBuckets, interjections, true) require.Equal(t, s.bucketsOut, gotBuckets) + + floatBucketsIn := make([]float64, len(s.bucketsIn)) + last := s.bucketsIn[0] + floatBucketsIn[0] = float64(last) + for i := 1; i < len(floatBucketsIn); i++ { + last += s.bucketsIn[i] + floatBucketsIn[i] = float64(last) + } + floatBucketsOut := make([]float64, len(s.bucketsOut)) + last = s.bucketsOut[0] + floatBucketsOut[0] = float64(last) + for i := 1; i < len(floatBucketsOut); i++ { + last += s.bucketsOut[i] + floatBucketsOut[i] = float64(last) + } + gotFloatBuckets := make([]float64, len(floatBucketsOut)) + interject(floatBucketsIn, gotFloatBuckets, interjections, false) + require.Equal(t, floatBucketsOut, gotFloatBuckets) }) } } @@ -369,3 +441,135 @@ func TestWriteReadHistogramChunkLayout(t *testing.T) { require.Equal(t, want.negativeSpans, gotNegativeSpans) } } + +func TestSpansFromBidirectionalCompareSpans(t *testing.T) { + cases := []struct { + s1, s2, exp []histogram.Span + }{ + { // All empty. + s1: []histogram.Span{}, + s2: []histogram.Span{}, + }, + { // Same spans. + s1: []histogram.Span{}, + s2: []histogram.Span{}, + }, + { + // Has the cases of + // 1. |----| (partial overlap) + // |----| + // + // 2. |-----| (no gap but no overlap as well) + // |---| + // + // 3. |----| (complete overlap) + // |----| + s1: []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 3, Length: 3}, + {Offset: 5, Length: 3}, + }, + s2: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 2}, + {Offset: 2, Length: 3}, + {Offset: 3, Length: 3}, + }, + exp: []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 7}, + {Offset: 3, Length: 3}, + }, + }, + { + // s1 is superset of s2. + s1: []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 3, Length: 5}, + {Offset: 3, Length: 3}, + }, + s2: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 5, Length: 3}, + {Offset: 4, Length: 3}, + }, + exp: []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 3, Length: 5}, + {Offset: 3, Length: 3}, + }, + }, + { + // No overlaps but one span is side by side. + s1: []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 3, Length: 3}, + {Offset: 5, Length: 3}, + }, + s2: []histogram.Span{ + {Offset: 3, Length: 3}, + {Offset: 4, Length: 2}, + }, + exp: []histogram.Span{ + {Offset: 0, Length: 9}, + {Offset: 1, Length: 2}, + {Offset: 2, Length: 3}, + }, + }, + { + // No buckets in one of them. + s1: []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 3, Length: 3}, + {Offset: 5, Length: 3}, + }, + s2: []histogram.Span{}, + exp: []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 3, Length: 3}, + {Offset: 5, Length: 3}, + }, + }, + { // Zero length spans. + s1: []histogram.Span{ + {Offset: -5, Length: 0}, + {Offset: 2, Length: 0}, + {Offset: 3, Length: 3}, + {Offset: 1, Length: 0}, + {Offset: 2, Length: 3}, + {Offset: 2, Length: 0}, + {Offset: 2, Length: 0}, + {Offset: 1, Length: 3}, + {Offset: 4, Length: 0}, + {Offset: 5, Length: 0}, + }, + s2: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 2}, + {Offset: 1, Length: 0}, + {Offset: 1, Length: 3}, + {Offset: 3, Length: 3}, + }, + exp: []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 7}, + {Offset: 3, Length: 3}, + }, + }, + } + + for _, c := range cases { + s1c := make([]histogram.Span, len(c.s1)) + s2c := make([]histogram.Span, len(c.s2)) + copy(s1c, c.s1) + copy(s2c, c.s2) + + _, _, act := bidirectionalCompareSpans(c.s1, c.s2) + require.Equal(t, c.exp, act) + // Check that s1 and s2 are not modified. + require.Equal(t, s1c, c.s1) + require.Equal(t, s2c, c.s2) + _, _, act = bidirectionalCompareSpans(c.s2, c.s1) + require.Equal(t, c.exp, act) + } +} diff --git a/tsdb/head.go b/tsdb/head.go index 4eb4339bd..379bb222d 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -17,6 +17,7 @@ import ( "fmt" "io" "math" + "math/rand" "path/filepath" "sync" "time" @@ -2026,7 +2027,7 @@ func (h *Head) updateWALReplayStatusRead(current int) { func GenerateTestHistograms(n int) (r []*histogram.Histogram) { for i := 0; i < n; i++ { r = append(r, &histogram.Histogram{ - Count: 5 + uint64(i*4), + Count: 10 + uint64(i*8), ZeroCount: 2 + uint64(i), ZeroThreshold: 0.001, Sum: 18.4 * float64(i+1), @@ -2036,6 +2037,11 @@ func GenerateTestHistograms(n int) (r []*histogram.Histogram) { {Offset: 1, Length: 2}, }, PositiveBuckets: []int64{int64(i + 1), 1, -1, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + NegativeBuckets: []int64{int64(i + 1), 1, -1, 0}, }) } @@ -2045,7 +2051,7 @@ func GenerateTestHistograms(n int) (r []*histogram.Histogram) { func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) { for i := 0; i < n; i++ { r = append(r, &histogram.FloatHistogram{ - Count: 5 + float64(i*4), + Count: 10 + float64(i*8), ZeroCount: 2 + float64(i), ZeroThreshold: 0.001, Sum: 18.4 * float64(i+1), @@ -2055,6 +2061,37 @@ func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) { {Offset: 1, Length: 2}, }, PositiveBuckets: []float64{float64(i + 1), float64(i + 2), float64(i + 1), float64(i + 1)}, + NegativeSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + NegativeBuckets: []float64{float64(i + 1), float64(i + 2), float64(i + 1), float64(i + 1)}, + }) + } + + return r +} + +func GenerateTestGaugeHistograms(n int) (r []*histogram.FloatHistogram) { + for x := 0; x < n; x++ { + i := rand.Intn(n) + r = append(r, &histogram.FloatHistogram{ + CounterResetHint: histogram.GaugeType, + Count: 10 + float64(i*8), + ZeroCount: 2 + float64(i), + ZeroThreshold: 0.001, + Sum: 18.4 * float64(i+1), + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []float64{float64(i + 1), float64(i + 2), float64(i + 1), float64(i + 1)}, + NegativeSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + NegativeBuckets: []float64{float64(i + 1), float64(i + 2), float64(i + 1), float64(i + 1)}, }) } diff --git a/tsdb/head_append.go b/tsdb/head_append.go index c09027ae4..0785e99e8 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1138,9 +1138,10 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper // appendHistogram adds the histogram. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. +// TODO(codesome): Support gauge histograms here. func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper - // chunk reference afterwards. We check for Appendable before + // chunk reference afterwards. We check for Appendable from appender before // appendPreprocessor because in case it ends up creating a new chunk, // we need to know if there was also a counter reset or not to set the // meta properly. @@ -1209,24 +1210,37 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper - // chunk reference afterwards. We check for Appendable before + // chunk reference afterwards. We check for Appendable from appender before // appendPreprocessor because in case it ends up creating a new chunk, // we need to know if there was also a counter reset or not to set the // meta properly. app, _ := s.app.(*chunkenc.FloatHistogramAppender) var ( positiveInterjections, negativeInterjections []chunkenc.Interjection + pBackwardInter, nBackwardInter []chunkenc.Interjection + pMergedSpans, nMergedSpans []histogram.Span okToAppend, counterReset bool ) c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange) if !sampleInOrder { return sampleInOrder, chunkCreated } + gauge := fh.CounterResetHint == histogram.GaugeType if app != nil { - positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(fh) + if gauge { + positiveInterjections, negativeInterjections, pBackwardInter, nBackwardInter, + pMergedSpans, nMergedSpans, okToAppend = app.AppendableGauge(fh) + } else { + positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(fh) + } } if !chunkCreated { + if len(pBackwardInter)+len(nBackwardInter) > 0 { + fh.PositiveSpans = pMergedSpans + fh.NegativeSpans = nMergedSpans + app.RecodeHistogramm(fh, pBackwardInter, nBackwardInter) + } // We have 3 cases here // - !okToAppend -> We need to cut a new chunk. // - okToAppend but we have interjections → Existing chunk needs @@ -1251,7 +1265,9 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, if chunkCreated { hc := s.headChunk.chunk.(*chunkenc.FloatHistogramChunk) header := chunkenc.UnknownCounterReset - if counterReset { + if gauge { + header = chunkenc.GaugeType + } else if counterReset { header = chunkenc.CounterReset } else if okToAppend { header = chunkenc.NotCounterReset diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 6d7b82e1b..545bf5046 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -110,7 +110,9 @@ func populateTestWAL(t testing.TB, w *wlog.WL, recs []interface{}) { func readTestWAL(t testing.TB, dir string) (recs []interface{}) { sr, err := wlog.NewSegmentsReader(dir) require.NoError(t, err) - defer sr.Close() + defer func() { + require.NoError(t, sr.Close()) + }() var dec record.Decoder r := wlog.NewReader(sr) @@ -127,6 +129,14 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) { samples, err := dec.Samples(rec, nil) require.NoError(t, err) recs = append(recs, samples) + case record.HistogramSamples: + samples, err := dec.HistogramSamples(rec, nil) + require.NoError(t, err) + recs = append(recs, samples) + case record.FloatHistogramSamples: + samples, err := dec.FloatHistogramSamples(rec, nil) + require.NoError(t, err) + recs = append(recs, samples) case record.Tombstones: tstones, err := dec.Tombstones(rec, nil) require.NoError(t, err) @@ -2824,6 +2834,7 @@ func TestAppendHistogram(t *testing.T) { ingestTs := int64(0) app := head.Appender(context.Background()) + // Integer histograms. type timedHistogram struct { t int64 h *histogram.Histogram @@ -2844,6 +2855,7 @@ func TestAppendHistogram(t *testing.T) { t int64 h *histogram.FloatHistogram } + // Float counter histograms. expFloatHistograms := make([]timedFloatHistogram, 0, numHistograms) for _, fh := range GenerateTestFloatHistograms(numHistograms) { _, err := app.AppendHistogram(0, l, ingestTs, nil, fh) @@ -2855,6 +2867,18 @@ func TestAppendHistogram(t *testing.T) { app = head.Appender(context.Background()) } } + + // Float gauge histograms. + for _, fh := range GenerateTestGaugeHistograms(numHistograms) { + _, err := app.AppendHistogram(0, l, ingestTs, nil, fh) + require.NoError(t, err) + expFloatHistograms = append(expFloatHistograms, timedFloatHistogram{ingestTs, fh}) + ingestTs++ + if ingestTs%50 == 0 { + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + } + } require.NoError(t, app.Commit()) q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) @@ -2898,7 +2922,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { // Series with only histograms. s1 := labels.FromStrings("a", "b1") k1 := s1.String() - numHistograms := 450 + numHistograms := 300 exp := map[string][]tsdbutil.Sample{} app := head.Appender(context.Background()) ts := int64(0) @@ -2916,26 +2940,34 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { } } require.NoError(t, app.Commit()) - app = head.Appender(context.Background()) - for _, h := range GenerateTestFloatHistograms(numHistograms) { - h.Count = h.Count * 2 - h.NegativeSpans = h.PositiveSpans - h.NegativeBuckets = h.PositiveBuckets - _, err := app.AppendHistogram(0, s1, ts, nil, h) - require.NoError(t, err) - exp[k1] = append(exp[k1], sample{t: ts, fh: h.Copy()}) - ts++ - if ts%5 == 0 { - require.NoError(t, app.Commit()) - app = head.Appender(context.Background()) + for _, gauge := range []bool{true, false} { + app = head.Appender(context.Background()) + var hists []*histogram.FloatHistogram + if gauge { + hists = GenerateTestGaugeHistograms(numHistograms) + } else { + hists = GenerateTestFloatHistograms(numHistograms) } + for _, h := range hists { + h.Count = h.Count * 2 + h.NegativeSpans = h.PositiveSpans + h.NegativeBuckets = h.PositiveBuckets + _, err := app.AppendHistogram(0, s1, ts, nil, h) + require.NoError(t, err) + exp[k1] = append(exp[k1], sample{t: ts, fh: h.Copy()}) + ts++ + if ts%5 == 0 { + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + } + } + require.NoError(t, app.Commit()) } - require.NoError(t, app.Commit()) // There should be 7 mmap chunks in s1. ms := head.series.getByHash(s1.Hash(), s1) - require.Len(t, ms.mmappedChunks, 7) - expMmapChunks := make([]*mmappedChunk, 0, 7) + require.Len(t, ms.mmappedChunks, 8) + expMmapChunks := make([]*mmappedChunk, 0, 8) for _, mmap := range ms.mmappedChunks { require.Greater(t, mmap.numSamples, uint16(0)) cpy := *mmap @@ -2972,51 +3004,68 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { } } require.NoError(t, app.Commit()) - app = head.Appender(context.Background()) - for _, h := range GenerateTestFloatHistograms(100) { - ts++ - h.Count = h.Count * 2 - h.NegativeSpans = h.PositiveSpans - h.NegativeBuckets = h.PositiveBuckets - _, err := app.AppendHistogram(0, s2, int64(ts), nil, h) - require.NoError(t, err) - exp[k2] = append(exp[k2], sample{t: int64(ts), fh: h.Copy()}) - if ts%20 == 0 { - require.NoError(t, app.Commit()) - app = head.Appender(context.Background()) - // Add some float. - for i := 0; i < 10; i++ { - ts++ - _, err := app.Append(0, s2, int64(ts), float64(ts)) - require.NoError(t, err) - exp[k2] = append(exp[k2], sample{t: int64(ts), v: float64(ts)}) - } - require.NoError(t, app.Commit()) - app = head.Appender(context.Background()) + for _, gauge := range []bool{true, false} { + app = head.Appender(context.Background()) + var hists []*histogram.FloatHistogram + if gauge { + hists = GenerateTestGaugeHistograms(100) + } else { + hists = GenerateTestFloatHistograms(100) } + for _, h := range hists { + ts++ + h.Count = h.Count * 2 + h.NegativeSpans = h.PositiveSpans + h.NegativeBuckets = h.PositiveBuckets + _, err := app.AppendHistogram(0, s2, int64(ts), nil, h) + require.NoError(t, err) + exp[k2] = append(exp[k2], sample{t: int64(ts), fh: h.Copy()}) + if ts%20 == 0 { + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + // Add some float. + for i := 0; i < 10; i++ { + ts++ + _, err := app.Append(0, s2, int64(ts), float64(ts)) + require.NoError(t, err) + exp[k2] = append(exp[k2], sample{t: int64(ts), v: float64(ts)}) + } + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + } + } + require.NoError(t, app.Commit()) } - require.NoError(t, app.Commit()) // Restart head. require.NoError(t, head.Close()) - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) - require.NoError(t, err) - head, err = NewHead(nil, nil, w, nil, head.opts, nil) - require.NoError(t, err) - require.NoError(t, head.Init(0)) + startHead := func() { + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) + require.NoError(t, err) + head, err = NewHead(nil, nil, w, nil, head.opts, nil) + require.NoError(t, err) + require.NoError(t, head.Init(0)) + } + startHead() // Checking contents of s1. ms = head.series.getByHash(s1.Hash(), s1) require.Equal(t, expMmapChunks, ms.mmappedChunks) - for _, mmap := range ms.mmappedChunks { - require.Greater(t, mmap.numSamples, uint16(0)) - } require.Equal(t, expHeadChunkSamples, ms.headChunk.chunk.NumSamples()) - q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) - require.NoError(t, err) - act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "a", "b.*")) - require.Equal(t, exp, act) + testQuery := func() { + q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) + require.NoError(t, err) + act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "a", "b.*")) + require.Equal(t, exp, act) + } + testQuery() + + // Restart with no mmap chunks to test WAL replay. + require.NoError(t, head.Close()) + require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot))) + startHead() + testQuery() } func TestChunkSnapshot(t *testing.T) { @@ -3522,7 +3571,7 @@ func TestHistogramCounterResetHeader(t *testing.T) { if floatHisto { _, err = app.AppendHistogram(0, l, ts, nil, h.ToFloat()) } else { - _, err = app.AppendHistogram(0, l, ts, h, nil) + _, err = app.AppendHistogram(0, l, ts, h.Copy(), nil) } require.NoError(t, err) require.NoError(t, app.Commit()) @@ -3553,10 +3602,6 @@ func TestHistogramCounterResetHeader(t *testing.T) { } h := GenerateTestHistograms(1)[0] - if len(h.NegativeBuckets) == 0 { - h.NegativeSpans = append([]histogram.Span{}, h.PositiveSpans...) - h.NegativeBuckets = append([]int64{}, h.PositiveBuckets...) - } h.PositiveBuckets = []int64{100, 1, 1, 1} h.NegativeBuckets = []int64{100, 1, 1, 1} h.Count = 1000 @@ -4517,3 +4562,78 @@ func TestHeadMinOOOTimeUpdate(t *testing.T) { require.NoError(t, h.truncateOOO(0, 2)) require.Equal(t, 295*time.Minute.Milliseconds(), h.MinOOOTime()) } + +func TestGaugeHistogramWALAndChunkHeader(t *testing.T) { + l := labels.FromStrings("a", "b") + head, _ := newTestHead(t, 1000, false, false) + t.Cleanup(func() { + require.NoError(t, head.Close()) + }) + require.NoError(t, head.Init(0)) + + ts := int64(0) + appendHistogram := func(h *histogram.FloatHistogram) { + ts++ + app := head.Appender(context.Background()) + _, err := app.AppendHistogram(0, l, ts, nil, h.Copy()) + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + + hists := GenerateTestGaugeHistograms(5) + hists[0].CounterResetHint = histogram.UnknownCounterReset + appendHistogram(hists[0]) + appendHistogram(hists[1]) + appendHistogram(hists[2]) + hists[3].CounterResetHint = histogram.UnknownCounterReset + appendHistogram(hists[3]) + appendHistogram(hists[3]) + appendHistogram(hists[4]) + + checkHeaders := func() { + ms, _, err := head.getOrCreate(l.Hash(), l) + require.NoError(t, err) + require.Len(t, ms.mmappedChunks, 3) + expHeaders := []chunkenc.CounterResetHeader{ + chunkenc.UnknownCounterReset, + chunkenc.GaugeType, + chunkenc.UnknownCounterReset, + chunkenc.GaugeType, + } + for i, mmapChunk := range ms.mmappedChunks { + chk, err := head.chunkDiskMapper.Chunk(mmapChunk.ref) + require.NoError(t, err) + require.Equal(t, expHeaders[i], chk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader()) + } + require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader()) + } + checkHeaders() + + recs := readTestWAL(t, head.wal.Dir()) + require.Equal(t, []interface{}{ + []record.RefSeries{ + { + Ref: 1, + Labels: labels.FromStrings("a", "b"), + }, + }, + []record.RefFloatHistogramSample{{Ref: 1, T: 1, FH: hists[0]}}, + []record.RefFloatHistogramSample{{Ref: 1, T: 2, FH: hists[1]}}, + []record.RefFloatHistogramSample{{Ref: 1, T: 3, FH: hists[2]}}, + []record.RefFloatHistogramSample{{Ref: 1, T: 4, FH: hists[3]}}, + []record.RefFloatHistogramSample{{Ref: 1, T: 5, FH: hists[3]}}, + []record.RefFloatHistogramSample{{Ref: 1, T: 6, FH: hists[4]}}, + }, recs) + + // Restart Head without mmap chunks to expect the WAL replay to recognize gauge histograms. + require.NoError(t, head.Close()) + require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot))) + + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) + require.NoError(t, err) + head, err = NewHead(nil, nil, w, nil, head.opts, nil) + require.NoError(t, err) + require.NoError(t, head.Init(0)) + + checkHeaders() +} diff --git a/tsdb/record/record.go b/tsdb/record/record.go index 98894bb42..231b8b3c1 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -441,6 +441,8 @@ func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample) H: &histogram.Histogram{}, } + rh.H.CounterResetHint = histogram.CounterResetHint(dec.Byte()) + rh.H.Schema = int32(dec.Varint64()) rh.H.ZeroThreshold = math.Float64frombits(dec.Be64()) @@ -517,6 +519,8 @@ func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogr FH: &histogram.FloatHistogram{}, } + rh.FH.CounterResetHint = histogram.CounterResetHint(dec.Byte()) + rh.FH.Schema = int32(dec.Varint64()) rh.FH.ZeroThreshold = dec.Be64Float64() @@ -715,6 +719,8 @@ func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) [] buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) buf.PutVarint64(h.T - first.T) + buf.PutByte(byte(h.H.CounterResetHint)) + buf.PutVarint64(int64(h.H.Schema)) buf.PutBE64(math.Float64bits(h.H.ZeroThreshold)) @@ -766,6 +772,8 @@ func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) buf.PutVarint64(h.T - first.T) + buf.PutByte(byte(h.FH.CounterResetHint)) + buf.PutVarint64(int64(h.FH.Schema)) buf.PutBEFloat64(h.FH.ZeroThreshold) diff --git a/tsdb/record/record_test.go b/tsdb/record/record_test.go index 4ad7685a0..518942314 100644 --- a/tsdb/record/record_test.go +++ b/tsdb/record/record_test.go @@ -165,6 +165,22 @@ func TestRecord_EncodeDecode(t *testing.T) { decFloatHistograms, err := dec.FloatHistogramSamples(enc.FloatHistogramSamples(floatHistograms, nil), nil) require.NoError(t, err) require.Equal(t, floatHistograms, decFloatHistograms) + + // Gauge ingeger histograms. + for i := range histograms { + histograms[i].H.CounterResetHint = histogram.GaugeType + } + decHistograms, err = dec.HistogramSamples(enc.HistogramSamples(histograms, nil), nil) + require.NoError(t, err) + require.Equal(t, histograms, decHistograms) + + // Gauge float histograms. + for i := range floatHistograms { + floatHistograms[i].FH.CounterResetHint = histogram.GaugeType + } + decFloatHistograms, err = dec.FloatHistogramSamples(enc.FloatHistogramSamples(floatHistograms, nil), nil) + require.NoError(t, err) + require.Equal(t, floatHistograms, decFloatHistograms) } // TestRecord_Corrupted ensures that corrupted records return the correct error.