diff --git a/model/histogram/float_histogram.go b/model/histogram/float_histogram.go index 4d51ce7f41..d22937cc23 100644 --- a/model/histogram/float_histogram.go +++ b/model/histogram/float_histogram.go @@ -1357,6 +1357,138 @@ func addBuckets( return spansA, bucketsA } +// kahanAddBuckets works like addBuckets but it is used in FloatHistogram's KahanAdd/KahanSub methods +// and takes an additional argument, bucketsC, representing buckets of the compensation histogram. +// It returns the resulting spans/buckets and compensation buckets. +func kahanAddBuckets( + schema int32, threshold float64, negative bool, + spansA []Span, bucketsA []float64, + spansB []Span, bucketsB []float64, + bucketsC []float64, +) ([]Span, []float64, []float64) { + var ( + iSpan = -1 + iBucket = -1 + iInSpan int32 + indexA int32 + indexB int32 + bIdxB int + bucketB float64 + deltaIndex int32 + lowerThanThreshold = true + ) + + for _, spanB := range spansB { + indexB += spanB.Offset + for j := 0; j < int(spanB.Length); j++ { + if lowerThanThreshold && IsExponentialSchema(schema) && getBoundExponential(indexB, schema) <= threshold { + goto nextLoop + } + lowerThanThreshold = false + + bucketB = bucketsB[bIdxB] + if negative { + bucketB *= -1 + } + + if iSpan == -1 { + if len(spansA) == 0 || spansA[0].Offset > indexB { + // Add bucket before all others. + bucketsA = append(bucketsA, 0) + copy(bucketsA[1:], bucketsA) + bucketsA[0] = bucketB + bucketsC = append(bucketsC, 0) + copy(bucketsC[1:], bucketsC) + bucketsC[0] = 0 + if len(spansA) > 0 && spansA[0].Offset == indexB+1 { + spansA[0].Length++ + spansA[0].Offset-- + goto nextLoop + } + spansA = append(spansA, Span{}) + copy(spansA[1:], spansA) + spansA[0] = Span{Offset: indexB, Length: 1} + if len(spansA) > 1 { + // Convert the absolute offset in the formerly + // first span to a relative offset. + spansA[1].Offset -= indexB + 1 + } + goto nextLoop + } else if spansA[0].Offset == indexB { + // Just add to first bucket. + bucketsA[0], bucketsC[0] = kahanSumInc(bucketB, bucketsA[0], bucketsC[0]) + goto nextLoop + } + iSpan, iBucket, iInSpan = 0, 0, 0 + indexA = spansA[0].Offset + } + deltaIndex = indexB - indexA + for { + remainingInSpan := int32(spansA[iSpan].Length) - iInSpan + if deltaIndex < remainingInSpan { + // Bucket is in current span. + iBucket += int(deltaIndex) + iInSpan += deltaIndex + bucketsA[iBucket], bucketsC[iBucket] = kahanSumInc(bucketB, bucketsA[iBucket], bucketsC[iBucket]) + break + } + deltaIndex -= remainingInSpan + iBucket += int(remainingInSpan) + iSpan++ + if iSpan == len(spansA) || deltaIndex < spansA[iSpan].Offset { + // Bucket is in gap behind previous span (or there are no further spans). + bucketsA = append(bucketsA, 0) + copy(bucketsA[iBucket+1:], bucketsA[iBucket:]) + bucketsA[iBucket] = bucketB + bucketsC = append(bucketsC, 0) + copy(bucketsC[iBucket+1:], bucketsC[iBucket:]) + bucketsC[iBucket] = 0 + switch { + case deltaIndex == 0: + // Directly after previous span, extend previous span. + if iSpan < len(spansA) { + spansA[iSpan].Offset-- + } + iSpan-- + iInSpan = int32(spansA[iSpan].Length) + spansA[iSpan].Length++ + // spansC[iSpan].Length++ + goto nextLoop + case iSpan < len(spansA) && deltaIndex == spansA[iSpan].Offset-1: + // Directly before next span, extend next span. + iInSpan = 0 + spansA[iSpan].Offset-- + spansA[iSpan].Length++ + goto nextLoop + default: + // No next span, or next span is not directly adjacent to new bucket. + // Add new span. + iInSpan = 0 + if iSpan < len(spansA) { + spansA[iSpan].Offset -= deltaIndex + 1 + } + spansA = append(spansA, Span{}) + copy(spansA[iSpan+1:], spansA[iSpan:]) + spansA[iSpan] = Span{Length: 1, Offset: deltaIndex} + goto nextLoop + } + } else { + // Try start of next span. + deltaIndex -= spansA[iSpan].Offset + iInSpan = 0 + } + } + + nextLoop: + indexA = indexB + indexB++ + bIdxB++ + } + } + + return spansA, bucketsA, bucketsC +} + func FloatBucketsMatch(b1, b2 []float64) bool { if len(b1) != len(b2) { return false diff --git a/model/histogram/generic.go b/model/histogram/generic.go index a36b58d069..f6cb834faa 100644 --- a/model/histogram/generic.go +++ b/model/histogram/generic.go @@ -778,6 +778,130 @@ func reduceResolution[IBC InternalBucketCount]( return targetSpans, targetBuckets } +// kahanReduceResolution works like reduceResolution but it is used in FloatHistogram's KahanAdd method and takes +// an additional argument, originCompBuckets, representing the compensation buckets for the origin histogram. +// This function modifies both the buckets of the origin histogram and its corresponding compensation histogram. +func kahanReduceResolution( + originSpans []Span, + originReceivingBuckets []float64, + originCompBuckets []float64, + originSchema, + targetSchema int32, + deltaBuckets bool, + inplace bool, +) ([]Span, []float64, []float64) { + var ( + targetSpans []Span // The spans in the target schema. + targetReceivingBuckets []float64 // The receiving bucket counts in the target schema. + targetCompBuckets []float64 // The compensation bucket counts in the target schema. + bucketIdx int32 // The index of bucket in the origin schema. + bucketCountIdx int // The position of a bucket in origin bucket count slice `originBuckets`. + targetBucketIdx int32 // The index of bucket in the target schema. + lastReceivingBucketCount float64 // The last visited receiving bucket's count in the origin schema. + lastCompBucketCount float64 // The last visited compensation bucket's count in the origin schema. + lastTargetBucketIdx int32 // The index of the last added target bucket. + lastTargetReceivingBucketCount float64 + lastTargetCompBucketCount float64 + ) + + if inplace { + // Slice reuse is safe because when reducing the resolution, + // target slices don't grow faster than origin slices are being read. + targetSpans = originSpans[:0] + targetReceivingBuckets = originReceivingBuckets[:0] + targetCompBuckets = originCompBuckets[:0] + } + + for _, span := range originSpans { + // Determine the index of the first bucket in this span. + bucketIdx += span.Offset + for j := 0; j < int(span.Length); j++ { + // Determine the index of the bucket in the target schema from the index in the original schema. + targetBucketIdx = targetIdx(bucketIdx, originSchema, targetSchema) + + switch { + case len(targetSpans) == 0: + // This is the first span in the targetSpans. + span := Span{ + Offset: targetBucketIdx, + Length: 1, + } + targetSpans = append(targetSpans, span) + targetReceivingBuckets = append(targetReceivingBuckets, originReceivingBuckets[bucketCountIdx]) + lastTargetBucketIdx = targetBucketIdx + lastReceivingBucketCount = originReceivingBuckets[bucketCountIdx] + lastTargetReceivingBucketCount = originReceivingBuckets[bucketCountIdx] + + targetCompBuckets = append(targetCompBuckets, originCompBuckets[bucketCountIdx]) + lastCompBucketCount = originCompBuckets[bucketCountIdx] + lastTargetCompBucketCount = originCompBuckets[bucketCountIdx] + + case lastTargetBucketIdx == targetBucketIdx: + // The current bucket has to be merged into the same target bucket as the previous bucket. + if deltaBuckets { + lastReceivingBucketCount += originReceivingBuckets[bucketCountIdx] + targetReceivingBuckets[len(targetReceivingBuckets)-1] += lastReceivingBucketCount + lastTargetReceivingBucketCount += lastReceivingBucketCount + + lastCompBucketCount += originCompBuckets[bucketCountIdx] + targetCompBuckets[len(targetCompBuckets)-1] += lastCompBucketCount + lastTargetCompBucketCount += lastCompBucketCount + } else { + targetReceivingBuckets[len(targetReceivingBuckets)-1] += originReceivingBuckets[bucketCountIdx] + targetCompBuckets[len(targetCompBuckets)-1] += originCompBuckets[bucketCountIdx] + } + + case (lastTargetBucketIdx + 1) == targetBucketIdx: + // The current bucket has to go into a new target bucket, + // and that bucket is next to the previous target bucket, + // so we add it to the current target span. + targetSpans[len(targetSpans)-1].Length++ + lastTargetBucketIdx++ + if deltaBuckets { + lastReceivingBucketCount += originReceivingBuckets[bucketCountIdx] + targetReceivingBuckets = append(targetReceivingBuckets, lastReceivingBucketCount-lastTargetReceivingBucketCount) + lastTargetReceivingBucketCount = lastReceivingBucketCount + + lastCompBucketCount += originCompBuckets[bucketCountIdx] + targetCompBuckets = append(targetCompBuckets, lastCompBucketCount-lastTargetCompBucketCount) + lastTargetCompBucketCount = lastCompBucketCount + } else { + targetReceivingBuckets = append(targetReceivingBuckets, originReceivingBuckets[bucketCountIdx]) + targetCompBuckets = append(targetCompBuckets, originCompBuckets[bucketCountIdx]) + } + + case (lastTargetBucketIdx + 1) < targetBucketIdx: + // The current bucket has to go into a new target bucket, + // and that bucket is separated by a gap from the previous target bucket, + // so we need to add a new target span. + span := Span{ + Offset: targetBucketIdx - lastTargetBucketIdx - 1, + Length: 1, + } + targetSpans = append(targetSpans, span) + lastTargetBucketIdx = targetBucketIdx + if deltaBuckets { + lastReceivingBucketCount += originReceivingBuckets[bucketCountIdx] + targetReceivingBuckets = append(targetReceivingBuckets, lastReceivingBucketCount-lastTargetReceivingBucketCount) + lastTargetReceivingBucketCount = lastReceivingBucketCount + + lastCompBucketCount += originCompBuckets[bucketCountIdx] + targetCompBuckets = append(targetCompBuckets, lastCompBucketCount-lastTargetCompBucketCount) + lastTargetCompBucketCount = lastCompBucketCount + } else { + targetReceivingBuckets = append(targetReceivingBuckets, originReceivingBuckets[bucketCountIdx]) + targetCompBuckets = append(targetCompBuckets, originCompBuckets[bucketCountIdx]) + } + } + + bucketIdx++ + bucketCountIdx++ + } + } + + return targetSpans, targetReceivingBuckets, targetCompBuckets +} + func clearIfNotNil[T any](items []T) []T { if items == nil { return nil