Implement kahanAdd and kahanSub methods

Signed-off-by: Aleksandr Smirnov <5targazer@mail.ru>
This commit is contained in:
Aleksandr Smirnov 2024-12-17 19:30:03 +03:00
parent 7fcd68b687
commit c599a5e938

View file

@ -442,6 +442,107 @@ func kahanSumDec(dec, sum, c float64) (newSum, newC float64) {
return t, c
}
// KahanAdd works like Add but using the Kahan summation algorithm to minimize numerical errors.
// It returns pointers to the updated receiving histogram
// and a separate histogram that holds the Kahan compensation term.
func (h *FloatHistogram) KahanAdd(other, c *FloatHistogram) (*FloatHistogram, *FloatHistogram, error) {
if h.UsesCustomBuckets() != other.UsesCustomBuckets() {
return nil, nil, ErrHistogramsIncompatibleSchema
}
if h.UsesCustomBuckets() && !FloatBucketsMatch(h.CustomValues, other.CustomValues) {
return nil, nil, ErrHistogramsIncompatibleBounds
}
switch {
case other.CounterResetHint == h.CounterResetHint:
// Adding apples to apples, all good. No need to change anything.
case h.CounterResetHint == GaugeType:
// Adding something else to a gauge. That's probably OK. Outcome is a gauge.
// Nothing to do since the receiver is already marked as gauge.
case other.CounterResetHint == GaugeType:
// Similar to before, but this time the receiver is "something else" and we have to change it to gauge.
h.CounterResetHint = GaugeType
case h.CounterResetHint == UnknownCounterReset:
// With the receiver's CounterResetHint being "unknown", this could still be legitimate
// if the caller knows what they are doing. Outcome is then again "unknown".
// No need to do anything since the receiver's CounterResetHint is already "unknown".
case other.CounterResetHint == UnknownCounterReset:
// Similar to before, but now we have to set the receiver's CounterResetHint to "unknown".
h.CounterResetHint = UnknownCounterReset
default:
// All other cases shouldn't actually happen.
// They are a direct collision of CounterReset and NotCounterReset.
// Conservatively set the CounterResetHint to "unknown" and issue a warning.
h.CounterResetHint = UnknownCounterReset
// TODO(trevorwhitney): Actually issue the warning as soon as the plumbing for it is in place
}
if !h.UsesCustomBuckets() {
otherZeroCount := h.reconcileZeroBuckets(other)
h.ZeroCount, c.ZeroCount = kahanSumInc(otherZeroCount, h.ZeroCount, c.ZeroCount)
// Ensure c.PositiveSpans and c.NegativeSpans match h.PositiveSpans and h.NegativeSpans.
// Reassign if the underlying arrays have been reallocated; otherwise, resize to match lengths.
if cap(c.PositiveSpans) != cap(h.PositiveSpans) {
c.PositiveSpans = h.PositiveSpans
} else if len(c.PositiveSpans) != len(h.PositiveSpans) {
c.PositiveSpans = c.PositiveSpans[:len(h.PositiveSpans)]
}
if cap(c.NegativeSpans) != cap(h.NegativeSpans) {
c.NegativeSpans = h.NegativeSpans
} else if len(c.NegativeSpans) != len(h.NegativeSpans) {
c.NegativeSpans = c.NegativeSpans[:len(h.NegativeSpans)]
}
}
h.Count, c.Count = kahanSumInc(other.Count, h.Count, c.Count)
h.Sum, c.Sum = kahanSumInc(other.Sum, h.Sum, c.Sum)
var (
hPositiveSpans = h.PositiveSpans
hPositiveBuckets = h.PositiveBuckets
otherPositiveSpans = other.PositiveSpans
otherPositiveBuckets = other.PositiveBuckets
cPositiveBuckets = c.PositiveBuckets
)
if h.UsesCustomBuckets() {
h.PositiveSpans, h.PositiveBuckets, c.PositiveBuckets = kahanAddBuckets(
h.Schema, h.ZeroThreshold, false, hPositiveSpans, hPositiveBuckets, otherPositiveSpans, otherPositiveBuckets, cPositiveBuckets)
return h, c, nil
}
var (
hNegativeSpans = h.NegativeSpans
hNegativeBuckets = h.NegativeBuckets
otherNegativeSpans = other.NegativeSpans
otherNegativeBuckets = other.NegativeBuckets
cNegativeBuckets = c.NegativeBuckets
)
switch {
case other.Schema < h.Schema:
hPositiveSpans, hPositiveBuckets, cPositiveBuckets = kahanReduceResolution(
hPositiveSpans, hPositiveBuckets, cPositiveBuckets, h.Schema, other.Schema, false, true)
hNegativeSpans, hNegativeBuckets, cNegativeBuckets = kahanReduceResolution(
hNegativeSpans, hNegativeBuckets, cNegativeBuckets, h.Schema, other.Schema, false, true)
h.Schema = other.Schema
c.Schema = other.Schema
case other.Schema > h.Schema:
otherPositiveSpans, otherPositiveBuckets = reduceResolution(
otherPositiveSpans, otherPositiveBuckets, other.Schema, h.Schema, false, false)
otherNegativeSpans, otherNegativeBuckets = reduceResolution(
otherNegativeSpans, otherNegativeBuckets, other.Schema, h.Schema, false, false)
}
h.PositiveSpans, h.PositiveBuckets, c.PositiveBuckets = kahanAddBuckets(
h.Schema, h.ZeroThreshold, false, hPositiveSpans, hPositiveBuckets, otherPositiveSpans, otherPositiveBuckets, cPositiveBuckets)
h.NegativeSpans, h.NegativeBuckets, c.NegativeBuckets = kahanAddBuckets(
h.Schema, h.ZeroThreshold, false, hNegativeSpans, hNegativeBuckets, otherNegativeSpans, otherNegativeBuckets, cNegativeBuckets)
return h, c, nil
}
// Sub works like Add but subtracts the other histogram.
func (h *FloatHistogram) Sub(other *FloatHistogram) (*FloatHistogram, error) {
if h.UsesCustomBuckets() != other.UsesCustomBuckets() {
@ -493,6 +594,83 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) (*FloatHistogram, error) {
return h, nil
}
// KahanSub works like Sub but using the Kahan summation algorithm to minimize numerical errors.
// It returns pointers to the updated receiving histogram
// and a separate histogram that holds the Kahan compensation term.
func (h *FloatHistogram) KahanSub(other, c *FloatHistogram) (*FloatHistogram, *FloatHistogram, error) {
if h.UsesCustomBuckets() != other.UsesCustomBuckets() {
return nil, nil, ErrHistogramsIncompatibleSchema
}
if h.UsesCustomBuckets() && !FloatBucketsMatch(h.CustomValues, other.CustomValues) {
return nil, nil, ErrHistogramsIncompatibleBounds
}
if !h.UsesCustomBuckets() {
otherZeroCount := h.reconcileZeroBuckets(other)
h.ZeroCount, c.ZeroCount = kahanSumDec(otherZeroCount, h.ZeroCount, c.ZeroCount)
// Ensure c.PositiveSpans and c.NegativeSpans match h.PositiveSpans and h.NegativeSpans.
// Reassign if the underlying arrays have been reallocated; otherwise, resize to match lengths.
if cap(c.PositiveSpans) != cap(h.PositiveSpans) {
c.PositiveSpans = h.PositiveSpans
} else if len(c.PositiveSpans) != len(h.PositiveSpans) {
c.PositiveSpans = c.PositiveSpans[:len(h.PositiveSpans)]
}
if cap(c.NegativeSpans) != cap(h.NegativeSpans) {
c.NegativeSpans = h.NegativeSpans
} else if len(c.NegativeSpans) != len(h.NegativeSpans) {
c.NegativeSpans = c.NegativeSpans[:len(h.NegativeSpans)]
}
}
h.Count, c.Count = kahanSumDec(other.Count, h.Count, c.Count)
h.Sum, c.Sum = kahanSumDec(other.Sum, h.Sum, c.Sum)
var (
hPositiveSpans = h.PositiveSpans
hPositiveBuckets = h.PositiveBuckets
otherPositiveSpans = other.PositiveSpans
otherPositiveBuckets = other.PositiveBuckets
cPositiveBuckets = c.PositiveBuckets
)
if h.UsesCustomBuckets() {
h.PositiveSpans, h.PositiveBuckets, c.PositiveBuckets = kahanAddBuckets(
h.Schema, h.ZeroThreshold, true, hPositiveSpans, hPositiveBuckets, otherPositiveSpans, otherPositiveBuckets, cPositiveBuckets)
return h, c, nil
}
var (
hNegativeSpans = h.NegativeSpans
hNegativeBuckets = h.NegativeBuckets
otherNegativeSpans = other.NegativeSpans
otherNegativeBuckets = other.NegativeBuckets
cNegativeBuckets = c.NegativeBuckets
)
switch {
case other.Schema < h.Schema:
hPositiveSpans, hPositiveBuckets, cPositiveBuckets = kahanReduceResolution(
hPositiveSpans, hPositiveBuckets, cPositiveBuckets, h.Schema, other.Schema, false, true)
hNegativeSpans, hNegativeBuckets, cNegativeBuckets = kahanReduceResolution(
hNegativeSpans, hNegativeBuckets, cNegativeBuckets, h.Schema, other.Schema, false, true)
h.Schema = other.Schema
c.Schema = other.Schema
case other.Schema > h.Schema:
otherPositiveSpans, otherPositiveBuckets = reduceResolution(
otherPositiveSpans, otherPositiveBuckets, other.Schema, h.Schema, false, false)
otherNegativeSpans, otherNegativeBuckets = reduceResolution(
otherNegativeSpans, otherNegativeBuckets, other.Schema, h.Schema, false, false)
}
h.PositiveSpans, h.PositiveBuckets, c.PositiveBuckets = kahanAddBuckets(
h.Schema, h.ZeroThreshold, true, hPositiveSpans, hPositiveBuckets, otherPositiveSpans, otherPositiveBuckets, cPositiveBuckets)
h.NegativeSpans, h.NegativeBuckets, c.NegativeBuckets = kahanAddBuckets(
h.Schema, h.ZeroThreshold, true, hNegativeSpans, hNegativeBuckets, otherNegativeSpans, otherNegativeBuckets, cNegativeBuckets)
return h, c, nil
}
// Equals returns true if the given float histogram matches exactly.
// Exact match is when there are no new buckets (even empty) and no missing buckets,
// and all the bucket values match. Spans can have different empty length spans in between,