histogram: Handle changes of the ZeroThreshold and the Schema

Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
beorn7 2022-01-06 17:44:30 +01:00
parent ec80745884
commit 9fbcf14e5c
4 changed files with 954 additions and 150 deletions

View file

@ -73,6 +73,49 @@ func (h *FloatHistogram) Copy() *FloatHistogram {
return &c return &c
} }
// CopyToSchema works like Copy, but the returned deep copy has the provided
// target schema, which must be ≤ the original schema (i.e. it must have a lower
// resolution).
func (h *FloatHistogram) CopyToSchema(targetSchema int32) *FloatHistogram {
if targetSchema == h.Schema {
// Fast path.
return h.Copy()
}
if targetSchema > h.Schema {
panic(fmt.Errorf("cannot copy from schema %d to %d", h.Schema, targetSchema))
}
c := FloatHistogram{
Schema: targetSchema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: h.ZeroCount,
Count: h.Count,
Sum: h.Sum,
}
// TODO(beorn7): This is a straight-forward implementation using merging
// iterators for the original buckets and then adding one merged bucket
// after another to the newly created FloatHistogram. It's well possible
// that a more involved implementation performs much better, which we
// could do if this code path turns out to be performance-critical.
var iInSpan, index int32
for iSpan, iBucket, it := -1, -1, h.floatBucketIterator(true, 0, targetSchema); it.Next(); {
b := it.At()
c.PositiveSpans, c.PositiveBuckets, iSpan, iBucket, iInSpan = addBucket(
b, c.PositiveSpans, c.PositiveBuckets, iSpan, iBucket, iInSpan, index,
)
index = b.Index
}
for iSpan, iBucket, it := -1, -1, h.floatBucketIterator(false, 0, targetSchema); it.Next(); {
b := it.At()
c.NegativeSpans, c.NegativeBuckets, iSpan, iBucket, iInSpan = addBucket(
b, c.NegativeSpans, c.NegativeBuckets, iSpan, iBucket, iInSpan, index,
)
index = b.Index
}
return &c
}
// String returns a string representation of the Histogram. // String returns a string representation of the Histogram.
func (h *FloatHistogram) String() string { func (h *FloatHistogram) String() string {
var sb strings.Builder var sb strings.Builder
@ -140,29 +183,30 @@ func (h *FloatHistogram) Scale(factor float64) *FloatHistogram {
// resulting histogram might have buckets with a population of zero or directly // resulting histogram might have buckets with a population of zero or directly
// adjacent spans (offset=0). To normalize those, call the Compact method. // adjacent spans (offset=0). To normalize those, call the Compact method.
// //
// This method returns a pointer to the receiving histogram for convenience. // The method reconciles differences in the zero threshold and in the schema,
// but the schema of the other histogram must be ≥ the schema of the receiving
// histogram (i.e. must have an equal or higher resolution). This means that the
// schema of the receiving histogram won't change. Its zero threshold, however,
// will change if needed. The other histogram will not be modified in any case.
// //
// IMPORTANT: This method requires the Schema and the ZeroThreshold to be the // This method returns a pointer to the receiving histogram for convenience.
// same in both histograms. Otherwise, its behavior is undefined.
// TODO(beorn7): Change that!
func (h *FloatHistogram) Add(other *FloatHistogram) *FloatHistogram { func (h *FloatHistogram) Add(other *FloatHistogram) *FloatHistogram {
h.ZeroCount += other.ZeroCount otherZeroCount := h.reconcileZeroBuckets(other)
h.ZeroCount += otherZeroCount
h.Count += other.Count h.Count += other.Count
h.Sum += other.Sum h.Sum += other.Sum
// TODO(beorn7): If needed, this can be optimized by inspecting the // TODO(beorn7): If needed, this can be optimized by inspecting the
// spans in other and create missing buckets in h in batches. // spans in other and create missing buckets in h in batches.
iSpan, iBucket := -1, -1
var iInSpan, index int32 var iInSpan, index int32
for it := other.PositiveBucketIterator(); it.Next(); { for iSpan, iBucket, it := -1, -1, other.floatBucketIterator(true, h.ZeroThreshold, h.Schema); it.Next(); {
b := it.At() b := it.At()
h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan = addBucket( h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan = addBucket(
b, h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan, index, b, h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan, index,
) )
index = b.Index index = b.Index
} }
iSpan, iBucket = -1, -1 for iSpan, iBucket, it := -1, -1, other.floatBucketIterator(false, h.ZeroThreshold, h.Schema); it.Next(); {
for it := other.NegativeBucketIterator(); it.Next(); {
b := it.At() b := it.At()
h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan = addBucket( h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan = addBucket(
b, h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan, index, b, h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan, index,
@ -173,20 +217,16 @@ func (h *FloatHistogram) Add(other *FloatHistogram) *FloatHistogram {
} }
// Sub works like Add but subtracts the other histogram. // Sub works like Add but subtracts the other histogram.
//
// IMPORTANT: This method requires the Schema and the ZeroThreshold to be the
// same in both histograms. Otherwise, its behavior is undefined.
// TODO(beorn7): Change that!
func (h *FloatHistogram) Sub(other *FloatHistogram) *FloatHistogram { func (h *FloatHistogram) Sub(other *FloatHistogram) *FloatHistogram {
h.ZeroCount -= other.ZeroCount otherZeroCount := h.reconcileZeroBuckets(other)
h.ZeroCount -= otherZeroCount
h.Count -= other.Count h.Count -= other.Count
h.Sum -= other.Sum h.Sum -= other.Sum
// TODO(beorn7): If needed, this can be optimized by inspecting the // TODO(beorn7): If needed, this can be optimized by inspecting the
// spans in other and create missing buckets in h in batches. // spans in other and create missing buckets in h in batches.
iSpan, iBucket := -1, -1
var iInSpan, index int32 var iInSpan, index int32
for it := other.PositiveBucketIterator(); it.Next(); { for iSpan, iBucket, it := -1, -1, other.floatBucketIterator(true, h.ZeroThreshold, h.Schema); it.Next(); {
b := it.At() b := it.At()
b.Count *= -1 b.Count *= -1
h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan = addBucket( h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan = addBucket(
@ -194,8 +234,7 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) *FloatHistogram {
) )
index = b.Index index = b.Index
} }
iSpan, iBucket = -1, -1 for iSpan, iBucket, it := -1, -1, other.floatBucketIterator(false, h.ZeroThreshold, h.Schema); it.Next(); {
for it := other.NegativeBucketIterator(); it.Next(); {
b := it.At() b := it.At()
b.Count *= -1 b.Count *= -1
h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan = addBucket( h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan = addBucket(
@ -226,7 +265,7 @@ func addBucket(
buckets = append(buckets, 0) buckets = append(buckets, 0)
copy(buckets[1:], buckets) copy(buckets[1:], buckets)
buckets[0] = b.Count buckets[0] = b.Count
if spans[0].Offset == b.Index+1 { if len(spans) > 0 && spans[0].Offset == b.Index+1 {
spans[0].Length++ spans[0].Length++
spans[0].Offset-- spans[0].Offset--
return spans, buckets, 0, 0, 0 return spans, buckets, 0, 0, 0
@ -469,9 +508,24 @@ func compactBuckets(buckets []float64, spans []Span, maxEmptyBuckets int) ([]flo
// of observations, but NOT the sum of observations) is smaller in the receiving // of observations, but NOT the sum of observations) is smaller in the receiving
// histogram compared to the previous histogram. Otherwise, it returns false. // histogram compared to the previous histogram. Otherwise, it returns false.
// //
// IMPORTANT: This method requires the Schema and the ZeroThreshold to be the // Special behavior in case the Schema or the ZeroThreshold are not the same in
// same in both histograms. Otherwise, its behavior is undefined. // both histograms:
// TODO(beorn7): Change that! //
// * A decrease of the ZeroThreshold or an increase of the Schema (i.e. an
// increase of resolution) can only happen together with a reset. Thus, the
// method returns true in either case.
//
// * Upon an increase of the ZeroThreshold, the buckets in the previous
// histogram that fall within the new ZeroThreshold are added to the ZeroCount
// of the previous histogram (without mutating the provided previous
// histogram). The scenario that a populated bucket of the previous histogram
// is partially within, partially outside of the new ZeroThreshold, can only
// happen together with a counter reset and therefore shortcuts to returning
// true.
//
// * Upon a decrease of the Schema, the buckets of the previous histogram are
// merged so that they match the new, lower-resolution schema (again without
// mutating the provided previous histogram).
// //
// Note that this kind of reset detection is quite expensive. Ideally, resets // Note that this kind of reset detection is quite expensive. Ideally, resets
// are detected at ingest time and stored in the TSDB, so that the reset // are detected at ingest time and stored in the TSDB, so that the reset
@ -481,16 +535,29 @@ func (h *FloatHistogram) DetectReset(previous *FloatHistogram) bool {
if h.Count < previous.Count { if h.Count < previous.Count {
return true return true
} }
if h.ZeroCount < previous.ZeroCount { if h.Schema > previous.Schema {
return true return true
} }
currIt := h.PositiveBucketIterator() if h.ZeroThreshold < previous.ZeroThreshold {
prevIt := previous.PositiveBucketIterator() // ZeroThreshold decreased.
return true
}
previousZeroCount, newThreshold := previous.zeroCountForLargerThreshold(h.ZeroThreshold)
if newThreshold != h.ZeroThreshold {
// ZeroThreshold is within a populated bucket in previous
// histogram.
return true
}
if h.ZeroCount < previousZeroCount {
return true
}
currIt := h.floatBucketIterator(true, h.ZeroThreshold, h.Schema)
prevIt := previous.floatBucketIterator(true, h.ZeroThreshold, h.Schema)
if detectReset(currIt, prevIt) { if detectReset(currIt, prevIt) {
return true return true
} }
currIt = h.NegativeBucketIterator() currIt = h.floatBucketIterator(false, h.ZeroThreshold, h.Schema)
prevIt = previous.NegativeBucketIterator() prevIt = previous.floatBucketIterator(false, h.ZeroThreshold, h.Schema)
return detectReset(currIt, prevIt) return detectReset(currIt, prevIt)
} }
@ -558,35 +625,40 @@ func detectReset(currIt, prevIt FloatBucketIterator) bool {
// positive buckets in ascending order (starting next to the zero bucket and // positive buckets in ascending order (starting next to the zero bucket and
// going up). // going up).
func (h *FloatHistogram) PositiveBucketIterator() FloatBucketIterator { func (h *FloatHistogram) PositiveBucketIterator() FloatBucketIterator {
return newFloatBucketIterator(h, true) return h.floatBucketIterator(true, 0, h.Schema)
} }
// NegativeBucketIterator returns a FloatBucketIterator to iterate over all // NegativeBucketIterator returns a FloatBucketIterator to iterate over all
// negative buckets in descending order (starting next to the zero bucket and // negative buckets in descending order (starting next to the zero bucket and
// going down). // going down).
func (h *FloatHistogram) NegativeBucketIterator() FloatBucketIterator { func (h *FloatHistogram) NegativeBucketIterator() FloatBucketIterator {
return newFloatBucketIterator(h, false) return h.floatBucketIterator(false, 0, h.Schema)
} }
// PositiveReverseBucketIterator returns a FloatBucketIterator to iterate over all // PositiveReverseBucketIterator returns a FloatBucketIterator to iterate over all
// positive buckets in descending order (starting at the highest bucket and going // positive buckets in descending order (starting at the highest bucket and going
// down towards the zero bucket). // down towards the zero bucket).
func (h *FloatHistogram) PositiveReverseBucketIterator() FloatBucketIterator { func (h *FloatHistogram) PositiveReverseBucketIterator() FloatBucketIterator {
return newReverseFloatBucketIterator(h, true) return h.reverseFloatBucketIterator(true)
} }
// NegativeReverseBucketIterator returns a FloatBucketIterator to iterate over all // NegativeReverseBucketIterator returns a FloatBucketIterator to iterate over all
// negative buckets in ascending order (starting at the lowest bucket and going up // negative buckets in ascending order (starting at the lowest bucket and going up
// towards the zero bucket). // towards the zero bucket).
func (h *FloatHistogram) NegativeReverseBucketIterator() FloatBucketIterator { func (h *FloatHistogram) NegativeReverseBucketIterator() FloatBucketIterator {
return newReverseFloatBucketIterator(h, false) return h.reverseFloatBucketIterator(false)
} }
// AllBucketIterator returns a FloatBucketIterator to iterate over all negative, // AllBucketIterator returns a FloatBucketIterator to iterate over all negative,
// zero, and positive buckets in ascending order (starting at the lowest bucket // zero, and positive buckets in ascending order (starting at the lowest bucket
// and going up). // and going up).
func (h *FloatHistogram) AllBucketIterator() FloatBucketIterator { func (h *FloatHistogram) AllBucketIterator() FloatBucketIterator {
return newAllFloatBucketIterator(h) return &allFloatBucketIterator{
h: h,
negIter: h.NegativeReverseBucketIterator(),
posIter: h.PositiveBucketIterator(),
state: -1,
}
} }
// CumulativeBucketIterator returns a FloatBucketIterator to iterate over a // CumulativeBucketIterator returns a FloatBucketIterator to iterate over a
@ -600,6 +672,117 @@ func (h *FloatHistogram) CumulativeBucketIterator() FloatBucketIterator {
return &cumulativeFloatBucketIterator{h: h, posSpansIdx: -1} return &cumulativeFloatBucketIterator{h: h, posSpansIdx: -1}
} }
// zeroCountForLargerThreshold returns what the histogram's zero count would be
// if the ZeroThreshold had the provided larger (or equal) value. If the
// provided value is less than the histogram's ZeroThreshold, the method panics.
// If the largerThreshold ends up within a populated bucket of the histogram, it
// is adjusted upwards to the lower limit of that bucket (all in terms of
// absolute values) and that bucket's count is included in the returned
// count. The adjusted threshold is returned, too.
func (h *FloatHistogram) zeroCountForLargerThreshold(largerThreshold float64) (count, threshold float64) {
// Fast path.
if largerThreshold == h.ZeroThreshold {
return h.ZeroCount, largerThreshold
}
if largerThreshold < h.ZeroThreshold {
panic(fmt.Errorf("new threshold %f is less than old threshold %f", largerThreshold, h.ZeroThreshold))
}
outer:
for {
count = h.ZeroCount
i := h.PositiveBucketIterator()
for i.Next() {
b := i.At()
if b.Lower >= largerThreshold {
break
}
count += b.Count // Bucket to be merged into zero bucket.
if b.Upper > largerThreshold {
// New threshold ended up within a bucket. if it's
// populated, we need to adjust largerThreshold before
// we are done here.
if b.Count != 0 {
largerThreshold = b.Upper
}
break
}
}
i = h.NegativeBucketIterator()
for i.Next() {
b := i.At()
if b.Upper <= -largerThreshold {
break
}
count += b.Count // Bucket to be merged into zero bucket.
if b.Lower < -largerThreshold {
// New threshold ended up within a bucket. If
// it's populated, we need to adjust
// largerThreshold and have to redo the whole
// thing because the treatment of the positive
// buckets is invalid now.
if b.Count != 0 {
largerThreshold = -b.Lower
continue outer
}
break
}
}
return count, largerThreshold
}
}
// trimBucketsInZeroBucket removes all buckets that are within the zero
// bucket. It assumes that the zero threshold is at a bucket boundary and that
// the counts in the buckets to remove are already part of the zero count.
func (h *FloatHistogram) trimBucketsInZeroBucket() {
i := h.PositiveBucketIterator()
bucketsIdx := 0
for i.Next() {
b := i.At()
if b.Lower >= h.ZeroThreshold {
break
}
h.PositiveBuckets[bucketsIdx] = 0
bucketsIdx++
}
i = h.NegativeBucketIterator()
bucketsIdx = 0
for i.Next() {
b := i.At()
if b.Upper <= -h.ZeroThreshold {
break
}
h.NegativeBuckets[bucketsIdx] = 0
bucketsIdx++
}
// We are abusing Compact to trim the buckets set to zero
// above. Premature compacting could cause additional cost, but this
// code path is probably rarely used anyway.
h.Compact(3)
}
// reconcileZeroBuckets finds a zero bucket large enough to include the zero
// buckets of both histograms (the receiving histogram and the other histogram)
// with a zero threshold that is not within a populated bucket in either
// histogram. This method modifies the receiving histogram accourdingly, but
// leaves the other histogram as is. Instead, it returns the zero count the
// other histogram would have if it were modified.
func (h *FloatHistogram) reconcileZeroBuckets(other *FloatHistogram) float64 {
otherZeroCount := other.ZeroCount
otherZeroThreshold := other.ZeroThreshold
for otherZeroThreshold != h.ZeroThreshold {
if h.ZeroThreshold > otherZeroThreshold {
otherZeroCount, otherZeroThreshold = other.zeroCountForLargerThreshold(h.ZeroThreshold)
}
if otherZeroThreshold > h.ZeroThreshold {
h.ZeroCount, h.ZeroThreshold = h.zeroCountForLargerThreshold(otherZeroThreshold)
h.trimBucketsInZeroBucket()
}
}
return otherZeroCount
}
// FloatBucketIterator iterates over the buckets of a FloatHistogram, returning // FloatBucketIterator iterates over the buckets of a FloatHistogram, returning
// decoded buckets. // decoded buckets.
type FloatBucketIterator interface { type FloatBucketIterator interface {
@ -646,100 +829,43 @@ func (b FloatBucket) String() string {
return sb.String() return sb.String()
} }
type floatBucketIterator struct { // floatBucketIterator is a low-level constructor for bucket iterators.
schema int32 //
spans []Span // If positive is true, the returned iterator iterates through the positive
buckets []float64 // buckets, otherwise through the negative buckets.
//
positive bool // Whether this is for positive buckets. // If absoluteStartValue is < the lowest absolute value of any upper bucket
// boundary, the iterator starts with the first bucket. Otherwise, it will skip
spansIdx int // Current span within spans slice. // all buckets with an absolute value of their upper boundary ≤
idxInSpan uint32 // Index in the current span. 0 <= idxInSpan < span.Length. // absoluteStartValue.
bucketsIdx int // Current bucket within buckets slice. //
// targetSchema must be ≤ the schema of FloatHistogram (and of course within the
currCount float64 // Count in the current bucket. // legal values for schemas in general). The buckets are merged to match the
currIdx int32 // The actual bucket index. // targetSchema prior to iterating (without mutating FloatHistogram).
currLower, currUpper float64 // Limits of the current bucket. func (h *FloatHistogram) floatBucketIterator(
positive bool, absoluteStartValue float64, targetSchema int32,
) *floatBucketIterator {
if targetSchema > h.Schema {
panic(fmt.Errorf("cannot merge from schema %d to %d", h.Schema, targetSchema))
}
i := &floatBucketIterator{
schema: h.Schema,
targetSchema: targetSchema,
positive: positive,
absoluteStartValue: absoluteStartValue,
} }
func newFloatBucketIterator(h *FloatHistogram, positive bool) *floatBucketIterator {
r := &floatBucketIterator{schema: h.Schema, positive: positive}
if positive { if positive {
r.spans = h.PositiveSpans i.spans = h.PositiveSpans
r.buckets = h.PositiveBuckets i.buckets = h.PositiveBuckets
} else { } else {
r.spans = h.NegativeSpans i.spans = h.NegativeSpans
r.buckets = h.NegativeBuckets i.buckets = h.NegativeBuckets
} }
return r return i
} }
func (r *floatBucketIterator) Next() bool { // reverseFloatbucketiterator is a low-level constructor for reverse bucket iterators.
if r.spansIdx >= len(r.spans) { func (h *FloatHistogram) reverseFloatBucketIterator(positive bool) *reverseFloatBucketIterator {
return false
}
span := r.spans[r.spansIdx]
// Seed currIdx for the first bucket.
if r.bucketsIdx == 0 {
r.currIdx = span.Offset
} else {
r.currIdx++
}
for r.idxInSpan >= span.Length {
// We have exhausted the current span and have to find a new
// one. We'll even handle pathologic spans of length 0.
r.idxInSpan = 0
r.spansIdx++
if r.spansIdx >= len(r.spans) {
return false
}
span = r.spans[r.spansIdx]
r.currIdx += span.Offset
}
r.currCount = r.buckets[r.bucketsIdx]
if r.positive {
r.currUpper = getBound(r.currIdx, r.schema)
r.currLower = getBound(r.currIdx-1, r.schema)
} else {
r.currLower = -getBound(r.currIdx, r.schema)
r.currUpper = -getBound(r.currIdx-1, r.schema)
}
r.idxInSpan++
r.bucketsIdx++
return true
}
func (r *floatBucketIterator) At() FloatBucket {
return FloatBucket{
Count: r.currCount,
Lower: r.currLower,
Upper: r.currUpper,
LowerInclusive: r.currLower < 0,
UpperInclusive: r.currUpper > 0,
Index: r.currIdx,
}
}
type reverseFloatBucketIterator struct {
schema int32
spans []Span
buckets []float64
positive bool // Whether this is for positive buckets.
spansIdx int // Current span within spans slice.
idxInSpan int32 // Index in the current span. 0 <= idxInSpan < span.Length.
bucketsIdx int // Current bucket within buckets slice.
currCount float64 // Count in the current bucket.
currIdx int32 // The actual bucket index.
currLower, currUpper float64 // Limits of the current bucket.
}
func newReverseFloatBucketIterator(h *FloatHistogram, positive bool) *reverseFloatBucketIterator {
r := &reverseFloatBucketIterator{schema: h.Schema, positive: positive} r := &reverseFloatBucketIterator{schema: h.Schema, positive: positive}
if positive { if positive {
r.spans = h.PositiveSpans r.spans = h.PositiveSpans
@ -762,6 +888,130 @@ func newReverseFloatBucketIterator(h *FloatHistogram, positive bool) *reverseFlo
return r return r
} }
type floatBucketIterator struct {
// targetSchema is the schema to merge to and must be ≤ schema.
schema, targetSchema int32
spans []Span
buckets []float64
positive bool // Whether this is for positive buckets.
spansIdx int // Current span within spans slice.
idxInSpan uint32 // Index in the current span. 0 <= idxInSpan < span.Length.
bucketsIdx int // Current bucket within buckets slice.
currCount float64 // Count in the current bucket.
currIdx int32 // The bucket index within the targetSchema.
origIdx int32 // The bucket index within the original schema.
absoluteStartValue float64 // Never return buckets with an upper bound ≤ this value.
}
func (i *floatBucketIterator) Next() bool {
if i.spansIdx >= len(i.spans) {
return false
}
// Copy all of these into local variables so that we can forward to the
// next bucket and then roll back if needed.
origIdx, spansIdx, idxInSpan := i.origIdx, i.spansIdx, i.idxInSpan
span := i.spans[spansIdx]
firstPass := true
i.currCount = 0
mergeLoop: // Merge together all buckets from the original schema that fall into one bucket in the targetSchema.
for {
if i.bucketsIdx == 0 {
// Seed origIdx for the first bucket.
origIdx = span.Offset
} else {
origIdx++
}
for idxInSpan >= span.Length {
// We have exhausted the current span and have to find a new
// one. We even handle pathologic spans of length 0 here.
idxInSpan = 0
spansIdx++
if spansIdx >= len(i.spans) {
if firstPass {
return false
}
break mergeLoop
}
span = i.spans[spansIdx]
origIdx += span.Offset
}
currIdx := i.targetIdx(origIdx)
if firstPass {
i.currIdx = currIdx
firstPass = false
} else if currIdx != i.currIdx {
// Reached next bucket in targetSchema.
// Do not actually forward to the next bucket, but break out.
break mergeLoop
}
i.currCount += i.buckets[i.bucketsIdx]
idxInSpan++
i.bucketsIdx++
i.origIdx, i.spansIdx, i.idxInSpan = origIdx, spansIdx, idxInSpan
if i.schema == i.targetSchema {
// Don't need to test the next bucket for mergeability
// if we have no schema change anyway.
break mergeLoop
}
}
// Skip buckets before absoluteStartValue.
// TODO(beorn7): Maybe do something more efficient than this recursive call.
if getBound(i.currIdx, i.targetSchema) <= i.absoluteStartValue {
return i.Next()
}
return true
}
func (i *floatBucketIterator) At() FloatBucket {
b := FloatBucket{
Count: i.currCount,
Index: i.currIdx,
}
if i.positive {
b.Upper = getBound(i.currIdx, i.targetSchema)
b.Lower = getBound(i.currIdx-1, i.targetSchema)
} else {
b.Lower = -getBound(i.currIdx, i.targetSchema)
b.Upper = -getBound(i.currIdx-1, i.targetSchema)
}
b.LowerInclusive = b.Lower < 0
b.UpperInclusive = b.Upper > 0
return b
}
// targetIdx returns the bucket index within i.targetSchema for the given bucket
// index within i.schema.
func (i *floatBucketIterator) targetIdx(idx int32) int32 {
if i.schema == i.targetSchema {
// Fast path for the common case. The below would yield the same
// result, just with more effort.
return idx
}
return ((idx - 1) >> (i.schema - i.targetSchema)) + 1
}
type reverseFloatBucketIterator struct {
schema int32
spans []Span
buckets []float64
positive bool // Whether this is for positive buckets.
spansIdx int // Current span within spans slice.
idxInSpan int32 // Index in the current span. 0 <= idxInSpan < span.Length.
bucketsIdx int // Current bucket within buckets slice.
currCount float64 // Count in the current bucket.
currIdx int32 // The actual bucket index.
currLower, currUpper float64 // Limits of the current bucket.
}
func (r *reverseFloatBucketIterator) Next() bool { func (r *reverseFloatBucketIterator) Next() bool {
r.currIdx-- r.currIdx--
if r.bucketsIdx < 0 { if r.bucketsIdx < 0 {
@ -812,15 +1062,6 @@ type allFloatBucketIterator struct {
currBucket FloatBucket currBucket FloatBucket
} }
func newAllFloatBucketIterator(h *FloatHistogram) *allFloatBucketIterator {
return &allFloatBucketIterator{
h: h,
negIter: h.NegativeReverseBucketIterator(),
posIter: h.PositiveBucketIterator(),
state: -1,
}
}
func (r *allFloatBucketIterator) Next() bool { func (r *allFloatBucketIterator) Next() bool {
switch r.state { switch r.state {
case -1: case -1:

View file

@ -479,6 +479,206 @@ func TestFloatHistogramDetectReset(t *testing.T) {
}, },
true, true,
}, },
{
"zero threshold decreases",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{
ZeroThreshold: 0.009,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
true,
},
{
"zero threshold increases without touching any existing buckets",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{
ZeroThreshold: 0.011,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
false,
},
{
"zero threshold increases enough to cover existing buckets",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{
ZeroThreshold: 1,
ZeroCount: 7.73,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{1, 3}},
PositiveBuckets: []float64{3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
false,
},
{
"zero threshold increases into the middle of an existing buckets",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{
ZeroThreshold: 0.3,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
true,
},
{
"schema increases without any other changes",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
Schema: 0,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
Schema: 1,
PositiveSpans: []Span{{-5, 4}, {2, 6}},
PositiveBuckets: []float64{0.4, 0.6, 1, 0.23, 2, 1.3, 1.2, 3, 0.05, 0.05},
NegativeSpans: []Span{{5, 4}, {6, 4}},
NegativeBuckets: []float64{2, 1.1, 2, 1, 0.234e5, 1e5, 500, 500},
},
true,
},
{
"schema decreases without any other changes",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
Schema: 1,
PositiveSpans: []Span{{-5, 4}, {2, 6}},
PositiveBuckets: []float64{0.4, 0.6, 1, 0.23, 2, 1.3, 1.2, 3, 0.05, 0.05},
NegativeSpans: []Span{{5, 4}, {6, 4}},
NegativeBuckets: []float64{2, 1.1, 2, 1, 0.234e5, 1e5, 500, 500},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
Schema: 0,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
false,
},
{
"schema decreases and a bucket goes up",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
Schema: 1,
PositiveSpans: []Span{{-5, 4}, {2, 6}},
PositiveBuckets: []float64{0.4, 0.6, 1, 0.23, 2, 1.3, 1.2, 3, 0.05, 0.05},
NegativeSpans: []Span{{5, 4}, {6, 4}},
NegativeBuckets: []float64{2, 1.1, 2, 1, 0.234e5, 1e5, 500, 500},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
Schema: 0,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 1.23, 4.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
false,
},
{
"schema decreases and a bucket goes down",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
Schema: 1,
PositiveSpans: []Span{{-5, 4}, {2, 6}},
PositiveBuckets: []float64{0.4, 0.6, 1, 0.23, 2, 1.3, 1.2, 3, 0.05, 0.05},
NegativeSpans: []Span{{5, 4}, {6, 4}},
NegativeBuckets: []float64{2, 1.1, 2, 1, 0.234e5, 1e5, 500, 500},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
Schema: 0,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 1.23, 2.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
true,
},
} }
for _, c := range cases { for _, c := range cases {
@ -774,7 +974,7 @@ func TestFloatHistogramAdd(t *testing.T) {
{ {
"non-overlapping spans", "non-overlapping spans",
&FloatHistogram{ &FloatHistogram{
ZeroThreshold: 0.01, ZeroThreshold: 0.001,
ZeroCount: 11, ZeroCount: 11,
Count: 30, Count: 30,
Sum: 2.345, Sum: 2.345,
@ -784,7 +984,7 @@ func TestFloatHistogramAdd(t *testing.T) {
NegativeBuckets: []float64{3, 1, 5, 6}, NegativeBuckets: []float64{3, 1, 5, 6},
}, },
&FloatHistogram{ &FloatHistogram{
ZeroThreshold: 0.01, ZeroThreshold: 0.001,
ZeroCount: 8, ZeroCount: 8,
Count: 21, Count: 21,
Sum: 1.234, Sum: 1.234,
@ -794,7 +994,7 @@ func TestFloatHistogramAdd(t *testing.T) {
NegativeBuckets: []float64{1, 1, 4, 4}, NegativeBuckets: []float64{1, 1, 4, 4},
}, },
&FloatHistogram{ &FloatHistogram{
ZeroThreshold: 0.01, ZeroThreshold: 0.001,
ZeroCount: 19, ZeroCount: 19,
Count: 51, Count: 51,
Sum: 3.579, Sum: 3.579,
@ -903,6 +1103,243 @@ func TestFloatHistogramAdd(t *testing.T) {
NegativeBuckets: []float64{3, 2, 1, 4, 9, 6}, NegativeBuckets: []float64{3, 2, 1, 4, 9, 6},
}, },
}, },
{
"schema change",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 8,
Count: 21,
Sum: 1.234,
Schema: 0,
PositiveSpans: []Span{{-1, 4}, {0, 3}},
PositiveBuckets: []float64{5, 4, 2, 3, 6, 2, 5},
NegativeSpans: []Span{{4, 2}, {1, 2}},
NegativeBuckets: []float64{1, 1, 4, 4},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 30,
Sum: 2.345,
Schema: 1,
PositiveSpans: []Span{{-4, 3}, {5, 5}},
PositiveBuckets: []float64{1, 0, 0, 3, 2, 2, 3, 4},
NegativeSpans: []Span{{6, 3}, {6, 4}},
NegativeBuckets: []float64{3, 0.5, 0.5, 2, 3, 2, 4},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 19,
Count: 51,
Sum: 3.579,
PositiveSpans: []Span{{-2, 5}, {0, 3}},
PositiveBuckets: []float64{1, 5, 4, 2, 6, 10, 9, 5},
NegativeSpans: []Span{{3, 3}, {1, 3}},
NegativeBuckets: []float64{3, 2, 1, 4, 9, 6},
},
},
{
"larger zero bucket in first histogram",
&FloatHistogram{
ZeroThreshold: 1,
ZeroCount: 17,
Count: 21,
Sum: 1.234,
PositiveSpans: []Span{{1, 2}, {0, 3}},
PositiveBuckets: []float64{2, 3, 6, 2, 5},
NegativeSpans: []Span{{4, 2}, {1, 2}},
NegativeBuckets: []float64{1, 1, 4, 4},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 30,
Sum: 2.345,
PositiveSpans: []Span{{-2, 2}, {2, 3}},
PositiveBuckets: []float64{1, 0, 3, 4, 7},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3, 1, 5, 6},
},
&FloatHistogram{
ZeroThreshold: 1,
ZeroCount: 29,
Count: 51,
Sum: 3.579,
PositiveSpans: []Span{{1, 2}, {0, 3}},
PositiveBuckets: []float64{2, 6, 10, 9, 5},
NegativeSpans: []Span{{3, 3}, {1, 3}},
NegativeBuckets: []float64{3, 2, 1, 4, 9, 6},
},
},
{
"larger zero bucket in second histogram",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 30,
Sum: 2.345,
PositiveSpans: []Span{{-2, 2}, {2, 3}},
PositiveBuckets: []float64{1, 0, 3, 4, 7},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3, 1, 5, 6},
},
&FloatHistogram{
ZeroThreshold: 1,
ZeroCount: 17,
Count: 21,
Sum: 1.234,
PositiveSpans: []Span{{1, 2}, {0, 3}},
PositiveBuckets: []float64{2, 3, 6, 2, 5},
NegativeSpans: []Span{{4, 2}, {1, 2}},
NegativeBuckets: []float64{1, 1, 4, 4},
},
&FloatHistogram{
ZeroThreshold: 1,
ZeroCount: 29,
Count: 51,
Sum: 3.579,
PositiveSpans: []Span{{1, 5}},
PositiveBuckets: []float64{2, 6, 10, 9, 5},
NegativeSpans: []Span{{3, 7}},
NegativeBuckets: []float64{3, 2, 1, 0, 4, 9, 6},
},
},
{
"larger zero threshold in first histogram ends up inside a populated bucket of second histogram",
&FloatHistogram{
ZeroThreshold: 0.2,
ZeroCount: 17,
Count: 21,
Sum: 1.234,
PositiveSpans: []Span{{1, 2}, {0, 3}},
PositiveBuckets: []float64{2, 3, 6, 2, 5},
NegativeSpans: []Span{{4, 2}, {1, 2}},
NegativeBuckets: []float64{1, 1, 4, 4},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 30,
Sum: 2.345,
PositiveSpans: []Span{{-2, 2}, {2, 3}},
PositiveBuckets: []float64{1, 0, 3, 4, 7},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3, 1, 5, 6},
},
&FloatHistogram{
ZeroThreshold: 0.25,
ZeroCount: 29,
Count: 51,
Sum: 3.579,
PositiveSpans: []Span{{-1, 1}, {1, 5}},
PositiveBuckets: []float64{0, 2, 6, 10, 9, 5},
NegativeSpans: []Span{{3, 7}},
NegativeBuckets: []float64{3, 2, 1, 0, 4, 9, 6},
},
},
{
"larger zero threshold in second histogram ends up inside a populated bucket of first histogram",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 30,
Sum: 2.345,
PositiveSpans: []Span{{-2, 2}, {2, 3}},
PositiveBuckets: []float64{1, 0, 3, 4, 7},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3, 1, 5, 6},
},
&FloatHistogram{
ZeroThreshold: 0.2,
ZeroCount: 17,
Count: 21,
Sum: 1.234,
PositiveSpans: []Span{{1, 2}, {0, 3}},
PositiveBuckets: []float64{2, 3, 6, 2, 5},
NegativeSpans: []Span{{4, 2}, {1, 2}},
NegativeBuckets: []float64{1, 1, 4, 4},
},
&FloatHistogram{
ZeroThreshold: 0.25,
ZeroCount: 29,
Count: 51,
Sum: 3.579,
PositiveSpans: []Span{{1, 5}},
PositiveBuckets: []float64{2, 6, 10, 9, 5},
NegativeSpans: []Span{{3, 7}},
NegativeBuckets: []float64{3, 2, 1, 0, 4, 9, 6},
},
},
{
"schema change combined with larger zero bucket in second histogram",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 8,
Count: 21,
Sum: 1.234,
Schema: 0,
PositiveSpans: []Span{{-2, 5}, {0, 3}},
PositiveBuckets: []float64{2, 5, 4, 2, 3, 6, 2, 5},
NegativeSpans: []Span{{4, 2}, {1, 2}},
NegativeBuckets: []float64{1, 1, 4, 4},
},
&FloatHistogram{
ZeroThreshold: 0.25,
ZeroCount: 12,
Count: 30,
Sum: 2.345,
Schema: 1,
PositiveSpans: []Span{{-3, 2}, {5, 5}},
PositiveBuckets: []float64{1, 0, 3, 2, 2, 3, 4},
NegativeSpans: []Span{{6, 3}, {6, 4}},
NegativeBuckets: []float64{3, 0.5, 0.5, 2, 3, 2, 4},
},
&FloatHistogram{
ZeroThreshold: 0.25,
ZeroCount: 22,
Count: 51,
Sum: 3.579,
PositiveSpans: []Span{{-1, 7}},
PositiveBuckets: []float64{6, 4, 2, 6, 10, 9, 5},
NegativeSpans: []Span{{3, 7}},
NegativeBuckets: []float64{3, 2, 1, 0, 4, 9, 6},
},
},
{
"schema change combined with larger zero bucket in first histogram",
&FloatHistogram{
ZeroThreshold: 0.25,
ZeroCount: 8,
Count: 21,
Sum: 1.234,
Schema: 0,
PositiveSpans: []Span{{-1, 4}, {0, 3}},
PositiveBuckets: []float64{5, 4, 2, 3, 6, 2, 5},
NegativeSpans: []Span{{4, 2}, {1, 2}},
NegativeBuckets: []float64{1, 1, 4, 4},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 30,
Sum: 2.345,
Schema: 1,
PositiveSpans: []Span{{-4, 3}, {5, 5}},
PositiveBuckets: []float64{1, 0, 0, 3, 2, 2, 3, 4},
NegativeSpans: []Span{{6, 3}, {6, 4}},
NegativeBuckets: []float64{3, 0.5, 0.5, 2, 3, 2, 4},
},
&FloatHistogram{
ZeroThreshold: 0.25,
ZeroCount: 20,
Count: 51,
Sum: 3.579,
PositiveSpans: []Span{{-1, 4}, {0, 3}},
PositiveBuckets: []float64{5, 4, 2, 6, 10, 9, 5},
NegativeSpans: []Span{{3, 3}, {1, 3}},
NegativeBuckets: []float64{3, 2, 1, 4, 9, 6},
},
},
} }
for _, c := range cases { for _, c := range cases {
@ -954,6 +1391,41 @@ func TestFloatHistogramSub(t *testing.T) {
NegativeBuckets: []float64{2, 0, 1, 2}, NegativeBuckets: []float64{2, 0, 1, 2},
}, },
}, },
{
"schema change",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 8,
Count: 59,
Sum: 1.234,
Schema: 0,
PositiveSpans: []Span{{-2, 5}, {0, 3}},
PositiveBuckets: []float64{2, 5, 4, 2, 3, 6, 7, 5},
NegativeSpans: []Span{{3, 3}, {1, 3}},
NegativeBuckets: []float64{4, 10, 1, 4, 14, 7},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 2,
Count: 19,
Sum: 0.345,
Schema: 1,
PositiveSpans: []Span{{-4, 3}, {5, 5}},
PositiveBuckets: []float64{1, 0, 0, 1, 2, 2, 3, 4},
NegativeSpans: []Span{{6, 3}, {6, 4}},
NegativeBuckets: []float64{3, 0.5, 0.5, 2, 3, 2, 4},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 6,
Count: 40,
Sum: 0.889,
PositiveSpans: []Span{{-2, 5}, {0, 3}},
PositiveBuckets: []float64{1, 5, 4, 2, 2, 2, 0, 5},
NegativeSpans: []Span{{3, 3}, {1, 3}},
NegativeBuckets: []float64{1, 9, 1, 4, 9, 1},
},
},
} }
for _, c := range cases { for _, c := range cases {
@ -965,6 +1437,73 @@ func TestFloatHistogramSub(t *testing.T) {
} }
} }
func TestFloatHistogramCopyToSchema(t *testing.T) {
cases := []struct {
name string
targetSchema int32
in, expected *FloatHistogram
}{
{
"no schema change",
1,
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 30,
Sum: 2.345,
Schema: 1,
PositiveSpans: []Span{{-4, 3}, {5, 5}},
PositiveBuckets: []float64{1, 0, 0, 3, 2, 2, 3, 4},
NegativeSpans: []Span{{6, 3}, {6, 4}},
NegativeBuckets: []float64{3, 0.5, 0.5, 2, 3, 2, 4},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 30,
Sum: 2.345,
Schema: 1,
PositiveSpans: []Span{{-4, 3}, {5, 5}},
PositiveBuckets: []float64{1, 0, 0, 3, 2, 2, 3, 4},
NegativeSpans: []Span{{6, 3}, {6, 4}},
NegativeBuckets: []float64{3, 0.5, 0.5, 2, 3, 2, 4},
},
},
{
"schema change",
0,
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 30,
Sum: 2.345,
Schema: 1,
PositiveSpans: []Span{{-4, 3}, {5, 5}},
PositiveBuckets: []float64{1, 0, 0, 3, 2, 2, 3, 4},
NegativeSpans: []Span{{6, 3}, {6, 4}},
NegativeBuckets: []float64{3, 0.5, 0.5, 2, 3, 2, 4},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 30,
Sum: 2.345,
Schema: 0,
PositiveSpans: []Span{{-2, 2}, {2, 3}},
PositiveBuckets: []float64{1, 0, 3, 4, 7},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3, 1, 5, 6},
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
require.Equal(t, c.expected, c.in.CopyToSchema(c.targetSchema))
})
}
}
func TestReverseFloatBucketIterator(t *testing.T) { func TestReverseFloatBucketIterator(t *testing.T) {
h := &FloatHistogram{ h := &FloatHistogram{
Count: 405, Count: 405,

View file

@ -2157,8 +2157,13 @@ func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram
switch op { switch op {
case parser.ADD: case parser.ADD:
if hlhs != nil && hrhs != nil { if hlhs != nil && hrhs != nil {
// The histogram being added must have the larger schema
// code (i.e. the higher resolution).
if hrhs.Schema >= hlhs.Schema {
return 0, hlhs.Copy().Add(hrhs), true return 0, hlhs.Copy().Add(hrhs), true
} }
return 0, hrhs.Copy().Add(hlhs), true
}
return lhs + rhs, nil, true return lhs + rhs, nil, true
case parser.SUB: case parser.SUB:
return lhs - rhs, nil, true return lhs - rhs, nil, true
@ -2322,7 +2327,15 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
if s.H != nil { if s.H != nil {
group.hasHistogram = true group.hasHistogram = true
if group.histogramValue != nil { if group.histogramValue != nil {
// The histogram being added must have
// an equal or larger schema.
if s.H.Schema >= group.histogramValue.Schema {
group.histogramValue.Add(s.H) group.histogramValue.Add(s.H)
} else {
h := s.H.Copy()
h.Add(group.histogramValue)
group.histogramValue = h
}
} }
// Otherwise the aggregation contained floats // Otherwise the aggregation contained floats
// previously and will be invalid anyway. No // previously and will be invalid anyway. No

View file

@ -166,18 +166,19 @@ func histogramRate(points []Point, isCounter bool) *histogram.FloatHistogram {
prev := points[0].H // We already know that this is a histogram. prev := points[0].H // We already know that this is a histogram.
last := points[len(points)-1].H last := points[len(points)-1].H
if last == nil { if last == nil {
return nil // Last point in range is not a histogram. return nil // Range contains a mix of histograms and floats.
} }
if last.Schema != prev.Schema || last.ZeroThreshold != prev.ZeroThreshold { minSchema := prev.Schema
return nil // TODO(beorn7): Handle schema changes properly. if last.Schema < minSchema {
minSchema = last.Schema
} }
h := last.Copy()
h.Sub(prev) // First iteration to find out two things:
// We have to iterate through everything even in the non-counter case // - What's the smallest relevant schema?
// because we have to check that everything is a histogram. // - Are all data points histograms?
// TODO(beorn7): Find a way to check that earlier, e.g. by handing in a // TODO(beorn7): Find a way to check that earlier, e.g. by handing in a
// []FloatPoint and a []HistogramPoint separately. // []FloatPoint and a []HistogramPoint separately.
for _, currPoint := range points[1:] { for _, currPoint := range points[1 : len(points)-1] {
curr := currPoint.H curr := currPoint.H
if curr == nil { if curr == nil {
return nil // Range contains a mix of histograms and floats. return nil // Range contains a mix of histograms and floats.
@ -185,14 +186,24 @@ func histogramRate(points []Point, isCounter bool) *histogram.FloatHistogram {
if !isCounter { if !isCounter {
continue continue
} }
if curr.Schema != prev.Schema || curr.ZeroThreshold != prev.ZeroThreshold { if curr.Schema < minSchema {
return nil // TODO(beorn7): Handle schema changes properly. minSchema = curr.Schema
} }
}
h := last.CopyToSchema(minSchema)
h.Sub(prev)
if isCounter {
// Second iteration to deal with counter resets.
for _, currPoint := range points[1:] {
curr := currPoint.H
if curr.DetectReset(prev) { if curr.DetectReset(prev) {
h.Add(prev) h.Add(prev)
} }
prev = curr prev = curr
} }
}
return h.Compact(3) return h.Compact(3)
} }