diff --git a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go index bc395f7a0b..6ba44cbd90 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go @@ -89,8 +89,8 @@ func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prom scale = 8 } - pSpans, pDeltas := convertBucketsLayout(p.Positive(), scaleDown) - nSpans, nDeltas := convertBucketsLayout(p.Negative(), scaleDown) + pSpans, pDeltas := convertBucketsLayout(p.Positive().BucketCounts().AsRaw(), p.Positive().Offset(), scaleDown, true) + nSpans, nDeltas := convertBucketsLayout(p.Negative().BucketCounts().AsRaw(), p.Negative().Offset(), scaleDown, true) h := prompb.Histogram{ // The counter reset detection must be compatible with Prometheus to @@ -133,19 +133,25 @@ func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prom return h, annots, nil } -// convertBucketsLayout translates OTel Exponential Histogram dense buckets -// representation to Prometheus Native Histogram sparse bucket representation. +// convertBucketsLayout translates OTel Explicit or Exponential Histogram dense buckets +// representation to Prometheus Native Histogram sparse bucket representation. This is used +// for translating Exponential Histograms into Native Histograms, and Explicit Histograms +// into Native Histograms with Custom Buckets. // // The translation logic is taken from the client_golang `histogram.go#makeBuckets` // function, see `makeBuckets` https://github.com/prometheus/client_golang/blob/main/prometheus/histogram.go -// The bucket indexes conversion was adjusted, since OTel exp. histogram bucket +// +// scaleDown is the factor by which the buckets are scaled down. In other words 2^scaleDown buckets will be merged into one. +// +// When converting from OTel Exponential Histograms to Native Histograms, the +// bucket indexes conversion is adjusted, since OTel exp. histogram bucket // index 0 corresponds to the range (1, base] while Prometheus bucket index 0 // to the range (base 1]. // -// scaleDown is the factor by which the buckets are scaled down. In other words 2^scaleDown buckets will be merged into one. -func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets, scaleDown int32) ([]prompb.BucketSpan, []int64) { - bucketCounts := buckets.BucketCounts() - if bucketCounts.Len() == 0 { +// When converting from OTel Explicit Histograms to Native Histograms with Custom Buckets, +// the bucket indexes are not scaled, and the indices are not adjusted by 1. +func convertBucketsLayout(bucketCounts []uint64, offset int32, scaleDown int32, adjustOffset bool) ([]prompb.BucketSpan, []int64) { + if len(bucketCounts) == 0 { return nil, nil } @@ -164,25 +170,28 @@ func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets, // Let the compiler figure out that this is const during this function by // moving it into a local variable. - numBuckets := bucketCounts.Len() + numBuckets := len(bucketCounts) - // The offset is scaled and adjusted by 1 as described above. - bucketIdx := buckets.Offset()>>scaleDown + 1 + bucketIdx := offset>>scaleDown + 1 + + initialOffset := offset + if adjustOffset { + initialOffset = initialOffset>>scaleDown + 1 + } spans = append(spans, prompb.BucketSpan{ - Offset: bucketIdx, + Offset: initialOffset, Length: 0, }) for i := 0; i < numBuckets; i++ { - // The offset is scaled and adjusted by 1 as described above. - nextBucketIdx := (int32(i)+buckets.Offset())>>scaleDown + 1 + nextBucketIdx := (int32(i)+offset)>>scaleDown + 1 if bucketIdx == nextBucketIdx { // We have not collected enough buckets to merge yet. - count += int64(bucketCounts.At(i)) + count += int64(bucketCounts[i]) continue } if count == 0 { - count = int64(bucketCounts.At(i)) + count = int64(bucketCounts[i]) continue } @@ -203,11 +212,12 @@ func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets, } } appendDelta(count) - count = int64(bucketCounts.At(i)) + count = int64(bucketCounts[i]) bucketIdx = nextBucketIdx } + // Need to use the last item's index. The offset is scaled and adjusted by 1 as described above. - gap := (int32(numBuckets)+buckets.Offset()-1)>>scaleDown + 1 - bucketIdx + gap := (int32(numBuckets)+offset-1)>>scaleDown + 1 - bucketIdx if gap > 2 { // We have to create a new span, because we have found a gap // of more than two buckets. The constant 2 is copied from the logic in @@ -239,7 +249,7 @@ func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Co pt := dataPoints.At(x) - histogram, ws, err := histogramToCustomBucketsHistogram(pt) + histogram, ws, err := explicitHistogramToCustomBucketsHistogram(pt) annots.Merge(ws) if err != nil { return annots, err @@ -268,10 +278,13 @@ func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Co return annots, nil } -func histogramToCustomBucketsHistogram(p pmetric.HistogramDataPoint) (prompb.Histogram, annotations.Annotations, error) { +func explicitHistogramToCustomBucketsHistogram(p pmetric.HistogramDataPoint) (prompb.Histogram, annotations.Annotations, error) { var annots annotations.Annotations - positiveSpans, positiveDeltas := convertHistogramBucketsToNHCBLayout(p.BucketCounts().AsRaw()) + buckets := p.BucketCounts().AsRaw() + offset := getBucketOffset(buckets) + bucketCounts := buckets[offset:] + positiveSpans, positiveDeltas := convertBucketsLayout(bucketCounts, int32(offset), 0, false) h := prompb.Histogram{ // The counter reset detection must be compatible with Prometheus to @@ -306,80 +319,11 @@ func histogramToCustomBucketsHistogram(p pmetric.HistogramDataPoint) (prompb.His } } return h, annots, nil - } -func convertHistogramBucketsToNHCBLayout(buckets []uint64) ([]prompb.BucketSpan, []int64) { - if len(buckets) == 0 { - return nil, nil - } - - var ( - spans []prompb.BucketSpan - deltas []int64 - count int64 - prevCount int64 - ) - - appendDelta := func(count int64) { - spans[len(spans)-1].Length++ - deltas = append(deltas, count-prevCount) - prevCount = count - } - - var offset int +func getBucketOffset(buckets []uint64) (offset int) { for offset < len(buckets) && buckets[offset] == 0 { offset++ } - bucketCounts := buckets[offset:] - - bucketIdx := offset + 1 - spans = append(spans, prompb.BucketSpan{ - Offset: int32(offset), - Length: 0, - }) - - for i := 0; i < len(bucketCounts); i++ { - nextBucketIdx := offset + i + 1 - if bucketIdx == nextBucketIdx { // We have not collected enough buckets to merge yet. - count += int64(bucketCounts[i]) - continue - } - if count == 0 { - count = int64(bucketCounts[i]) - continue - } - - iDelta := nextBucketIdx - bucketIdx - 1 - - if iDelta > 2 { - spans = append(spans, prompb.BucketSpan{ - Offset: int32(iDelta), - Length: 0, - }) - } else { - // Either no gap or a gap <= 2 - for j := int32(0); j < int32(iDelta); j++ { - appendDelta(0) - } - } - appendDelta(count) - count = int64(bucketCounts[i]) - bucketIdx = nextBucketIdx - } - - iDelta := int32(len(bucketCounts)+offset-1) + 1 - int32(bucketIdx) - if iDelta > 2 { - spans = append(spans, prompb.BucketSpan{ - Offset: iDelta, - Length: 0, - }) - } else { - for j := int32(0); j < iDelta; j++ { - appendDelta(0) - } - } - appendDelta(count) - - return spans, deltas + return offset } diff --git a/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go b/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go index 83612c5ebd..bd352f666f 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go @@ -380,7 +380,7 @@ func TestConvertBucketsLayout(t *testing.T) { for _, tt := range tests { for scaleDown, wantLayout := range tt.wantLayout { t.Run(fmt.Sprintf("%s-scaleby-%d", tt.name, scaleDown), func(t *testing.T) { - gotSpans, gotDeltas := convertBucketsLayout(tt.buckets(), scaleDown) + gotSpans, gotDeltas := convertBucketsLayout(tt.buckets().BucketCounts().AsRaw(), tt.buckets().Offset(), scaleDown, true) assert.Equal(t, wantLayout.wantSpans, gotSpans) assert.Equal(t, wantLayout.wantDeltas, gotDeltas) }) @@ -410,7 +410,7 @@ func BenchmarkConvertBucketLayout(b *testing.B) { } b.Run(fmt.Sprintf("gap %d", scenario.gap), func(b *testing.B) { for i := 0; i < b.N; i++ { - convertBucketsLayout(buckets, 0) + convertBucketsLayout(buckets.BucketCounts().AsRaw(), buckets.Offset(), 0, true) } }) } @@ -781,7 +781,7 @@ func TestPrometheusConverter_addExponentialHistogramDataPoints(t *testing.T) { } } -func TestConvertHistogramBucketsToNHCBLayout(t *testing.T) { +func TestConvertExplicitHistogramBucketsToNHCBLayout(t *testing.T) { tests := []struct { name string buckets []uint64 @@ -876,7 +876,12 @@ func TestConvertHistogramBucketsToNHCBLayout(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotSpans, gotDeltas := convertHistogramBucketsToNHCBLayout(tt.buckets) + + buckets := tt.buckets + offset := getBucketOffset(buckets) + bucketCounts := buckets[offset:] + + gotSpans, gotDeltas := convertBucketsLayout(bucketCounts, int32(offset), 0, false) assert.Equal(t, tt.wantLayout.wantSpans, gotSpans) assert.Equal(t, tt.wantLayout.wantDeltas, gotDeltas) }) @@ -904,7 +909,8 @@ func BenchmarkConvertHistogramBucketsToNHCBLayout(b *testing.B) { } b.Run(fmt.Sprintf("gap %d", scenario.gap), func(b *testing.B) { for i := 0; i < b.N; i++ { - convertHistogramBucketsToNHCBLayout(buckets) + offset := getBucketOffset(buckets) + convertBucketsLayout(buckets, int32(offset), 0, false) } }) } @@ -965,12 +971,11 @@ func TestHistogramToCustomBucketsHistogram(t *testing.T) { } }, }, - // TODO: add tests for error messages } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { validateHistogramCount(t, tt.hist()) - got, annots, err := histogramToCustomBucketsHistogram(tt.hist()) + got, annots, err := explicitHistogramToCustomBucketsHistogram(tt.hist()) if tt.wantErrMessage != "" { assert.ErrorContains(t, err, tt.wantErrMessage) return