From 9fbcf14e5ce49f5e8d0cee456c7859f4987bdb73 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Thu, 6 Jan 2022 17:44:30 +0100 Subject: [PATCH] histogram: Handle changes of the ZeroThreshold and the Schema Signed-off-by: beorn7 --- model/histogram/float_histogram.go | 501 ++++++++++++++++------ model/histogram/float_histogram_test.go | 545 +++++++++++++++++++++++- promql/engine.go | 17 +- promql/functions.go | 41 +- 4 files changed, 954 insertions(+), 150 deletions(-) diff --git a/model/histogram/float_histogram.go b/model/histogram/float_histogram.go index beddf8a3ec..2d774c2f68 100644 --- a/model/histogram/float_histogram.go +++ b/model/histogram/float_histogram.go @@ -73,6 +73,49 @@ func (h *FloatHistogram) Copy() *FloatHistogram { return &c } +// CopyToSchema works like Copy, but the returned deep copy has the provided +// target schema, which must be ≤ the original schema (i.e. it must have a lower +// resolution). +func (h *FloatHistogram) CopyToSchema(targetSchema int32) *FloatHistogram { + if targetSchema == h.Schema { + // Fast path. + return h.Copy() + } + if targetSchema > h.Schema { + panic(fmt.Errorf("cannot copy from schema %d to %d", h.Schema, targetSchema)) + } + c := FloatHistogram{ + Schema: targetSchema, + ZeroThreshold: h.ZeroThreshold, + ZeroCount: h.ZeroCount, + Count: h.Count, + Sum: h.Sum, + } + + // TODO(beorn7): This is a straight-forward implementation using merging + // iterators for the original buckets and then adding one merged bucket + // after another to the newly created FloatHistogram. It's well possible + // that a more involved implementation performs much better, which we + // could do if this code path turns out to be performance-critical. + var iInSpan, index int32 + for iSpan, iBucket, it := -1, -1, h.floatBucketIterator(true, 0, targetSchema); it.Next(); { + b := it.At() + c.PositiveSpans, c.PositiveBuckets, iSpan, iBucket, iInSpan = addBucket( + b, c.PositiveSpans, c.PositiveBuckets, iSpan, iBucket, iInSpan, index, + ) + index = b.Index + } + for iSpan, iBucket, it := -1, -1, h.floatBucketIterator(false, 0, targetSchema); it.Next(); { + b := it.At() + c.NegativeSpans, c.NegativeBuckets, iSpan, iBucket, iInSpan = addBucket( + b, c.NegativeSpans, c.NegativeBuckets, iSpan, iBucket, iInSpan, index, + ) + index = b.Index + } + + return &c +} + // String returns a string representation of the Histogram. func (h *FloatHistogram) String() string { var sb strings.Builder @@ -140,29 +183,30 @@ func (h *FloatHistogram) Scale(factor float64) *FloatHistogram { // resulting histogram might have buckets with a population of zero or directly // adjacent spans (offset=0). To normalize those, call the Compact method. // -// This method returns a pointer to the receiving histogram for convenience. +// The method reconciles differences in the zero threshold and in the schema, +// but the schema of the other histogram must be ≥ the schema of the receiving +// histogram (i.e. must have an equal or higher resolution). This means that the +// schema of the receiving histogram won't change. Its zero threshold, however, +// will change if needed. The other histogram will not be modified in any case. // -// IMPORTANT: This method requires the Schema and the ZeroThreshold to be the -// same in both histograms. Otherwise, its behavior is undefined. -// TODO(beorn7): Change that! +// This method returns a pointer to the receiving histogram for convenience. func (h *FloatHistogram) Add(other *FloatHistogram) *FloatHistogram { - h.ZeroCount += other.ZeroCount + otherZeroCount := h.reconcileZeroBuckets(other) + h.ZeroCount += otherZeroCount h.Count += other.Count h.Sum += other.Sum // TODO(beorn7): If needed, this can be optimized by inspecting the // spans in other and create missing buckets in h in batches. - iSpan, iBucket := -1, -1 var iInSpan, index int32 - for it := other.PositiveBucketIterator(); it.Next(); { + for iSpan, iBucket, it := -1, -1, other.floatBucketIterator(true, h.ZeroThreshold, h.Schema); it.Next(); { b := it.At() h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan = addBucket( b, h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan, index, ) index = b.Index } - iSpan, iBucket = -1, -1 - for it := other.NegativeBucketIterator(); it.Next(); { + for iSpan, iBucket, it := -1, -1, other.floatBucketIterator(false, h.ZeroThreshold, h.Schema); it.Next(); { b := it.At() h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan = addBucket( b, h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan, index, @@ -173,20 +217,16 @@ func (h *FloatHistogram) Add(other *FloatHistogram) *FloatHistogram { } // Sub works like Add but subtracts the other histogram. -// -// IMPORTANT: This method requires the Schema and the ZeroThreshold to be the -// same in both histograms. Otherwise, its behavior is undefined. -// TODO(beorn7): Change that! func (h *FloatHistogram) Sub(other *FloatHistogram) *FloatHistogram { - h.ZeroCount -= other.ZeroCount + otherZeroCount := h.reconcileZeroBuckets(other) + h.ZeroCount -= otherZeroCount h.Count -= other.Count h.Sum -= other.Sum // TODO(beorn7): If needed, this can be optimized by inspecting the // spans in other and create missing buckets in h in batches. - iSpan, iBucket := -1, -1 var iInSpan, index int32 - for it := other.PositiveBucketIterator(); it.Next(); { + for iSpan, iBucket, it := -1, -1, other.floatBucketIterator(true, h.ZeroThreshold, h.Schema); it.Next(); { b := it.At() b.Count *= -1 h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan = addBucket( @@ -194,8 +234,7 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) *FloatHistogram { ) index = b.Index } - iSpan, iBucket = -1, -1 - for it := other.NegativeBucketIterator(); it.Next(); { + for iSpan, iBucket, it := -1, -1, other.floatBucketIterator(false, h.ZeroThreshold, h.Schema); it.Next(); { b := it.At() b.Count *= -1 h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan = addBucket( @@ -226,7 +265,7 @@ func addBucket( buckets = append(buckets, 0) copy(buckets[1:], buckets) buckets[0] = b.Count - if spans[0].Offset == b.Index+1 { + if len(spans) > 0 && spans[0].Offset == b.Index+1 { spans[0].Length++ spans[0].Offset-- return spans, buckets, 0, 0, 0 @@ -469,9 +508,24 @@ func compactBuckets(buckets []float64, spans []Span, maxEmptyBuckets int) ([]flo // of observations, but NOT the sum of observations) is smaller in the receiving // histogram compared to the previous histogram. Otherwise, it returns false. // -// IMPORTANT: This method requires the Schema and the ZeroThreshold to be the -// same in both histograms. Otherwise, its behavior is undefined. -// TODO(beorn7): Change that! +// Special behavior in case the Schema or the ZeroThreshold are not the same in +// both histograms: +// +// * A decrease of the ZeroThreshold or an increase of the Schema (i.e. an +// increase of resolution) can only happen together with a reset. Thus, the +// method returns true in either case. +// +// * Upon an increase of the ZeroThreshold, the buckets in the previous +// histogram that fall within the new ZeroThreshold are added to the ZeroCount +// of the previous histogram (without mutating the provided previous +// histogram). The scenario that a populated bucket of the previous histogram +// is partially within, partially outside of the new ZeroThreshold, can only +// happen together with a counter reset and therefore shortcuts to returning +// true. +// +// * Upon a decrease of the Schema, the buckets of the previous histogram are +// merged so that they match the new, lower-resolution schema (again without +// mutating the provided previous histogram). // // Note that this kind of reset detection is quite expensive. Ideally, resets // are detected at ingest time and stored in the TSDB, so that the reset @@ -481,16 +535,29 @@ func (h *FloatHistogram) DetectReset(previous *FloatHistogram) bool { if h.Count < previous.Count { return true } - if h.ZeroCount < previous.ZeroCount { + if h.Schema > previous.Schema { return true } - currIt := h.PositiveBucketIterator() - prevIt := previous.PositiveBucketIterator() + if h.ZeroThreshold < previous.ZeroThreshold { + // ZeroThreshold decreased. + return true + } + previousZeroCount, newThreshold := previous.zeroCountForLargerThreshold(h.ZeroThreshold) + if newThreshold != h.ZeroThreshold { + // ZeroThreshold is within a populated bucket in previous + // histogram. + return true + } + if h.ZeroCount < previousZeroCount { + return true + } + currIt := h.floatBucketIterator(true, h.ZeroThreshold, h.Schema) + prevIt := previous.floatBucketIterator(true, h.ZeroThreshold, h.Schema) if detectReset(currIt, prevIt) { return true } - currIt = h.NegativeBucketIterator() - prevIt = previous.NegativeBucketIterator() + currIt = h.floatBucketIterator(false, h.ZeroThreshold, h.Schema) + prevIt = previous.floatBucketIterator(false, h.ZeroThreshold, h.Schema) return detectReset(currIt, prevIt) } @@ -558,35 +625,40 @@ func detectReset(currIt, prevIt FloatBucketIterator) bool { // positive buckets in ascending order (starting next to the zero bucket and // going up). func (h *FloatHistogram) PositiveBucketIterator() FloatBucketIterator { - return newFloatBucketIterator(h, true) + return h.floatBucketIterator(true, 0, h.Schema) } // NegativeBucketIterator returns a FloatBucketIterator to iterate over all // negative buckets in descending order (starting next to the zero bucket and // going down). func (h *FloatHistogram) NegativeBucketIterator() FloatBucketIterator { - return newFloatBucketIterator(h, false) + return h.floatBucketIterator(false, 0, h.Schema) } // PositiveReverseBucketIterator returns a FloatBucketIterator to iterate over all // positive buckets in descending order (starting at the highest bucket and going // down towards the zero bucket). func (h *FloatHistogram) PositiveReverseBucketIterator() FloatBucketIterator { - return newReverseFloatBucketIterator(h, true) + return h.reverseFloatBucketIterator(true) } // NegativeReverseBucketIterator returns a FloatBucketIterator to iterate over all // negative buckets in ascending order (starting at the lowest bucket and going up // towards the zero bucket). func (h *FloatHistogram) NegativeReverseBucketIterator() FloatBucketIterator { - return newReverseFloatBucketIterator(h, false) + return h.reverseFloatBucketIterator(false) } // AllBucketIterator returns a FloatBucketIterator to iterate over all negative, // zero, and positive buckets in ascending order (starting at the lowest bucket // and going up). func (h *FloatHistogram) AllBucketIterator() FloatBucketIterator { - return newAllFloatBucketIterator(h) + return &allFloatBucketIterator{ + h: h, + negIter: h.NegativeReverseBucketIterator(), + posIter: h.PositiveBucketIterator(), + state: -1, + } } // CumulativeBucketIterator returns a FloatBucketIterator to iterate over a @@ -600,6 +672,117 @@ func (h *FloatHistogram) CumulativeBucketIterator() FloatBucketIterator { return &cumulativeFloatBucketIterator{h: h, posSpansIdx: -1} } +// zeroCountForLargerThreshold returns what the histogram's zero count would be +// if the ZeroThreshold had the provided larger (or equal) value. If the +// provided value is less than the histogram's ZeroThreshold, the method panics. +// If the largerThreshold ends up within a populated bucket of the histogram, it +// is adjusted upwards to the lower limit of that bucket (all in terms of +// absolute values) and that bucket's count is included in the returned +// count. The adjusted threshold is returned, too. +func (h *FloatHistogram) zeroCountForLargerThreshold(largerThreshold float64) (count, threshold float64) { + // Fast path. + if largerThreshold == h.ZeroThreshold { + return h.ZeroCount, largerThreshold + } + if largerThreshold < h.ZeroThreshold { + panic(fmt.Errorf("new threshold %f is less than old threshold %f", largerThreshold, h.ZeroThreshold)) + } +outer: + for { + count = h.ZeroCount + i := h.PositiveBucketIterator() + for i.Next() { + b := i.At() + if b.Lower >= largerThreshold { + break + } + count += b.Count // Bucket to be merged into zero bucket. + if b.Upper > largerThreshold { + // New threshold ended up within a bucket. if it's + // populated, we need to adjust largerThreshold before + // we are done here. + if b.Count != 0 { + largerThreshold = b.Upper + } + break + } + } + i = h.NegativeBucketIterator() + for i.Next() { + b := i.At() + if b.Upper <= -largerThreshold { + break + } + count += b.Count // Bucket to be merged into zero bucket. + if b.Lower < -largerThreshold { + // New threshold ended up within a bucket. If + // it's populated, we need to adjust + // largerThreshold and have to redo the whole + // thing because the treatment of the positive + // buckets is invalid now. + if b.Count != 0 { + largerThreshold = -b.Lower + continue outer + } + break + } + } + return count, largerThreshold + } +} + +// trimBucketsInZeroBucket removes all buckets that are within the zero +// bucket. It assumes that the zero threshold is at a bucket boundary and that +// the counts in the buckets to remove are already part of the zero count. +func (h *FloatHistogram) trimBucketsInZeroBucket() { + i := h.PositiveBucketIterator() + bucketsIdx := 0 + for i.Next() { + b := i.At() + if b.Lower >= h.ZeroThreshold { + break + } + h.PositiveBuckets[bucketsIdx] = 0 + bucketsIdx++ + } + i = h.NegativeBucketIterator() + bucketsIdx = 0 + for i.Next() { + b := i.At() + if b.Upper <= -h.ZeroThreshold { + break + } + h.NegativeBuckets[bucketsIdx] = 0 + bucketsIdx++ + } + // We are abusing Compact to trim the buckets set to zero + // above. Premature compacting could cause additional cost, but this + // code path is probably rarely used anyway. + h.Compact(3) +} + +// reconcileZeroBuckets finds a zero bucket large enough to include the zero +// buckets of both histograms (the receiving histogram and the other histogram) +// with a zero threshold that is not within a populated bucket in either +// histogram. This method modifies the receiving histogram accourdingly, but +// leaves the other histogram as is. Instead, it returns the zero count the +// other histogram would have if it were modified. +func (h *FloatHistogram) reconcileZeroBuckets(other *FloatHistogram) float64 { + otherZeroCount := other.ZeroCount + otherZeroThreshold := other.ZeroThreshold + + for otherZeroThreshold != h.ZeroThreshold { + if h.ZeroThreshold > otherZeroThreshold { + otherZeroCount, otherZeroThreshold = other.zeroCountForLargerThreshold(h.ZeroThreshold) + } + if otherZeroThreshold > h.ZeroThreshold { + h.ZeroCount, h.ZeroThreshold = h.zeroCountForLargerThreshold(otherZeroThreshold) + h.trimBucketsInZeroBucket() + } + } + return otherZeroCount +} + // FloatBucketIterator iterates over the buckets of a FloatHistogram, returning // decoded buckets. type FloatBucketIterator interface { @@ -646,100 +829,43 @@ func (b FloatBucket) String() string { return sb.String() } -type floatBucketIterator struct { - schema int32 - spans []Span - buckets []float64 - - positive bool // Whether this is for positive buckets. - - spansIdx int // Current span within spans slice. - idxInSpan uint32 // Index in the current span. 0 <= idxInSpan < span.Length. - bucketsIdx int // Current bucket within buckets slice. - - currCount float64 // Count in the current bucket. - currIdx int32 // The actual bucket index. - currLower, currUpper float64 // Limits of the current bucket. - -} - -func newFloatBucketIterator(h *FloatHistogram, positive bool) *floatBucketIterator { - r := &floatBucketIterator{schema: h.Schema, positive: positive} +// floatBucketIterator is a low-level constructor for bucket iterators. +// +// If positive is true, the returned iterator iterates through the positive +// buckets, otherwise through the negative buckets. +// +// If absoluteStartValue is < the lowest absolute value of any upper bucket +// boundary, the iterator starts with the first bucket. Otherwise, it will skip +// all buckets with an absolute value of their upper boundary ≤ +// absoluteStartValue. +// +// targetSchema must be ≤ the schema of FloatHistogram (and of course within the +// legal values for schemas in general). The buckets are merged to match the +// targetSchema prior to iterating (without mutating FloatHistogram). +func (h *FloatHistogram) floatBucketIterator( + positive bool, absoluteStartValue float64, targetSchema int32, +) *floatBucketIterator { + if targetSchema > h.Schema { + panic(fmt.Errorf("cannot merge from schema %d to %d", h.Schema, targetSchema)) + } + i := &floatBucketIterator{ + schema: h.Schema, + targetSchema: targetSchema, + positive: positive, + absoluteStartValue: absoluteStartValue, + } if positive { - r.spans = h.PositiveSpans - r.buckets = h.PositiveBuckets + i.spans = h.PositiveSpans + i.buckets = h.PositiveBuckets } else { - r.spans = h.NegativeSpans - r.buckets = h.NegativeBuckets + i.spans = h.NegativeSpans + i.buckets = h.NegativeBuckets } - return r + return i } -func (r *floatBucketIterator) Next() bool { - if r.spansIdx >= len(r.spans) { - return false - } - span := r.spans[r.spansIdx] - // Seed currIdx for the first bucket. - if r.bucketsIdx == 0 { - r.currIdx = span.Offset - } else { - r.currIdx++ - } - for r.idxInSpan >= span.Length { - // We have exhausted the current span and have to find a new - // one. We'll even handle pathologic spans of length 0. - r.idxInSpan = 0 - r.spansIdx++ - if r.spansIdx >= len(r.spans) { - return false - } - span = r.spans[r.spansIdx] - r.currIdx += span.Offset - } - - r.currCount = r.buckets[r.bucketsIdx] - if r.positive { - r.currUpper = getBound(r.currIdx, r.schema) - r.currLower = getBound(r.currIdx-1, r.schema) - } else { - r.currLower = -getBound(r.currIdx, r.schema) - r.currUpper = -getBound(r.currIdx-1, r.schema) - } - - r.idxInSpan++ - r.bucketsIdx++ - return true -} - -func (r *floatBucketIterator) At() FloatBucket { - return FloatBucket{ - Count: r.currCount, - Lower: r.currLower, - Upper: r.currUpper, - LowerInclusive: r.currLower < 0, - UpperInclusive: r.currUpper > 0, - Index: r.currIdx, - } -} - -type reverseFloatBucketIterator struct { - schema int32 - spans []Span - buckets []float64 - - positive bool // Whether this is for positive buckets. - - spansIdx int // Current span within spans slice. - idxInSpan int32 // Index in the current span. 0 <= idxInSpan < span.Length. - bucketsIdx int // Current bucket within buckets slice. - - currCount float64 // Count in the current bucket. - currIdx int32 // The actual bucket index. - currLower, currUpper float64 // Limits of the current bucket. -} - -func newReverseFloatBucketIterator(h *FloatHistogram, positive bool) *reverseFloatBucketIterator { +// reverseFloatbucketiterator is a low-level constructor for reverse bucket iterators. +func (h *FloatHistogram) reverseFloatBucketIterator(positive bool) *reverseFloatBucketIterator { r := &reverseFloatBucketIterator{schema: h.Schema, positive: positive} if positive { r.spans = h.PositiveSpans @@ -762,6 +888,130 @@ func newReverseFloatBucketIterator(h *FloatHistogram, positive bool) *reverseFlo return r } +type floatBucketIterator struct { + // targetSchema is the schema to merge to and must be ≤ schema. + schema, targetSchema int32 + spans []Span + buckets []float64 + + positive bool // Whether this is for positive buckets. + + spansIdx int // Current span within spans slice. + idxInSpan uint32 // Index in the current span. 0 <= idxInSpan < span.Length. + bucketsIdx int // Current bucket within buckets slice. + + currCount float64 // Count in the current bucket. + currIdx int32 // The bucket index within the targetSchema. + origIdx int32 // The bucket index within the original schema. + + absoluteStartValue float64 // Never return buckets with an upper bound ≤ this value. +} + +func (i *floatBucketIterator) Next() bool { + if i.spansIdx >= len(i.spans) { + return false + } + + // Copy all of these into local variables so that we can forward to the + // next bucket and then roll back if needed. + origIdx, spansIdx, idxInSpan := i.origIdx, i.spansIdx, i.idxInSpan + span := i.spans[spansIdx] + firstPass := true + i.currCount = 0 + +mergeLoop: // Merge together all buckets from the original schema that fall into one bucket in the targetSchema. + for { + if i.bucketsIdx == 0 { + // Seed origIdx for the first bucket. + origIdx = span.Offset + } else { + origIdx++ + } + for idxInSpan >= span.Length { + // We have exhausted the current span and have to find a new + // one. We even handle pathologic spans of length 0 here. + idxInSpan = 0 + spansIdx++ + if spansIdx >= len(i.spans) { + if firstPass { + return false + } + break mergeLoop + } + span = i.spans[spansIdx] + origIdx += span.Offset + } + currIdx := i.targetIdx(origIdx) + if firstPass { + i.currIdx = currIdx + firstPass = false + } else if currIdx != i.currIdx { + // Reached next bucket in targetSchema. + // Do not actually forward to the next bucket, but break out. + break mergeLoop + } + i.currCount += i.buckets[i.bucketsIdx] + idxInSpan++ + i.bucketsIdx++ + i.origIdx, i.spansIdx, i.idxInSpan = origIdx, spansIdx, idxInSpan + if i.schema == i.targetSchema { + // Don't need to test the next bucket for mergeability + // if we have no schema change anyway. + break mergeLoop + } + } + // Skip buckets before absoluteStartValue. + // TODO(beorn7): Maybe do something more efficient than this recursive call. + if getBound(i.currIdx, i.targetSchema) <= i.absoluteStartValue { + return i.Next() + } + return true +} + +func (i *floatBucketIterator) At() FloatBucket { + b := FloatBucket{ + Count: i.currCount, + Index: i.currIdx, + } + if i.positive { + b.Upper = getBound(i.currIdx, i.targetSchema) + b.Lower = getBound(i.currIdx-1, i.targetSchema) + } else { + b.Lower = -getBound(i.currIdx, i.targetSchema) + b.Upper = -getBound(i.currIdx-1, i.targetSchema) + } + b.LowerInclusive = b.Lower < 0 + b.UpperInclusive = b.Upper > 0 + return b +} + +// targetIdx returns the bucket index within i.targetSchema for the given bucket +// index within i.schema. +func (i *floatBucketIterator) targetIdx(idx int32) int32 { + if i.schema == i.targetSchema { + // Fast path for the common case. The below would yield the same + // result, just with more effort. + return idx + } + return ((idx - 1) >> (i.schema - i.targetSchema)) + 1 +} + +type reverseFloatBucketIterator struct { + schema int32 + spans []Span + buckets []float64 + + positive bool // Whether this is for positive buckets. + + spansIdx int // Current span within spans slice. + idxInSpan int32 // Index in the current span. 0 <= idxInSpan < span.Length. + bucketsIdx int // Current bucket within buckets slice. + + currCount float64 // Count in the current bucket. + currIdx int32 // The actual bucket index. + currLower, currUpper float64 // Limits of the current bucket. +} + func (r *reverseFloatBucketIterator) Next() bool { r.currIdx-- if r.bucketsIdx < 0 { @@ -812,15 +1062,6 @@ type allFloatBucketIterator struct { currBucket FloatBucket } -func newAllFloatBucketIterator(h *FloatHistogram) *allFloatBucketIterator { - return &allFloatBucketIterator{ - h: h, - negIter: h.NegativeReverseBucketIterator(), - posIter: h.PositiveBucketIterator(), - state: -1, - } -} - func (r *allFloatBucketIterator) Next() bool { switch r.state { case -1: diff --git a/model/histogram/float_histogram_test.go b/model/histogram/float_histogram_test.go index 39f428f1f3..8395ce27bd 100644 --- a/model/histogram/float_histogram_test.go +++ b/model/histogram/float_histogram_test.go @@ -479,6 +479,206 @@ func TestFloatHistogramDetectReset(t *testing.T) { }, true, }, + { + "zero threshold decreases", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{ + ZeroThreshold: 0.009, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + true, + }, + { + "zero threshold increases without touching any existing buckets", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{ + ZeroThreshold: 0.011, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + false, + }, + { + "zero threshold increases enough to cover existing buckets", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{ + ZeroThreshold: 1, + ZeroCount: 7.73, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{1, 3}}, + PositiveBuckets: []float64{3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + false, + }, + { + "zero threshold increases into the middle of an existing buckets", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{ + ZeroThreshold: 0.3, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + true, + }, + { + "schema increases without any other changes", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + Schema: 0, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + Schema: 1, + PositiveSpans: []Span{{-5, 4}, {2, 6}}, + PositiveBuckets: []float64{0.4, 0.6, 1, 0.23, 2, 1.3, 1.2, 3, 0.05, 0.05}, + NegativeSpans: []Span{{5, 4}, {6, 4}}, + NegativeBuckets: []float64{2, 1.1, 2, 1, 0.234e5, 1e5, 500, 500}, + }, + true, + }, + { + "schema decreases without any other changes", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + Schema: 1, + PositiveSpans: []Span{{-5, 4}, {2, 6}}, + PositiveBuckets: []float64{0.4, 0.6, 1, 0.23, 2, 1.3, 1.2, 3, 0.05, 0.05}, + NegativeSpans: []Span{{5, 4}, {6, 4}}, + NegativeBuckets: []float64{2, 1.1, 2, 1, 0.234e5, 1e5, 500, 500}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + Schema: 0, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + false, + }, + { + "schema decreases and a bucket goes up", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + Schema: 1, + PositiveSpans: []Span{{-5, 4}, {2, 6}}, + PositiveBuckets: []float64{0.4, 0.6, 1, 0.23, 2, 1.3, 1.2, 3, 0.05, 0.05}, + NegativeSpans: []Span{{5, 4}, {6, 4}}, + NegativeBuckets: []float64{2, 1.1, 2, 1, 0.234e5, 1e5, 500, 500}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + Schema: 0, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 1.23, 4.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + false, + }, + { + "schema decreases and a bucket goes down", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + Schema: 1, + PositiveSpans: []Span{{-5, 4}, {2, 6}}, + PositiveBuckets: []float64{0.4, 0.6, 1, 0.23, 2, 1.3, 1.2, 3, 0.05, 0.05}, + NegativeSpans: []Span{{5, 4}, {6, 4}}, + NegativeBuckets: []float64{2, 1.1, 2, 1, 0.234e5, 1e5, 500, 500}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + Schema: 0, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 1.23, 2.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + true, + }, } for _, c := range cases { @@ -774,7 +974,7 @@ func TestFloatHistogramAdd(t *testing.T) { { "non-overlapping spans", &FloatHistogram{ - ZeroThreshold: 0.01, + ZeroThreshold: 0.001, ZeroCount: 11, Count: 30, Sum: 2.345, @@ -784,7 +984,7 @@ func TestFloatHistogramAdd(t *testing.T) { NegativeBuckets: []float64{3, 1, 5, 6}, }, &FloatHistogram{ - ZeroThreshold: 0.01, + ZeroThreshold: 0.001, ZeroCount: 8, Count: 21, Sum: 1.234, @@ -794,7 +994,7 @@ func TestFloatHistogramAdd(t *testing.T) { NegativeBuckets: []float64{1, 1, 4, 4}, }, &FloatHistogram{ - ZeroThreshold: 0.01, + ZeroThreshold: 0.001, ZeroCount: 19, Count: 51, Sum: 3.579, @@ -903,6 +1103,243 @@ func TestFloatHistogramAdd(t *testing.T) { NegativeBuckets: []float64{3, 2, 1, 4, 9, 6}, }, }, + { + "schema change", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 8, + Count: 21, + Sum: 1.234, + Schema: 0, + PositiveSpans: []Span{{-1, 4}, {0, 3}}, + PositiveBuckets: []float64{5, 4, 2, 3, 6, 2, 5}, + NegativeSpans: []Span{{4, 2}, {1, 2}}, + NegativeBuckets: []float64{1, 1, 4, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + Schema: 1, + PositiveSpans: []Span{{-4, 3}, {5, 5}}, + PositiveBuckets: []float64{1, 0, 0, 3, 2, 2, 3, 4}, + NegativeSpans: []Span{{6, 3}, {6, 4}}, + NegativeBuckets: []float64{3, 0.5, 0.5, 2, 3, 2, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 19, + Count: 51, + Sum: 3.579, + PositiveSpans: []Span{{-2, 5}, {0, 3}}, + PositiveBuckets: []float64{1, 5, 4, 2, 6, 10, 9, 5}, + NegativeSpans: []Span{{3, 3}, {1, 3}}, + NegativeBuckets: []float64{3, 2, 1, 4, 9, 6}, + }, + }, + { + "larger zero bucket in first histogram", + &FloatHistogram{ + ZeroThreshold: 1, + ZeroCount: 17, + Count: 21, + Sum: 1.234, + PositiveSpans: []Span{{1, 2}, {0, 3}}, + PositiveBuckets: []float64{2, 3, 6, 2, 5}, + NegativeSpans: []Span{{4, 2}, {1, 2}}, + NegativeBuckets: []float64{1, 1, 4, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + PositiveSpans: []Span{{-2, 2}, {2, 3}}, + PositiveBuckets: []float64{1, 0, 3, 4, 7}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3, 1, 5, 6}, + }, + &FloatHistogram{ + ZeroThreshold: 1, + ZeroCount: 29, + Count: 51, + Sum: 3.579, + PositiveSpans: []Span{{1, 2}, {0, 3}}, + PositiveBuckets: []float64{2, 6, 10, 9, 5}, + NegativeSpans: []Span{{3, 3}, {1, 3}}, + NegativeBuckets: []float64{3, 2, 1, 4, 9, 6}, + }, + }, + { + "larger zero bucket in second histogram", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + PositiveSpans: []Span{{-2, 2}, {2, 3}}, + PositiveBuckets: []float64{1, 0, 3, 4, 7}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3, 1, 5, 6}, + }, + &FloatHistogram{ + ZeroThreshold: 1, + ZeroCount: 17, + Count: 21, + Sum: 1.234, + PositiveSpans: []Span{{1, 2}, {0, 3}}, + PositiveBuckets: []float64{2, 3, 6, 2, 5}, + NegativeSpans: []Span{{4, 2}, {1, 2}}, + NegativeBuckets: []float64{1, 1, 4, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 1, + ZeroCount: 29, + Count: 51, + Sum: 3.579, + PositiveSpans: []Span{{1, 5}}, + PositiveBuckets: []float64{2, 6, 10, 9, 5}, + NegativeSpans: []Span{{3, 7}}, + NegativeBuckets: []float64{3, 2, 1, 0, 4, 9, 6}, + }, + }, + { + "larger zero threshold in first histogram ends up inside a populated bucket of second histogram", + &FloatHistogram{ + ZeroThreshold: 0.2, + ZeroCount: 17, + Count: 21, + Sum: 1.234, + PositiveSpans: []Span{{1, 2}, {0, 3}}, + PositiveBuckets: []float64{2, 3, 6, 2, 5}, + NegativeSpans: []Span{{4, 2}, {1, 2}}, + NegativeBuckets: []float64{1, 1, 4, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + PositiveSpans: []Span{{-2, 2}, {2, 3}}, + PositiveBuckets: []float64{1, 0, 3, 4, 7}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3, 1, 5, 6}, + }, + &FloatHistogram{ + ZeroThreshold: 0.25, + ZeroCount: 29, + Count: 51, + Sum: 3.579, + PositiveSpans: []Span{{-1, 1}, {1, 5}}, + PositiveBuckets: []float64{0, 2, 6, 10, 9, 5}, + NegativeSpans: []Span{{3, 7}}, + NegativeBuckets: []float64{3, 2, 1, 0, 4, 9, 6}, + }, + }, + { + "larger zero threshold in second histogram ends up inside a populated bucket of first histogram", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + PositiveSpans: []Span{{-2, 2}, {2, 3}}, + PositiveBuckets: []float64{1, 0, 3, 4, 7}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3, 1, 5, 6}, + }, + &FloatHistogram{ + ZeroThreshold: 0.2, + ZeroCount: 17, + Count: 21, + Sum: 1.234, + PositiveSpans: []Span{{1, 2}, {0, 3}}, + PositiveBuckets: []float64{2, 3, 6, 2, 5}, + NegativeSpans: []Span{{4, 2}, {1, 2}}, + NegativeBuckets: []float64{1, 1, 4, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 0.25, + ZeroCount: 29, + Count: 51, + Sum: 3.579, + PositiveSpans: []Span{{1, 5}}, + PositiveBuckets: []float64{2, 6, 10, 9, 5}, + NegativeSpans: []Span{{3, 7}}, + NegativeBuckets: []float64{3, 2, 1, 0, 4, 9, 6}, + }, + }, + { + "schema change combined with larger zero bucket in second histogram", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 8, + Count: 21, + Sum: 1.234, + Schema: 0, + PositiveSpans: []Span{{-2, 5}, {0, 3}}, + PositiveBuckets: []float64{2, 5, 4, 2, 3, 6, 2, 5}, + NegativeSpans: []Span{{4, 2}, {1, 2}}, + NegativeBuckets: []float64{1, 1, 4, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 0.25, + ZeroCount: 12, + Count: 30, + Sum: 2.345, + Schema: 1, + PositiveSpans: []Span{{-3, 2}, {5, 5}}, + PositiveBuckets: []float64{1, 0, 3, 2, 2, 3, 4}, + NegativeSpans: []Span{{6, 3}, {6, 4}}, + NegativeBuckets: []float64{3, 0.5, 0.5, 2, 3, 2, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 0.25, + ZeroCount: 22, + Count: 51, + Sum: 3.579, + PositiveSpans: []Span{{-1, 7}}, + PositiveBuckets: []float64{6, 4, 2, 6, 10, 9, 5}, + NegativeSpans: []Span{{3, 7}}, + NegativeBuckets: []float64{3, 2, 1, 0, 4, 9, 6}, + }, + }, + { + "schema change combined with larger zero bucket in first histogram", + &FloatHistogram{ + ZeroThreshold: 0.25, + ZeroCount: 8, + Count: 21, + Sum: 1.234, + Schema: 0, + PositiveSpans: []Span{{-1, 4}, {0, 3}}, + PositiveBuckets: []float64{5, 4, 2, 3, 6, 2, 5}, + NegativeSpans: []Span{{4, 2}, {1, 2}}, + NegativeBuckets: []float64{1, 1, 4, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + Schema: 1, + PositiveSpans: []Span{{-4, 3}, {5, 5}}, + PositiveBuckets: []float64{1, 0, 0, 3, 2, 2, 3, 4}, + NegativeSpans: []Span{{6, 3}, {6, 4}}, + NegativeBuckets: []float64{3, 0.5, 0.5, 2, 3, 2, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 0.25, + ZeroCount: 20, + Count: 51, + Sum: 3.579, + PositiveSpans: []Span{{-1, 4}, {0, 3}}, + PositiveBuckets: []float64{5, 4, 2, 6, 10, 9, 5}, + NegativeSpans: []Span{{3, 3}, {1, 3}}, + NegativeBuckets: []float64{3, 2, 1, 4, 9, 6}, + }, + }, } for _, c := range cases { @@ -954,6 +1391,41 @@ func TestFloatHistogramSub(t *testing.T) { NegativeBuckets: []float64{2, 0, 1, 2}, }, }, + { + "schema change", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 8, + Count: 59, + Sum: 1.234, + Schema: 0, + PositiveSpans: []Span{{-2, 5}, {0, 3}}, + PositiveBuckets: []float64{2, 5, 4, 2, 3, 6, 7, 5}, + NegativeSpans: []Span{{3, 3}, {1, 3}}, + NegativeBuckets: []float64{4, 10, 1, 4, 14, 7}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 2, + Count: 19, + Sum: 0.345, + Schema: 1, + PositiveSpans: []Span{{-4, 3}, {5, 5}}, + PositiveBuckets: []float64{1, 0, 0, 1, 2, 2, 3, 4}, + NegativeSpans: []Span{{6, 3}, {6, 4}}, + NegativeBuckets: []float64{3, 0.5, 0.5, 2, 3, 2, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 6, + Count: 40, + Sum: 0.889, + PositiveSpans: []Span{{-2, 5}, {0, 3}}, + PositiveBuckets: []float64{1, 5, 4, 2, 2, 2, 0, 5}, + NegativeSpans: []Span{{3, 3}, {1, 3}}, + NegativeBuckets: []float64{1, 9, 1, 4, 9, 1}, + }, + }, } for _, c := range cases { @@ -965,6 +1437,73 @@ func TestFloatHistogramSub(t *testing.T) { } } +func TestFloatHistogramCopyToSchema(t *testing.T) { + cases := []struct { + name string + targetSchema int32 + in, expected *FloatHistogram + }{ + { + "no schema change", + 1, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + Schema: 1, + PositiveSpans: []Span{{-4, 3}, {5, 5}}, + PositiveBuckets: []float64{1, 0, 0, 3, 2, 2, 3, 4}, + NegativeSpans: []Span{{6, 3}, {6, 4}}, + NegativeBuckets: []float64{3, 0.5, 0.5, 2, 3, 2, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + Schema: 1, + PositiveSpans: []Span{{-4, 3}, {5, 5}}, + PositiveBuckets: []float64{1, 0, 0, 3, 2, 2, 3, 4}, + NegativeSpans: []Span{{6, 3}, {6, 4}}, + NegativeBuckets: []float64{3, 0.5, 0.5, 2, 3, 2, 4}, + }, + }, + { + "schema change", + 0, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + Schema: 1, + PositiveSpans: []Span{{-4, 3}, {5, 5}}, + PositiveBuckets: []float64{1, 0, 0, 3, 2, 2, 3, 4}, + NegativeSpans: []Span{{6, 3}, {6, 4}}, + NegativeBuckets: []float64{3, 0.5, 0.5, 2, 3, 2, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + Schema: 0, + PositiveSpans: []Span{{-2, 2}, {2, 3}}, + PositiveBuckets: []float64{1, 0, 3, 4, 7}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3, 1, 5, 6}, + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + require.Equal(t, c.expected, c.in.CopyToSchema(c.targetSchema)) + }) + } +} + func TestReverseFloatBucketIterator(t *testing.T) { h := &FloatHistogram{ Count: 405, diff --git a/promql/engine.go b/promql/engine.go index 557416dbfb..3c08980e92 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2157,7 +2157,12 @@ func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram switch op { case parser.ADD: if hlhs != nil && hrhs != nil { - return 0, hlhs.Copy().Add(hrhs), true + // The histogram being added must have the larger schema + // code (i.e. the higher resolution). + if hrhs.Schema >= hlhs.Schema { + return 0, hlhs.Copy().Add(hrhs), true + } + return 0, hrhs.Copy().Add(hlhs), true } return lhs + rhs, nil, true case parser.SUB: @@ -2322,7 +2327,15 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without if s.H != nil { group.hasHistogram = true if group.histogramValue != nil { - group.histogramValue.Add(s.H) + // The histogram being added must have + // an equal or larger schema. + if s.H.Schema >= group.histogramValue.Schema { + group.histogramValue.Add(s.H) + } else { + h := s.H.Copy() + h.Add(group.histogramValue) + group.histogramValue = h + } } // Otherwise the aggregation contained floats // previously and will be invalid anyway. No diff --git a/promql/functions.go b/promql/functions.go index 8a1c890c18..115681837e 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -166,18 +166,19 @@ func histogramRate(points []Point, isCounter bool) *histogram.FloatHistogram { prev := points[0].H // We already know that this is a histogram. last := points[len(points)-1].H if last == nil { - return nil // Last point in range is not a histogram. + return nil // Range contains a mix of histograms and floats. } - if last.Schema != prev.Schema || last.ZeroThreshold != prev.ZeroThreshold { - return nil // TODO(beorn7): Handle schema changes properly. + minSchema := prev.Schema + if last.Schema < minSchema { + minSchema = last.Schema } - h := last.Copy() - h.Sub(prev) - // We have to iterate through everything even in the non-counter case - // because we have to check that everything is a histogram. - // TODO(beorn7): Find a way to check that earlier, e.g. by handing in a - // []FloatPoint and a []HistogramPoint separately. - for _, currPoint := range points[1:] { + + // First iteration to find out two things: + // - What's the smallest relevant schema? + // - Are all data points histograms? + // TODO(beorn7): Find a way to check that earlier, e.g. by handing in a + // []FloatPoint and a []HistogramPoint separately. + for _, currPoint := range points[1 : len(points)-1] { curr := currPoint.H if curr == nil { return nil // Range contains a mix of histograms and floats. @@ -185,13 +186,23 @@ func histogramRate(points []Point, isCounter bool) *histogram.FloatHistogram { if !isCounter { continue } - if curr.Schema != prev.Schema || curr.ZeroThreshold != prev.ZeroThreshold { - return nil // TODO(beorn7): Handle schema changes properly. + if curr.Schema < minSchema { + minSchema = curr.Schema } - if curr.DetectReset(prev) { - h.Add(prev) + } + + h := last.CopyToSchema(minSchema) + h.Sub(prev) + + if isCounter { + // Second iteration to deal with counter resets. + for _, currPoint := range points[1:] { + curr := currPoint.H + if curr.DetectReset(prev) { + h.Add(prev) + } + prev = curr } - prev = curr } return h.Compact(3) }