From 6a820a646cf43e65ec9449cacf3b0bc003e167c8 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Tue, 23 Nov 2021 19:40:49 +0100 Subject: [PATCH] histogram: Add FloatHistogram Including a few adjustments for normal Histogram, too, e.g. use pointer receiver to avoid the large copy on method calls. Signed-off-by: beorn7 --- model/histogram/float_histogram.go | 341 +++++++++++++++++++++++++++++ model/histogram/histogram.go | 74 +++++-- model/histogram/histogram_test.go | 25 +++ 3 files changed, 426 insertions(+), 14 deletions(-) create mode 100644 model/histogram/float_histogram.go diff --git a/model/histogram/float_histogram.go b/model/histogram/float_histogram.go new file mode 100644 index 0000000000..954afc5948 --- /dev/null +++ b/model/histogram/float_histogram.go @@ -0,0 +1,341 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package histogram + +import ( + "fmt" + "math" + "strings" +) + +// FloatHistogram is similar to Histogram but uses float64 for all +// counts. Additionally, bucket counts are absolute and not deltas. +// +// A FloatHistogram is needed by PromQL to handle operations that might result +// in fractional counts. Since the counts in a histogram are unlikely to be too +// large to be represented precisely by a float64, a FloatHistogram can also be +// used to represent a histogram with integer counts and thus serves as a more +// generalized representation. 
+type FloatHistogram struct { + // Currently valid schema numbers are -4 <= n <= 8. They are all for + // base-2 bucket schemas, where 1 is a bucket boundary in each case, and + // then each power of two is divided into 2^n logarithmic buckets. Or + // in other words, each bucket boundary is the previous boundary times + // 2^(2^-n). + Schema int32 + // Width of the zero bucket. + ZeroThreshold float64 + // Observations falling into the zero bucket. Must be zero or positive. + ZeroCount float64 + // Total number of observations. Must be zero or positive. + Count float64 + // Sum of observations. This is also used as the stale marker. + Sum float64 + // Spans for positive and negative buckets (see Span in histogram.go). + PositiveSpans, NegativeSpans []Span + // Observation counts in buckets. Each represents an absolute count and + // must be zero or positive. + PositiveBuckets, NegativeBuckets []float64 +} + +// Copy returns a deep copy of the FloatHistogram. +func (h *FloatHistogram) Copy() *FloatHistogram { + c := *h + + if h.PositiveSpans != nil { + c.PositiveSpans = make([]Span, len(h.PositiveSpans)) + copy(c.PositiveSpans, h.PositiveSpans) + } + if h.NegativeSpans != nil { + c.NegativeSpans = make([]Span, len(h.NegativeSpans)) + copy(c.NegativeSpans, h.NegativeSpans) + } + if h.PositiveBuckets != nil { + c.PositiveBuckets = make([]float64, len(h.PositiveBuckets)) + copy(c.PositiveBuckets, h.PositiveBuckets) + } + if h.NegativeBuckets != nil { + c.NegativeBuckets = make([]float64, len(h.NegativeBuckets)) + copy(c.NegativeBuckets, h.NegativeBuckets) + } + + return &c +} + +// String returns a string representation of the FloatHistogram. 
+func (h *FloatHistogram) String() string { + var sb strings.Builder + fmt.Fprintf(&sb, "{count:%g, sum:%g", h.Count, h.Sum) + + var nBuckets []FloatBucket + for it := h.NegativeBucketIterator(); it.Next(); { + bucket := it.At() + if bucket.Count != 0 { + nBuckets = append(nBuckets, it.At()) + } + } + for i := len(nBuckets) - 1; i >= 0; i-- { + fmt.Fprintf(&sb, ", %s", nBuckets[i].String()) + } + + if h.ZeroCount != 0 { + fmt.Fprintf(&sb, ", %s", h.ZeroBucket().String()) + } + + for it := h.PositiveBucketIterator(); it.Next(); { + bucket := it.At() + if bucket.Count != 0 { + fmt.Fprintf(&sb, ", %s", bucket.String()) + } + } + + sb.WriteRune('}') + return sb.String() +} + +// ZeroBucket returns the zero bucket. +func (h *FloatHistogram) ZeroBucket() FloatBucket { + return FloatBucket{ + Lower: -h.ZeroThreshold, + Upper: h.ZeroThreshold, + LowerInclusive: true, + UpperInclusive: true, + Count: h.ZeroCount, + } +} + +// PositiveBucketIterator returns a FloatBucketIterator to iterate over all +// positive buckets in ascending order (starting next to the zero bucket and +// going up). +func (h *FloatHistogram) PositiveBucketIterator() FloatBucketIterator { + return newFloatBucketIterator(h, true) +} + +// NegativeBucketIterator returns a FloatBucketIterator to iterate over all +// negative buckets in descending order (starting next to the zero bucket and +// going down). +func (h *FloatHistogram) NegativeBucketIterator() FloatBucketIterator { + return newFloatBucketIterator(h, false) +} + +// CumulativeBucketIterator returns a FloatBucketIterator to iterate over a +// cumulative view of the buckets. This method currently only supports +// FloatHistograms without negative buckets and panics if the FloatHistogram has +// negative buckets. It is currently only used for testing. 
+func (h *FloatHistogram) CumulativeBucketIterator() FloatBucketIterator { + if len(h.NegativeBuckets) > 0 { + panic("CumulativeBucketIterator called on FloatHistogram with negative buckets") + } + return &cumulativeFloatBucketIterator{h: h, posSpansIdx: -1} +} + +// FloatBucketIterator iterates over the buckets of a FloatHistogram, returning +// decoded buckets. +type FloatBucketIterator interface { + // Next advances the iterator by one. + Next() bool + // At returns the current bucket. + At() FloatBucket +} + +// FloatBucket represents a bucket with lower and upper limit and the count of +// samples in the bucket. It also specifies if each limit is inclusive or +// not. (Mathematically, inclusive limits create a closed interval, and +// non-inclusive limits an open interval.) +// +// To represent cumulative buckets, Lower is set to -Inf, and the Count is then +// cumulative (including the counts of all buckets for smaller values). +type FloatBucket struct { + Lower, Upper float64 + LowerInclusive, UpperInclusive bool + Count float64 + Index int32 // Index within schema. To easily compare buckets that share the same schema. +} + +// String returns a string representation of a FloatBucket, using the usual +// mathematical notation of '['/']' for inclusive bounds and '('/')' for +// non-inclusive bounds. +func (b FloatBucket) String() string { + var sb strings.Builder + if b.LowerInclusive { + sb.WriteRune('[') + } else { + sb.WriteRune('(') + } + fmt.Fprintf(&sb, "%g,%g", b.Lower, b.Upper) + if b.UpperInclusive { + sb.WriteRune(']') + } else { + sb.WriteRune(')') + } + fmt.Fprintf(&sb, ":%g", b.Count) + return sb.String() +} + +type floatBucketIterator struct { + schema int32 + spans []Span + buckets []float64 + + positive bool // Whether this is for positive buckets. + + spansIdx int // Current span within spans slice. + idxInSpan uint32 // Index in the current span. 0 <= idxInSpan < span.Length. + bucketsIdx int // Current bucket within buckets slice. 
+ + currCount float64 // Count in the current bucket. + currIdx int32 // The actual bucket index. + currLower, currUpper float64 // Limits of the current bucket. + +} + +func newFloatBucketIterator(h *FloatHistogram, positive bool) *floatBucketIterator { + r := &floatBucketIterator{schema: h.Schema, positive: positive} + if positive { + r.spans = h.PositiveSpans + r.buckets = h.PositiveBuckets + } else { + r.spans = h.NegativeSpans + r.buckets = h.NegativeBuckets + } + return r +} + +func (r *floatBucketIterator) Next() bool { + if r.spansIdx >= len(r.spans) { + return false + } + span := r.spans[r.spansIdx] + // Seed currIdx for the first bucket. + if r.bucketsIdx == 0 { + r.currIdx = span.Offset + } else { + r.currIdx++ + } + for r.idxInSpan >= span.Length { + // We have exhausted the current span and have to find a new + // one. We'll even handle pathologic spans of length 0. + r.idxInSpan = 0 + r.spansIdx++ + if r.spansIdx >= len(r.spans) { + return false + } + span = r.spans[r.spansIdx] + r.currIdx += span.Offset + } + + r.currCount = r.buckets[r.bucketsIdx] + if r.positive { + r.currUpper = getBound(r.currIdx, r.schema) + r.currLower = getBound(r.currIdx-1, r.schema) + } else { + r.currLower = -getBound(r.currIdx, r.schema) + r.currUpper = -getBound(r.currIdx-1, r.schema) + } + + r.idxInSpan++ + r.bucketsIdx++ + return true +} + +func (r *floatBucketIterator) At() FloatBucket { + return FloatBucket{ + Count: r.currCount, + Lower: r.currLower, + Upper: r.currUpper, + LowerInclusive: r.currLower < 0, + UpperInclusive: r.currUpper > 0, + Index: r.currIdx, + } +} + +type cumulativeFloatBucketIterator struct { + h *FloatHistogram + + posSpansIdx int // Index in h.PositiveSpans we are in. -1 means 0 bucket. + posBucketsIdx int // Index in h.PositiveBuckets. + idxInSpan uint32 // Index in the current span. 0 <= idxInSpan < span.Length. + + initialized bool + currIdx int32 // The actual bucket index after decoding from spans. 
+ currUpper float64 // The upper boundary of the current bucket. + currCumulativeCount float64 // Current "cumulative" count for the current bucket. + + // Between 2 spans there could be some empty buckets which + // still need to be counted for cumulative buckets. + // When we hit the end of a span, we use this to iterate + // through the empty buckets. + emptyBucketCount int32 +} + +func (c *cumulativeFloatBucketIterator) Next() bool { + if c.posSpansIdx == -1 { + // Zero bucket. + c.posSpansIdx++ + if c.h.ZeroCount == 0 { + return c.Next() + } + + c.currUpper = c.h.ZeroThreshold + c.currCumulativeCount = c.h.ZeroCount + return true + } + + if c.posSpansIdx >= len(c.h.PositiveSpans) { + return false + } + + if c.emptyBucketCount > 0 { + // We are traversing through empty buckets at the moment. + c.currUpper = getBound(c.currIdx, c.h.Schema) + c.currIdx++ + c.emptyBucketCount-- + return true + } + + span := c.h.PositiveSpans[c.posSpansIdx] + if c.posSpansIdx == 0 && !c.initialized { + // Initializing. + c.currIdx = span.Offset + c.initialized = true + } + + c.currCumulativeCount += c.h.PositiveBuckets[c.posBucketsIdx] + c.currUpper = getBound(c.currIdx, c.h.Schema) + + c.posBucketsIdx++ + c.idxInSpan++ + c.currIdx++ + if c.idxInSpan >= span.Length { + // Move to the next span. This one is done. + c.posSpansIdx++ + c.idxInSpan = 0 + if c.posSpansIdx < len(c.h.PositiveSpans) { + c.emptyBucketCount = c.h.PositiveSpans[c.posSpansIdx].Offset + } + } + + return true +} + +func (c *cumulativeFloatBucketIterator) At() FloatBucket { + return FloatBucket{ + Upper: c.currUpper, + Lower: math.Inf(-1), + UpperInclusive: true, + LowerInclusive: true, + Count: c.currCumulativeCount, + Index: c.currIdx - 1, + } +} diff --git a/model/histogram/histogram.go b/model/histogram/histogram.go index 9e983d11eb..96a4d1bbdd 100644 --- a/model/histogram/histogram.go +++ b/model/histogram/histogram.go @@ -67,8 +67,8 @@ type Span struct { } // Copy returns a deep copy of the Histogram. 
-func (h Histogram) Copy() *Histogram { - c := h +func (h *Histogram) Copy() *Histogram { + c := *h if h.PositiveSpans != nil { c.PositiveSpans = make([]Span, len(h.PositiveSpans)) @@ -91,7 +91,7 @@ func (h Histogram) Copy() *Histogram { } // String returns a string representation of the Histogram. -func (h Histogram) String() string { +func (h *Histogram) String() string { var sb strings.Builder fmt.Fprintf(&sb, "{count:%d, sum:%g", h.Count, h.Sum) @@ -122,7 +122,7 @@ func (h Histogram) String() string { } // ZeroBucket returns the zero bucket. -func (h Histogram) ZeroBucket() Bucket { +func (h *Histogram) ZeroBucket() Bucket { return Bucket{ Lower: -h.ZeroThreshold, Upper: h.ZeroThreshold, @@ -134,25 +134,70 @@ func (h Histogram) ZeroBucket() Bucket { // PositiveBucketIterator returns a BucketIterator to iterate over all positive // buckets in ascending order (starting next to the zero bucket and going up). -func (h Histogram) PositiveBucketIterator() BucketIterator { - return newRegularBucketIterator(&h, true) +func (h *Histogram) PositiveBucketIterator() BucketIterator { + return newRegularBucketIterator(h, true) } // NegativeBucketIterator returns a BucketIterator to iterate over all negative // buckets in descending order (starting next to the zero bucket and going down). -func (h Histogram) NegativeBucketIterator() BucketIterator { - return newRegularBucketIterator(&h, false) +func (h *Histogram) NegativeBucketIterator() BucketIterator { + return newRegularBucketIterator(h, false) } // CumulativeBucketIterator returns a BucketIterator to iterate over a // cumulative view of the buckets. This method currently only supports // Histograms without negative buckets and panics if the Histogram has negative // buckets. It is currently only used for testing. 
-func (h Histogram) CumulativeBucketIterator() BucketIterator { +func (h *Histogram) CumulativeBucketIterator() BucketIterator { if len(h.NegativeBuckets) > 0 { - panic("CumulativeIterator called on Histogram with negative buckets") + panic("CumulativeBucketIterator called on Histogram with negative buckets") + } + return &cumulativeBucketIterator{h: h, posSpansIdx: -1} +} + +// ToFloat returns a FloatHistogram representation of the Histogram. It is a +// deep copy (e.g. spans are not shared). +func (h *Histogram) ToFloat() *FloatHistogram { + var ( + positiveSpans, negativeSpans []Span + positiveBuckets, negativeBuckets []float64 + ) + if h.PositiveSpans != nil { + positiveSpans = make([]Span, len(h.PositiveSpans)) + copy(positiveSpans, h.PositiveSpans) + } + if h.NegativeSpans != nil { + negativeSpans = make([]Span, len(h.NegativeSpans)) + copy(negativeSpans, h.NegativeSpans) + } + if h.PositiveBuckets != nil { + positiveBuckets = make([]float64, len(h.PositiveBuckets)) + var current float64 + for i, b := range h.PositiveBuckets { + current += float64(b) + positiveBuckets[i] = current + } + } + if h.NegativeBuckets != nil { + negativeBuckets = make([]float64, len(h.NegativeBuckets)) + var current float64 + for i, b := range h.NegativeBuckets { + current += float64(b) + negativeBuckets[i] = current + } + } + + return &FloatHistogram{ + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + ZeroCount: float64(h.ZeroCount), + Count: float64(h.Count), + Sum: h.Sum, + PositiveSpans: positiveSpans, + NegativeSpans: negativeSpans, + PositiveBuckets: positiveBuckets, + NegativeBuckets: negativeBuckets, } - return &cumulativeBucketIterator{h: &h, posSpansIdx: -1} } // BucketIterator iterates over the buckets of a Histogram, returning decoded @@ -178,8 +223,9 @@ type Bucket struct { Index int32 // Index within schema. To easily compare buckets that share the same schema. 
} -// String returns a string representation, using the usual mathematical notation -// of '['/']' for inclusive bounds and '('/')' for non-inclusive bounds. +// String returns a string representation of a Bucket, using the usual +// mathematical notation of '['/']' for inclusive bounds and '('/')' for +// non-inclusive bounds. func (b Bucket) String() string { var sb strings.Builder if b.LowerInclusive { @@ -322,7 +368,7 @@ func (c *cumulativeBucketIterator) Next() bool { span := c.h.PositiveSpans[c.posSpansIdx] if c.posSpansIdx == 0 && !c.initialized { - // Initialising. + // Initializing. c.currIdx = span.Offset // The first bucket is an absolute value and not a delta with Zero bucket. c.currCount = 0 diff --git a/model/histogram/histogram_test.go b/model/histogram/histogram_test.go index 8ef9da69bb..151dacdb8e 100644 --- a/model/histogram/histogram_test.go +++ b/model/histogram/histogram_test.go @@ -385,3 +385,28 @@ func TestRegularBucketIterator(t *testing.T) { }) } } + +func TestHistogramToFloat(t *testing.T) { + h := Histogram{ + Schema: 3, + Count: 61, + Sum: 2.7, + ZeroThreshold: 0.1, + ZeroCount: 42, + PositiveSpans: []Span{ + {Offset: 0, Length: 4}, + {Offset: 0, Length: 0}, + {Offset: 0, Length: 3}, + }, + PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0}, + NegativeSpans: []Span{ + {Offset: 0, Length: 5}, + {Offset: 1, Length: 0}, + {Offset: 0, Length: 1}, + }, + NegativeBuckets: []int64{1, 2, -2, 1, -1, 0}, + } + fh := h.ToFloat() + + require.Equal(t, h.String(), fh.String()) +}