From c599a5e938d5b0fded1df487c4324cbef63f06ff Mon Sep 17 00:00:00 2001 From: Aleksandr Smirnov <5targazer@mail.ru> Date: Tue, 17 Dec 2024 19:30:03 +0300 Subject: [PATCH] Implement kahanAdd and kahanSub methods Signed-off-by: Aleksandr Smirnov <5targazer@mail.ru> --- model/histogram/float_histogram.go | 178 +++++++++++++++++++++++++++++ 1 file changed, 178 insertions(+) diff --git a/model/histogram/float_histogram.go b/model/histogram/float_histogram.go index d22937cc23..30e710ef16 100644 --- a/model/histogram/float_histogram.go +++ b/model/histogram/float_histogram.go @@ -442,6 +442,107 @@ func kahanSumDec(dec, sum, c float64) (newSum, newC float64) { return t, c } +// KahanAdd works like Add but using the Kahan summation algorithm to minimize numerical errors. +// It returns pointers to the updated receiving histogram +// and a separate histogram that holds the Kahan compensation term. +func (h *FloatHistogram) KahanAdd(other, c *FloatHistogram) (*FloatHistogram, *FloatHistogram, error) { + if h.UsesCustomBuckets() != other.UsesCustomBuckets() { + return nil, nil, ErrHistogramsIncompatibleSchema + } + if h.UsesCustomBuckets() && !FloatBucketsMatch(h.CustomValues, other.CustomValues) { + return nil, nil, ErrHistogramsIncompatibleBounds + } + + switch { + case other.CounterResetHint == h.CounterResetHint: + // Adding apples to apples, all good. No need to change anything. + case h.CounterResetHint == GaugeType: + // Adding something else to a gauge. That's probably OK. Outcome is a gauge. + // Nothing to do since the receiver is already marked as gauge. + case other.CounterResetHint == GaugeType: + // Similar to before, but this time the receiver is "something else" and we have to change it to gauge. + h.CounterResetHint = GaugeType + case h.CounterResetHint == UnknownCounterReset: + // With the receiver's CounterResetHint being "unknown", this could still be legitimate + // if the caller knows what they are doing. Outcome is then again "unknown". + // No need to do anything since the receiver's CounterResetHint is already "unknown". + case other.CounterResetHint == UnknownCounterReset: + // Similar to before, but now we have to set the receiver's CounterResetHint to "unknown". + h.CounterResetHint = UnknownCounterReset + default: + // All other cases shouldn't actually happen. + // They are a direct collision of CounterReset and NotCounterReset. + // Conservatively set the CounterResetHint to "unknown" and issue a warning. + h.CounterResetHint = UnknownCounterReset + // TODO(trevorwhitney): Actually issue the warning as soon as the plumbing for it is in place + } + + if !h.UsesCustomBuckets() { + otherZeroCount := h.reconcileZeroBuckets(other) + h.ZeroCount, c.ZeroCount = kahanSumInc(otherZeroCount, h.ZeroCount, c.ZeroCount) + + // Ensure c.PositiveSpans and c.NegativeSpans match h.PositiveSpans and h.NegativeSpans. + // Reassign if the underlying arrays have been reallocated; otherwise, resize to match lengths. + if cap(c.PositiveSpans) != cap(h.PositiveSpans) { + c.PositiveSpans = h.PositiveSpans + } else if len(c.PositiveSpans) != len(h.PositiveSpans) { + c.PositiveSpans = c.PositiveSpans[:len(h.PositiveSpans)] + } + if cap(c.NegativeSpans) != cap(h.NegativeSpans) { + c.NegativeSpans = h.NegativeSpans + } else if len(c.NegativeSpans) != len(h.NegativeSpans) { + c.NegativeSpans = c.NegativeSpans[:len(h.NegativeSpans)] + } + } + h.Count, c.Count = kahanSumInc(other.Count, h.Count, c.Count) + h.Sum, c.Sum = kahanSumInc(other.Sum, h.Sum, c.Sum) + + var ( + hPositiveSpans = h.PositiveSpans + hPositiveBuckets = h.PositiveBuckets + otherPositiveSpans = other.PositiveSpans + otherPositiveBuckets = other.PositiveBuckets + cPositiveBuckets = c.PositiveBuckets + ) + + if h.UsesCustomBuckets() { + h.PositiveSpans, h.PositiveBuckets, c.PositiveBuckets = kahanAddBuckets( + h.Schema, h.ZeroThreshold, false, hPositiveSpans, hPositiveBuckets, otherPositiveSpans, otherPositiveBuckets, cPositiveBuckets) + return h, c, nil + } + + var ( + hNegativeSpans = h.NegativeSpans + hNegativeBuckets = h.NegativeBuckets + otherNegativeSpans = other.NegativeSpans + otherNegativeBuckets = other.NegativeBuckets + cNegativeBuckets = c.NegativeBuckets + ) + + switch { + case other.Schema < h.Schema: + hPositiveSpans, hPositiveBuckets, cPositiveBuckets = kahanReduceResolution( + hPositiveSpans, hPositiveBuckets, cPositiveBuckets, h.Schema, other.Schema, false, true) + hNegativeSpans, hNegativeBuckets, cNegativeBuckets = kahanReduceResolution( + hNegativeSpans, hNegativeBuckets, cNegativeBuckets, h.Schema, other.Schema, false, true) + h.Schema = other.Schema + c.Schema = other.Schema + + case other.Schema > h.Schema: + otherPositiveSpans, otherPositiveBuckets = reduceResolution( + otherPositiveSpans, otherPositiveBuckets, other.Schema, h.Schema, false, false) + otherNegativeSpans, otherNegativeBuckets = reduceResolution( + otherNegativeSpans, otherNegativeBuckets, other.Schema, h.Schema, false, false) + } + + h.PositiveSpans, h.PositiveBuckets, c.PositiveBuckets = kahanAddBuckets( + h.Schema, h.ZeroThreshold, false, hPositiveSpans, hPositiveBuckets, otherPositiveSpans, otherPositiveBuckets, cPositiveBuckets) + h.NegativeSpans, h.NegativeBuckets, c.NegativeBuckets = kahanAddBuckets( + h.Schema, h.ZeroThreshold, false, hNegativeSpans, hNegativeBuckets, otherNegativeSpans, otherNegativeBuckets, cNegativeBuckets) + + return h, c, nil +} + // Sub works like Add but subtracts the other histogram. func (h *FloatHistogram) Sub(other *FloatHistogram) (*FloatHistogram, error) { if h.UsesCustomBuckets() != other.UsesCustomBuckets() { @@ -493,6 +594,83 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) (*FloatHistogram, error) { return h, nil } +// KahanSub works like Sub but using the Kahan summation algorithm to minimize numerical errors. +// It returns pointers to the updated receiving histogram +// and a separate histogram that holds the Kahan compensation term. +func (h *FloatHistogram) KahanSub(other, c *FloatHistogram) (*FloatHistogram, *FloatHistogram, error) { + if h.UsesCustomBuckets() != other.UsesCustomBuckets() { + return nil, nil, ErrHistogramsIncompatibleSchema + } + if h.UsesCustomBuckets() && !FloatBucketsMatch(h.CustomValues, other.CustomValues) { + return nil, nil, ErrHistogramsIncompatibleBounds + } + + if !h.UsesCustomBuckets() { + otherZeroCount := h.reconcileZeroBuckets(other) + h.ZeroCount, c.ZeroCount = kahanSumDec(otherZeroCount, h.ZeroCount, c.ZeroCount) + + // Ensure c.PositiveSpans and c.NegativeSpans match h.PositiveSpans and h.NegativeSpans. + // Reassign if the underlying arrays have been reallocated; otherwise, resize to match lengths. + if cap(c.PositiveSpans) != cap(h.PositiveSpans) { + c.PositiveSpans = h.PositiveSpans + } else if len(c.PositiveSpans) != len(h.PositiveSpans) { + c.PositiveSpans = c.PositiveSpans[:len(h.PositiveSpans)] + } + if cap(c.NegativeSpans) != cap(h.NegativeSpans) { + c.NegativeSpans = h.NegativeSpans + } else if len(c.NegativeSpans) != len(h.NegativeSpans) { + c.NegativeSpans = c.NegativeSpans[:len(h.NegativeSpans)] + } + } + h.Count, c.Count = kahanSumDec(other.Count, h.Count, c.Count) + h.Sum, c.Sum = kahanSumDec(other.Sum, h.Sum, c.Sum) + + var ( + hPositiveSpans = h.PositiveSpans + hPositiveBuckets = h.PositiveBuckets + otherPositiveSpans = other.PositiveSpans + otherPositiveBuckets = other.PositiveBuckets + cPositiveBuckets = c.PositiveBuckets + ) + + if h.UsesCustomBuckets() { + h.PositiveSpans, h.PositiveBuckets, c.PositiveBuckets = kahanAddBuckets( + h.Schema, h.ZeroThreshold, true, hPositiveSpans, hPositiveBuckets, otherPositiveSpans, otherPositiveBuckets, cPositiveBuckets) + return h, c, nil + } + + var ( + hNegativeSpans = h.NegativeSpans + hNegativeBuckets = h.NegativeBuckets + otherNegativeSpans = other.NegativeSpans + otherNegativeBuckets = other.NegativeBuckets + cNegativeBuckets = c.NegativeBuckets + ) + + switch { + case other.Schema < h.Schema: + hPositiveSpans, hPositiveBuckets, cPositiveBuckets = kahanReduceResolution( + hPositiveSpans, hPositiveBuckets, cPositiveBuckets, h.Schema, other.Schema, false, true) + hNegativeSpans, hNegativeBuckets, cNegativeBuckets = kahanReduceResolution( + hNegativeSpans, hNegativeBuckets, cNegativeBuckets, h.Schema, other.Schema, false, true) + h.Schema = other.Schema + c.Schema = other.Schema + + case other.Schema > h.Schema: + otherPositiveSpans, otherPositiveBuckets = reduceResolution( + otherPositiveSpans, otherPositiveBuckets, other.Schema, h.Schema, false, false) + otherNegativeSpans, otherNegativeBuckets = reduceResolution( + otherNegativeSpans, otherNegativeBuckets, other.Schema, h.Schema, false, false) + } + + h.PositiveSpans, h.PositiveBuckets, c.PositiveBuckets = kahanAddBuckets( + h.Schema, h.ZeroThreshold, true, hPositiveSpans, hPositiveBuckets, otherPositiveSpans, otherPositiveBuckets, cPositiveBuckets) + h.NegativeSpans, h.NegativeBuckets, c.NegativeBuckets = kahanAddBuckets( + h.Schema, h.ZeroThreshold, true, hNegativeSpans, hNegativeBuckets, otherNegativeSpans, otherNegativeBuckets, cNegativeBuckets) + + return h, c, nil +} + // Equals returns true if the given float histogram matches exactly. // Exact match is when there are no new buckets (even empty) and no missing buckets, // and all the bucket values match. Spans can have different empty length spans in between,