From ab2a7bb74fa32f9199e3b9d4a19c000c304a37e9 Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Wed, 8 Nov 2023 21:43:05 +0800 Subject: [PATCH] add generic shrink function (#13001) Add `ReduceResolution` method to `Histogram` and `FloatHistogram` This takes the original `mergeToSchema` function and turns it into a more generic `reduceResolution` function, which is the building block for the new methods. The methods will help with addressing #12864. --------- Signed-off-by: Ziqi Zhao --- model/histogram/float_histogram.go | 98 ++++++------------------------ model/histogram/generic.go | 87 ++++++++++++++++++++++++++ model/histogram/generic_test.go | 70 +++++++++++++++++++++ model/histogram/histogram.go | 12 ++++ 4 files changed, 186 insertions(+), 81 deletions(-) diff --git a/model/histogram/float_histogram.go b/model/histogram/float_histogram.go index 22d33f5a4e..212b028800 100644 --- a/model/histogram/float_histogram.go +++ b/model/histogram/float_histogram.go @@ -94,8 +94,8 @@ func (h *FloatHistogram) CopyToSchema(targetSchema int32) *FloatHistogram { Sum: h.Sum, } - c.PositiveSpans, c.PositiveBuckets = mergeToSchema(h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema) - c.NegativeSpans, c.NegativeBuckets = mergeToSchema(h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema) + c.PositiveSpans, c.PositiveBuckets = reduceResolution(h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema, false) + c.NegativeSpans, c.NegativeBuckets = reduceResolution(h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema, false) return &c } @@ -268,17 +268,12 @@ func (h *FloatHistogram) Add(other *FloatHistogram) *FloatHistogram { h.Count += other.Count h.Sum += other.Sum - otherPositiveSpans := other.PositiveSpans - otherPositiveBuckets := other.PositiveBuckets - otherNegativeSpans := other.NegativeSpans - otherNegativeBuckets := other.NegativeBuckets if other.Schema != h.Schema { - otherPositiveSpans, otherPositiveBuckets = mergeToSchema(other.PositiveSpans, 
other.PositiveBuckets, other.Schema, h.Schema) - otherNegativeSpans, otherNegativeBuckets = mergeToSchema(other.NegativeSpans, other.NegativeBuckets, other.Schema, h.Schema) + other = other.ReduceResolution(h.Schema) /* NOTE(review): ReduceResolution replaces the receiver's spans and buckets, so this call also changes the histogram passed in as other; other.Schema stays as it was. */ } - h.PositiveSpans, h.PositiveBuckets = addBuckets(h.Schema, h.ZeroThreshold, false, h.PositiveSpans, h.PositiveBuckets, otherPositiveSpans, otherPositiveBuckets) - h.NegativeSpans, h.NegativeBuckets = addBuckets(h.Schema, h.ZeroThreshold, false, h.NegativeSpans, h.NegativeBuckets, otherNegativeSpans, otherNegativeBuckets) + h.PositiveSpans, h.PositiveBuckets = addBuckets(h.Schema, h.ZeroThreshold, false, h.PositiveSpans, h.PositiveBuckets, other.PositiveSpans, other.PositiveBuckets) + h.NegativeSpans, h.NegativeBuckets = addBuckets(h.Schema, h.ZeroThreshold, false, h.NegativeSpans, h.NegativeBuckets, other.NegativeSpans, other.NegativeBuckets) return h } @@ -289,17 +284,12 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) *FloatHistogram { h.Count -= other.Count h.Sum -= other.Sum - otherPositiveSpans := other.PositiveSpans - otherPositiveBuckets := other.PositiveBuckets - otherNegativeSpans := other.NegativeSpans - otherNegativeBuckets := other.NegativeBuckets if other.Schema != h.Schema { - otherPositiveSpans, otherPositiveBuckets = mergeToSchema(other.PositiveSpans, other.PositiveBuckets, other.Schema, h.Schema) - otherNegativeSpans, otherNegativeBuckets = mergeToSchema(other.NegativeSpans, other.NegativeBuckets, other.Schema, h.Schema) + other = other.ReduceResolution(h.Schema) /* NOTE(review): ReduceResolution replaces the receiver's spans and buckets, so this call also changes the histogram passed in as other; other.Schema stays as it was. */ } - h.PositiveSpans, h.PositiveBuckets = addBuckets(h.Schema, h.ZeroThreshold, true, h.PositiveSpans, h.PositiveBuckets, otherPositiveSpans, otherPositiveBuckets) - h.NegativeSpans, h.NegativeBuckets = addBuckets(h.Schema, h.ZeroThreshold, true, h.NegativeSpans, h.NegativeBuckets, otherNegativeSpans, otherNegativeBuckets) + h.PositiveSpans, h.PositiveBuckets = addBuckets(h.Schema, h.ZeroThreshold, true, h.PositiveSpans, h.PositiveBuckets, other.PositiveSpans, 
other.PositiveBuckets) + h.NegativeSpans, h.NegativeBuckets = addBuckets(h.Schema, h.ZeroThreshold, true, h.NegativeSpans, h.NegativeBuckets, other.NegativeSpans, other.NegativeBuckets) return h } @@ -975,69 +965,6 @@ func targetIdx(idx, originSchema, targetSchema int32) int32 { return ((idx - 1) >> (originSchema - targetSchema)) + 1 } -// mergeToSchema is used to merge a FloatHistogram's Spans and Buckets (no matter if -// positive or negative) from the original schema to the target schema. -// The target schema must be smaller than the original schema. -func mergeToSchema(originSpans []Span, originBuckets []float64, originSchema, targetSchema int32) ([]Span, []float64) { - var ( - targetSpans []Span // The spans in the target schema. - targetBuckets []float64 // The buckets in the target schema. - bucketIdx int32 // The index of bucket in the origin schema. - lastTargetBucketIdx int32 // The index of the last added target bucket. - origBucketIdx int // The position of a bucket in originBuckets slice. - ) - - for _, span := range originSpans { - // Determine the index of the first bucket in this span. - bucketIdx += span.Offset - for j := 0; j < int(span.Length); j++ { - // Determine the index of the bucket in the target schema from the index in the original schema. - targetBucketIdx := targetIdx(bucketIdx, originSchema, targetSchema) - - switch { - case len(targetSpans) == 0: - // This is the first span in the targetSpans. - span := Span{ - Offset: targetBucketIdx, - Length: 1, - } - targetSpans = append(targetSpans, span) - targetBuckets = append(targetBuckets, originBuckets[0]) - lastTargetBucketIdx = targetBucketIdx - - case lastTargetBucketIdx == targetBucketIdx: - // The current bucket has to be merged into the same target bucket as the previous bucket. 
- targetBuckets[len(targetBuckets)-1] += originBuckets[origBucketIdx] - case (lastTargetBucketIdx + 1) == targetBucketIdx: - // The current bucket has to go into a new target bucket, - // and that bucket is next to the previous target bucket, - // so we add it to the current target span. - targetSpans[len(targetSpans)-1].Length++ - targetBuckets = append(targetBuckets, originBuckets[origBucketIdx]) - lastTargetBucketIdx++ - case (lastTargetBucketIdx + 1) < targetBucketIdx: - // The current bucket has to go into a new target bucket, - // and that bucket is separated by a gap from the previous target bucket, - // so we need to add a new target span. - span := Span{ - Offset: targetBucketIdx - lastTargetBucketIdx - 1, - Length: 1, - } - targetSpans = append(targetSpans, span) - targetBuckets = append(targetBuckets, originBuckets[origBucketIdx]) - lastTargetBucketIdx = targetBucketIdx - } - - bucketIdx++ - origBucketIdx++ - } - } - - return targetSpans, targetBuckets -} - // addBuckets adds the buckets described by spansB/bucketsB to the buckets described by spansA/bucketsA, // creating missing buckets in spansA/bucketsA as needed. // It returns the resulting spans/buckets (which must be used instead of the original spansA/bucketsA, @@ -1179,3 +1106,12 @@ func floatBucketsMatch(b1, b2 []float64) bool { } return true } + +// ReduceResolution merges the float histogram's spans and buckets (absolute counts) into the given target schema, modifying h in place and returning it. +// The target schema must be smaller than the current float histogram's schema. NOTE(review): h.Schema is left unchanged here; confirm whether callers expect it to be set to targetSchema.
+func (h *FloatHistogram) ReduceResolution(targetSchema int32) *FloatHistogram { + h.PositiveSpans, h.PositiveBuckets = reduceResolution(h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema, false) + h.NegativeSpans, h.NegativeBuckets = reduceResolution(h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema, false) + + return h +} diff --git a/model/histogram/generic.go b/model/histogram/generic.go index 7e4eb1ecb1..d42bb24151 100644 --- a/model/histogram/generic.go +++ b/model/histogram/generic.go @@ -600,3 +600,90 @@ var exponentialBounds = [][]float64{ 0.9892280131939752, 0.9919100824251095, 0.9945994234836328, 0.9972960560854698, }, } + +// reduceResolution converts the spans and buckets given in the origin schema into newly built spans and buckets in the target schema, merging all origin buckets that map to the same target bucket. +// The target schema must be smaller than the original schema. +// Set deltaBuckets to true if the provided buckets are +// deltas. Set it to false if the buckets contain absolute counts. Both returned slices are nil if originSpans contains no buckets. +func reduceResolution[IBC InternalBucketCount](originSpans []Span, originBuckets []IBC, originSchema, targetSchema int32, deltaBuckets bool) ([]Span, []IBC) { + var ( + targetSpans []Span // The spans in the target schema. + targetBuckets []IBC // The bucket counts in the target schema. + bucketIdx int32 // The index of bucket in the origin schema. + bucketCountIdx int // The position of a bucket in origin bucket count slice `originBuckets`. + targetBucketIdx int32 // The index of bucket in the target schema. + lastBucketCount IBC // The absolute count of the last visited bucket in the origin schema (only read when deltaBuckets is true). + lastTargetBucketIdx int32 // The index of the last added target bucket. + lastTargetBucketCount IBC // The absolute count of the last added target bucket (only read when deltaBuckets is true). + ) + + for _, span := range originSpans { + // Determine the index of the first bucket in this span. + bucketIdx += span.Offset + for j := 0; j < int(span.Length); j++ { + // Determine the index of the bucket in the target schema from the index in the original schema.
+ targetBucketIdx = targetIdx(bucketIdx, originSchema, targetSchema) + + switch { + case len(targetSpans) == 0: + // This is the first bucket seen, so it opens the first span in targetSpans; its value also seeds both running absolute counts. + span := Span{ + Offset: targetBucketIdx, + Length: 1, + } + targetSpans = append(targetSpans, span) + targetBuckets = append(targetBuckets, originBuckets[bucketCountIdx]) + lastTargetBucketIdx = targetBucketIdx + lastBucketCount = originBuckets[bucketCountIdx] + lastTargetBucketCount = originBuckets[bucketCountIdx] + + case lastTargetBucketIdx == targetBucketIdx: + // The current bucket has to be merged into the same target bucket as the previous bucket. For delta buckets, the delta is first resolved into the absolute count of the current origin bucket, and that absolute count is what gets added to the target bucket. + if deltaBuckets { + lastBucketCount += originBuckets[bucketCountIdx] + targetBuckets[len(targetBuckets)-1] += lastBucketCount + lastTargetBucketCount += lastBucketCount + } else { + targetBuckets[len(targetBuckets)-1] += originBuckets[bucketCountIdx] + } + + case (lastTargetBucketIdx + 1) == targetBucketIdx: + // The current bucket has to go into a new target bucket, + // and that bucket is next to the previous target bucket, + // so we add it to the current target span. For delta buckets, the appended value is the difference to the previous target bucket's absolute count. + targetSpans[len(targetSpans)-1].Length++ + lastTargetBucketIdx++ + if deltaBuckets { + lastBucketCount += originBuckets[bucketCountIdx] + targetBuckets = append(targetBuckets, lastBucketCount-lastTargetBucketCount) + lastTargetBucketCount = lastBucketCount + } else { + targetBuckets = append(targetBuckets, originBuckets[bucketCountIdx]) + } + + case (lastTargetBucketIdx + 1) < targetBucketIdx: + // The current bucket has to go into a new target bucket, + // and that bucket is separated by a gap from the previous target bucket, + // so we need to add a new target span. For delta buckets, the appended value is again the difference to the previous target bucket's absolute count.
+ span := Span{ + Offset: targetBucketIdx - lastTargetBucketIdx - 1, + Length: 1, + } + targetSpans = append(targetSpans, span) + lastTargetBucketIdx = targetBucketIdx + if deltaBuckets { + lastBucketCount += originBuckets[bucketCountIdx] + targetBuckets = append(targetBuckets, lastBucketCount-lastTargetBucketCount) + lastTargetBucketCount = lastBucketCount + } else { + targetBuckets = append(targetBuckets, originBuckets[bucketCountIdx]) + } + } + + bucketIdx++ + bucketCountIdx++ + } + } + + return targetSpans, targetBuckets +} diff --git a/model/histogram/generic_test.go b/model/histogram/generic_test.go index 55015c047f..d24910d214 100644 --- a/model/histogram/generic_test.go +++ b/model/histogram/generic_test.go @@ -110,3 +110,73 @@ func TestGetBound(t *testing.T) { } } } + +func TestReduceResolutionHistogram(t *testing.T) { + cases := []struct { + spans []Span + buckets []int64 + schema int32 + targetSchema int32 + expectedSpans []Span + expectedBuckets []int64 + }{ + { + spans: []Span{ + {Offset: 0, Length: 4}, + {Offset: 0, Length: 0}, + {Offset: 3, Length: 2}, + }, + buckets: []int64{1, 2, -2, 1, -1, 0}, + schema: 0, + targetSchema: -1, + expectedSpans: []Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 1}, + }, + expectedBuckets: []int64{1, 3, -2, 0}, + // Absolute counts at schema 0, base 2: { (0.5,1]:1, (1,2]:3, (2,4]:1, (4,8]:2, (8,16]:0, (16,32]:0, (32,64]:0, (64,128]:1, (128,256]:1 } + // Absolute counts at schema -1, base 4: { (0.25,1]:1, (1,4]:4, (4,16]:2, (16,64]:0, (64,256]:2 } + }, + } + + for _, tc := range cases { + spans, buckets := reduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, true) + require.Equal(t, tc.expectedSpans, spans) + require.Equal(t, tc.expectedBuckets, buckets) + } +} + +func TestReduceResolutionFloatHistogram(t *testing.T) { + cases := []struct { + spans []Span + buckets []float64 + schema int32 + targetSchema int32 + expectedSpans []Span + expectedBuckets []float64 + }{ + { + spans: []Span{ + {Offset: 0, Length: 4}, + {Offset: 0, Length: 0}, +
{Offset: 3, Length: 2}, + }, + buckets: []float64{1, 3, 1, 2, 1, 1}, + schema: 0, + targetSchema: -1, + expectedSpans: []Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 1}, + }, + expectedBuckets: []float64{1, 4, 2, 2}, + // Absolute counts at schema 0, base 2: { (0.5,1]:1, (1,2]:3, (2,4]:1, (4,8]:2, (8,16]:0, (16,32]:0, (32,64]:0, (64,128]:1, (128,256]:1 } + // Absolute counts at schema -1, base 4: { (0.25,1]:1, (1,4]:4, (4,16]:2, (16,64]:0, (64,256]:2 } + }, + } + + for _, tc := range cases { + spans, buckets := reduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, false) + require.Equal(t, tc.expectedSpans, spans) + require.Equal(t, tc.expectedBuckets, buckets) + } +} diff --git a/model/histogram/histogram.go b/model/histogram/histogram.go index 30c23e5e79..4699bd3cbe 100644 --- a/model/histogram/histogram.go +++ b/model/histogram/histogram.go @@ -493,3 +493,15 @@ func (c *cumulativeBucketIterator) At() Bucket[uint64] { Index: c.currIdx - 1, } } + +// ReduceResolution merges the histogram's spans and delta-encoded buckets into the given target schema, modifying h in place and returning it. +// The target schema must be smaller than the current histogram's schema. NOTE(review): h.Schema is left unchanged here; confirm whether callers expect it to be set to targetSchema. +func (h *Histogram) ReduceResolution(targetSchema int32) *Histogram { + h.PositiveSpans, h.PositiveBuckets = reduceResolution( + h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema, true, + ) + h.NegativeSpans, h.NegativeBuckets = reduceResolution( + h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema, true, + ) + return h +}