Add kahanAddBuckets and kahanReduceResolution functions

Signed-off-by: Aleksandr Smirnov <5targazer@mail.ru>
Aleksandr Smirnov 2024-12-17 19:29:22 +03:00
parent bde1637f41
commit 7fcd68b687
2 changed files with 256 additions and 0 deletions

@@ -1357,6 +1357,138 @@ func addBuckets(
	return spansA, bucketsA
}

// kahanAddBuckets works like addBuckets but it is used in FloatHistogram's KahanAdd/KahanSub methods
// and takes an additional argument, bucketsC, representing buckets of the compensation histogram.
// It returns the resulting spans/buckets and compensation buckets.
func kahanAddBuckets(
	schema int32, threshold float64, negative bool,
	spansA []Span, bucketsA []float64,
	spansB []Span, bucketsB []float64,
	bucketsC []float64,
) ([]Span, []float64, []float64) {
	var (
		iSpan = -1
		iBucket = -1
		iInSpan int32
		indexA int32
		indexB int32
		bIdxB int
		bucketB float64
		deltaIndex int32
		lowerThanThreshold = true
	)

	for _, spanB := range spansB {
		indexB += spanB.Offset
		for j := 0; j < int(spanB.Length); j++ {
			if lowerThanThreshold && IsExponentialSchema(schema) && getBoundExponential(indexB, schema) <= threshold {
				goto nextLoop
			}
			lowerThanThreshold = false
			bucketB = bucketsB[bIdxB]
			if negative {
				bucketB *= -1
			}
			if iSpan == -1 {
				if len(spansA) == 0 || spansA[0].Offset > indexB {
					// Add bucket before all others.
					bucketsA = append(bucketsA, 0)
					copy(bucketsA[1:], bucketsA)
					bucketsA[0] = bucketB
					bucketsC = append(bucketsC, 0)
					copy(bucketsC[1:], bucketsC)
					bucketsC[0] = 0
					if len(spansA) > 0 && spansA[0].Offset == indexB+1 {
						spansA[0].Length++
						spansA[0].Offset--
						goto nextLoop
					}
					spansA = append(spansA, Span{})
					copy(spansA[1:], spansA)
					spansA[0] = Span{Offset: indexB, Length: 1}
					if len(spansA) > 1 {
						// Convert the absolute offset in the formerly
						// first span to a relative offset.
						spansA[1].Offset -= indexB + 1
					}
					goto nextLoop
				} else if spansA[0].Offset == indexB {
					// Just add to first bucket.
					bucketsA[0], bucketsC[0] = kahanSumInc(bucketB, bucketsA[0], bucketsC[0])
					goto nextLoop
				}
				iSpan, iBucket, iInSpan = 0, 0, 0
				indexA = spansA[0].Offset
			}
			deltaIndex = indexB - indexA
			for {
				remainingInSpan := int32(spansA[iSpan].Length) - iInSpan
				if deltaIndex < remainingInSpan {
					// Bucket is in current span.
					iBucket += int(deltaIndex)
					iInSpan += deltaIndex
					bucketsA[iBucket], bucketsC[iBucket] = kahanSumInc(bucketB, bucketsA[iBucket], bucketsC[iBucket])
					break
				}
				deltaIndex -= remainingInSpan
				iBucket += int(remainingInSpan)
				iSpan++
				if iSpan == len(spansA) || deltaIndex < spansA[iSpan].Offset {
					// Bucket is in gap behind previous span (or there are no further spans).
					bucketsA = append(bucketsA, 0)
					copy(bucketsA[iBucket+1:], bucketsA[iBucket:])
					bucketsA[iBucket] = bucketB
					bucketsC = append(bucketsC, 0)
					copy(bucketsC[iBucket+1:], bucketsC[iBucket:])
					bucketsC[iBucket] = 0
					switch {
					case deltaIndex == 0:
						// Directly after previous span, extend previous span.
						if iSpan < len(spansA) {
							spansA[iSpan].Offset--
						}
						iSpan--
						iInSpan = int32(spansA[iSpan].Length)
						spansA[iSpan].Length++
						// spansC[iSpan].Length++
						goto nextLoop
					case iSpan < len(spansA) && deltaIndex == spansA[iSpan].Offset-1:
						// Directly before next span, extend next span.
						iInSpan = 0
						spansA[iSpan].Offset--
						spansA[iSpan].Length++
						goto nextLoop
					default:
						// No next span, or next span is not directly adjacent to new bucket.
						// Add new span.
						iInSpan = 0
						if iSpan < len(spansA) {
							spansA[iSpan].Offset -= deltaIndex + 1
						}
						spansA = append(spansA, Span{})
						copy(spansA[iSpan+1:], spansA[iSpan:])
						spansA[iSpan] = Span{Length: 1, Offset: deltaIndex}
						goto nextLoop
					}
				} else {
					// Try start of next span.
					deltaIndex -= spansA[iSpan].Offset
					iInSpan = 0
				}
			}
		nextLoop:
			indexA = indexB
			indexB++
			bIdxB++
		}
	}

	return spansA, bucketsA, bucketsC
}

func FloatBucketsMatch(b1, b2 []float64) bool {
	if len(b1) != len(b2) {
		return false

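For context (not part of this commit): kahanAddBuckets folds every populated bucket of histogram B into histogram A via kahanSumInc, keeping a per-bucket compensation value in bucketsC rather than a single running error term. kahanSumInc itself is not shown in this diff; the sketch below is a minimal standalone program that assumes the Neumaier-style variant Prometheus uses elsewhere (e.g. in PromQL's sum) and illustrates why the compensation is worth carrying when a large bucket count absorbs many small increments.

package main

import (
	"fmt"
	"math"
)

// kahanSumInc is assumed here to match the Neumaier-style helper the new code
// calls: it adds inc to sum and accumulates the rounding error of that addition
// in the compensation term c. The real implementation is not part of this commit.
func kahanSumInc(inc, sum, c float64) (newSum, newC float64) {
	t := sum + inc
	switch {
	case math.IsInf(t, 0):
		c = 0
	case math.Abs(sum) >= math.Abs(inc):
		c += (sum - t) + inc // recover the part of inc lost in the addition
	default:
		c += (inc - t) + sum // recover the part of sum lost in the addition
	}
	return t, c
}

func main() {
	// A large bucket count absorbs 1000 increments of 1.0.
	naive, sum, comp := 1e16, 1e16, 0.0
	for i := 0; i < 1000; i++ {
		naive += 1.0
		sum, comp = kahanSumInc(1.0, sum, comp)
	}
	fmt.Println(naive - 1e16)      // 0: every increment was rounded away
	fmt.Println(sum + comp - 1e16) // 1000: the compensation preserved them
}

Per bucket, bucketsA[i] plays the role of sum and bucketsC[i] the role of c, so the rounding error committed on each addition is retained alongside the bucket instead of being discarded.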
@@ -778,6 +778,130 @@ func reduceResolution[IBC InternalBucketCount](
	return targetSpans, targetBuckets
}

// kahanReduceResolution works like reduceResolution but it is used in FloatHistogram's KahanAdd method and takes
// an additional argument, originCompBuckets, representing the compensation buckets for the origin histogram.
// This function modifies both the buckets of the origin histogram and its corresponding compensation histogram.
func kahanReduceResolution(
	originSpans []Span,
	originReceivingBuckets []float64,
	originCompBuckets []float64,
	originSchema,
	targetSchema int32,
	deltaBuckets bool,
	inplace bool,
) ([]Span, []float64, []float64) {
	var (
		targetSpans []Span // The spans in the target schema.
		targetReceivingBuckets []float64 // The receiving bucket counts in the target schema.
		targetCompBuckets []float64 // The compensation bucket counts in the target schema.
		bucketIdx int32 // The index of bucket in the origin schema.
		bucketCountIdx int // The position of a bucket in the origin bucket count slices (originReceivingBuckets and originCompBuckets).
		targetBucketIdx int32 // The index of bucket in the target schema.
		lastReceivingBucketCount float64 // The last visited receiving bucket's count in the origin schema.
		lastCompBucketCount float64 // The last visited compensation bucket's count in the origin schema.
		lastTargetBucketIdx int32 // The index of the last added target bucket.
		lastTargetReceivingBucketCount float64
		lastTargetCompBucketCount float64
	)

	if inplace {
		// Slice reuse is safe because when reducing the resolution,
		// target slices don't grow faster than origin slices are being read.
		targetSpans = originSpans[:0]
		targetReceivingBuckets = originReceivingBuckets[:0]
		targetCompBuckets = originCompBuckets[:0]
	}

	for _, span := range originSpans {
		// Determine the index of the first bucket in this span.
		bucketIdx += span.Offset
		for j := 0; j < int(span.Length); j++ {
			// Determine the index of the bucket in the target schema from the index in the original schema.
			targetBucketIdx = targetIdx(bucketIdx, originSchema, targetSchema)
			switch {
			case len(targetSpans) == 0:
				// This is the first span in the targetSpans.
				span := Span{
					Offset: targetBucketIdx,
					Length: 1,
				}
				targetSpans = append(targetSpans, span)
				targetReceivingBuckets = append(targetReceivingBuckets, originReceivingBuckets[bucketCountIdx])
				lastTargetBucketIdx = targetBucketIdx
				lastReceivingBucketCount = originReceivingBuckets[bucketCountIdx]
				lastTargetReceivingBucketCount = originReceivingBuckets[bucketCountIdx]
				targetCompBuckets = append(targetCompBuckets, originCompBuckets[bucketCountIdx])
				lastCompBucketCount = originCompBuckets[bucketCountIdx]
				lastTargetCompBucketCount = originCompBuckets[bucketCountIdx]
			case lastTargetBucketIdx == targetBucketIdx:
				// The current bucket has to be merged into the same target bucket as the previous bucket.
				if deltaBuckets {
					lastReceivingBucketCount += originReceivingBuckets[bucketCountIdx]
					targetReceivingBuckets[len(targetReceivingBuckets)-1] += lastReceivingBucketCount
					lastTargetReceivingBucketCount += lastReceivingBucketCount
					lastCompBucketCount += originCompBuckets[bucketCountIdx]
					targetCompBuckets[len(targetCompBuckets)-1] += lastCompBucketCount
					lastTargetCompBucketCount += lastCompBucketCount
				} else {
					targetReceivingBuckets[len(targetReceivingBuckets)-1] += originReceivingBuckets[bucketCountIdx]
					targetCompBuckets[len(targetCompBuckets)-1] += originCompBuckets[bucketCountIdx]
				}
			case (lastTargetBucketIdx + 1) == targetBucketIdx:
				// The current bucket has to go into a new target bucket,
				// and that bucket is next to the previous target bucket,
				// so we add it to the current target span.
				targetSpans[len(targetSpans)-1].Length++
				lastTargetBucketIdx++
				if deltaBuckets {
					lastReceivingBucketCount += originReceivingBuckets[bucketCountIdx]
					targetReceivingBuckets = append(targetReceivingBuckets, lastReceivingBucketCount-lastTargetReceivingBucketCount)
					lastTargetReceivingBucketCount = lastReceivingBucketCount
					lastCompBucketCount += originCompBuckets[bucketCountIdx]
					targetCompBuckets = append(targetCompBuckets, lastCompBucketCount-lastTargetCompBucketCount)
					lastTargetCompBucketCount = lastCompBucketCount
				} else {
					targetReceivingBuckets = append(targetReceivingBuckets, originReceivingBuckets[bucketCountIdx])
					targetCompBuckets = append(targetCompBuckets, originCompBuckets[bucketCountIdx])
				}
			case (lastTargetBucketIdx + 1) < targetBucketIdx:
				// The current bucket has to go into a new target bucket,
				// and that bucket is separated by a gap from the previous target bucket,
				// so we need to add a new target span.
				span := Span{
					Offset: targetBucketIdx - lastTargetBucketIdx - 1,
					Length: 1,
				}
				targetSpans = append(targetSpans, span)
				lastTargetBucketIdx = targetBucketIdx
				if deltaBuckets {
					lastReceivingBucketCount += originReceivingBuckets[bucketCountIdx]
					targetReceivingBuckets = append(targetReceivingBuckets, lastReceivingBucketCount-lastTargetReceivingBucketCount)
					lastTargetReceivingBucketCount = lastReceivingBucketCount
					lastCompBucketCount += originCompBuckets[bucketCountIdx]
					targetCompBuckets = append(targetCompBuckets, lastCompBucketCount-lastTargetCompBucketCount)
					lastTargetCompBucketCount = lastCompBucketCount
				} else {
					targetReceivingBuckets = append(targetReceivingBuckets, originReceivingBuckets[bucketCountIdx])
					targetCompBuckets = append(targetCompBuckets, originCompBuckets[bucketCountIdx])
				}
			}
			bucketIdx++
			bucketCountIdx++
		}
	}

	return targetSpans, targetReceivingBuckets, targetCompBuckets
}

func clearIfNotNil[T any](items []T) []T {
	if items == nil {
		return nil
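
Also for context (not part of this commit): kahanReduceResolution merges origin buckets into coarser target buckets via targetIdx and applies exactly the same merge to the compensation slice, so originReceivingBuckets and originCompBuckets stay aligned index-for-index under a single span layout. targetIdx is not shown in this diff; the sketch below assumes the usual right-shift formulation for exponential schemas and shows how pairs of schema 1 buckets collapse into single schema 0 buckets.

package main

import "fmt"

// targetIdx is assumed here to be the usual mapping from a bucket index in the
// origin schema to the index of the enclosing (wider) bucket in the target
// schema; the real helper lives elsewhere in the package and is not part of
// this commit.
func targetIdx(idx, originSchema, targetSchema int32) int32 {
	return ((idx - 1) >> (originSchema - targetSchema)) + 1
}

func main() {
	// Going from schema 1 (bucket boundaries at powers of sqrt(2)) to
	// schema 0 (powers of 2), two adjacent origin buckets fall into each
	// target bucket, so the receiving count and the compensation value of a
	// target bucket are each the sum of the two origin entries mapping to it.
	receiving := []float64{1, 2, 3, 4, 5, 6}              // origin bucket indexes 1..6
	comp := []float64{0.5, 0.5, 0.25, 0.25, 0.125, 0.125} // per-bucket compensation terms
	targetReceiving := map[int32]float64{}
	targetComp := map[int32]float64{}
	for i := range receiving {
		t := targetIdx(int32(i+1), 1, 0)
		targetReceiving[t] += receiving[i]
		targetComp[t] += comp[i]
	}
	for t := int32(1); t <= 3; t++ {
		fmt.Printf("target bucket %d: receiving=%g comp=%g\n", t, targetReceiving[t], targetComp[t])
	}
	// Output:
	// target bucket 1: receiving=3 comp=1
	// target bucket 2: receiving=7 comp=0.5
	// target bucket 3: receiving=11 comp=0.25
}

Because both slices are merged with the same layout, the returned targetSpans describe the reduced receiving buckets and the reduced compensation buckets alike, which is what lets the KahanAdd path keep one span structure for the value histogram and its compensation histogram.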