Refactor to re-use bucket conversion function
Some checks failed
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (0) (push) Has been cancelled
CI / Build Prometheus for common architectures (1) (push) Has been cancelled
CI / Build Prometheus for common architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (0) (push) Has been cancelled
CI / Build Prometheus for all architectures (1) (push) Has been cancelled
CI / Build Prometheus for all architectures (10) (push) Has been cancelled
CI / Build Prometheus for all architectures (11) (push) Has been cancelled
CI / Build Prometheus for all architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (3) (push) Has been cancelled
CI / Build Prometheus for all architectures (4) (push) Has been cancelled
CI / Build Prometheus for all architectures (5) (push) Has been cancelled
CI / Build Prometheus for all architectures (6) (push) Has been cancelled
CI / Build Prometheus for all architectures (7) (push) Has been cancelled
CI / Build Prometheus for all architectures (8) (push) Has been cancelled
CI / Build Prometheus for all architectures (9) (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled

Signed-off-by: Carrie Edwards <edwrdscarrie@gmail.com>
This commit is contained in:
Carrie Edwards 2025-02-12 07:58:23 -08:00
parent 488b356da4
commit ec9c9f1383
2 changed files with 49 additions and 100 deletions

View file

@ -89,8 +89,8 @@ func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prom
scale = 8 scale = 8
} }
pSpans, pDeltas := convertBucketsLayout(p.Positive(), scaleDown) pSpans, pDeltas := convertBucketsLayout(p.Positive().BucketCounts().AsRaw(), p.Positive().Offset(), scaleDown, true)
nSpans, nDeltas := convertBucketsLayout(p.Negative(), scaleDown) nSpans, nDeltas := convertBucketsLayout(p.Negative().BucketCounts().AsRaw(), p.Negative().Offset(), scaleDown, true)
h := prompb.Histogram{ h := prompb.Histogram{
// The counter reset detection must be compatible with Prometheus to // The counter reset detection must be compatible with Prometheus to
@ -133,19 +133,25 @@ func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prom
return h, annots, nil return h, annots, nil
} }
// convertBucketsLayout translates OTel Exponential Histogram dense buckets // convertBucketsLayout translates OTel Explicit or Exponential Histogram dense buckets
// representation to Prometheus Native Histogram sparse bucket representation. // representation to Prometheus Native Histogram sparse bucket representation. This is used
// for translating Exponential Histograms into Native Histograms, and Explicit Histograms
// into Native Histograms with Custom Buckets.
// //
// The translation logic is taken from the client_golang `histogram.go#makeBuckets` // The translation logic is taken from the client_golang `histogram.go#makeBuckets`
// function, see `makeBuckets` https://github.com/prometheus/client_golang/blob/main/prometheus/histogram.go // function, see `makeBuckets` https://github.com/prometheus/client_golang/blob/main/prometheus/histogram.go
// The bucket indexes conversion was adjusted, since OTel exp. histogram bucket //
// scaleDown is the factor by which the buckets are scaled down. In other words 2^scaleDown buckets will be merged into one.
//
// When converting from OTel Exponential Histograms to Native Histograms, the
// bucket indexes conversion is adjusted, since OTel exp. histogram bucket
// index 0 corresponds to the range (1, base] while Prometheus bucket index 0 // index 0 corresponds to the range (1, base] while Prometheus bucket index 0
// to the range (base 1]. // to the range (base 1].
// //
// scaleDown is the factor by which the buckets are scaled down. In other words 2^scaleDown buckets will be merged into one. // When converting from OTel Explicit Histograms to Native Histograms with Custom Buckets,
func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets, scaleDown int32) ([]prompb.BucketSpan, []int64) { // the bucket indexes are not scaled, and the indices are not adjusted by 1.
bucketCounts := buckets.BucketCounts() func convertBucketsLayout(bucketCounts []uint64, offset int32, scaleDown int32, adjustOffset bool) ([]prompb.BucketSpan, []int64) {
if bucketCounts.Len() == 0 { if len(bucketCounts) == 0 {
return nil, nil return nil, nil
} }
@ -164,25 +170,28 @@ func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets,
// Let the compiler figure out that this is const during this function by // Let the compiler figure out that this is const during this function by
// moving it into a local variable. // moving it into a local variable.
numBuckets := bucketCounts.Len() numBuckets := len(bucketCounts)
// The offset is scaled and adjusted by 1 as described above. bucketIdx := offset>>scaleDown + 1
bucketIdx := buckets.Offset()>>scaleDown + 1
initialOffset := offset
if adjustOffset {
initialOffset = initialOffset>>scaleDown + 1
}
spans = append(spans, prompb.BucketSpan{ spans = append(spans, prompb.BucketSpan{
Offset: bucketIdx, Offset: initialOffset,
Length: 0, Length: 0,
}) })
for i := 0; i < numBuckets; i++ { for i := 0; i < numBuckets; i++ {
// The offset is scaled and adjusted by 1 as described above. nextBucketIdx := (int32(i)+offset)>>scaleDown + 1
nextBucketIdx := (int32(i)+buckets.Offset())>>scaleDown + 1
if bucketIdx == nextBucketIdx { // We have not collected enough buckets to merge yet. if bucketIdx == nextBucketIdx { // We have not collected enough buckets to merge yet.
count += int64(bucketCounts.At(i)) count += int64(bucketCounts[i])
continue continue
} }
if count == 0 { if count == 0 {
count = int64(bucketCounts.At(i)) count = int64(bucketCounts[i])
continue continue
} }
@ -203,11 +212,12 @@ func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets,
} }
} }
appendDelta(count) appendDelta(count)
count = int64(bucketCounts.At(i)) count = int64(bucketCounts[i])
bucketIdx = nextBucketIdx bucketIdx = nextBucketIdx
} }
// Need to use the last item's index. The offset is scaled and adjusted by 1 as described above. // Need to use the last item's index. The offset is scaled and adjusted by 1 as described above.
gap := (int32(numBuckets)+buckets.Offset()-1)>>scaleDown + 1 - bucketIdx gap := (int32(numBuckets)+offset-1)>>scaleDown + 1 - bucketIdx
if gap > 2 { if gap > 2 {
// We have to create a new span, because we have found a gap // We have to create a new span, because we have found a gap
// of more than two buckets. The constant 2 is copied from the logic in // of more than two buckets. The constant 2 is copied from the logic in
@ -239,7 +249,7 @@ func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Co
pt := dataPoints.At(x) pt := dataPoints.At(x)
histogram, ws, err := histogramToCustomBucketsHistogram(pt) histogram, ws, err := explicitHistogramToCustomBucketsHistogram(pt)
annots.Merge(ws) annots.Merge(ws)
if err != nil { if err != nil {
return annots, err return annots, err
@ -268,10 +278,13 @@ func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Co
return annots, nil return annots, nil
} }
func histogramToCustomBucketsHistogram(p pmetric.HistogramDataPoint) (prompb.Histogram, annotations.Annotations, error) { func explicitHistogramToCustomBucketsHistogram(p pmetric.HistogramDataPoint) (prompb.Histogram, annotations.Annotations, error) {
var annots annotations.Annotations var annots annotations.Annotations
positiveSpans, positiveDeltas := convertHistogramBucketsToNHCBLayout(p.BucketCounts().AsRaw()) buckets := p.BucketCounts().AsRaw()
offset := getBucketOffset(buckets)
bucketCounts := buckets[offset:]
positiveSpans, positiveDeltas := convertBucketsLayout(bucketCounts, int32(offset), 0, false)
h := prompb.Histogram{ h := prompb.Histogram{
// The counter reset detection must be compatible with Prometheus to // The counter reset detection must be compatible with Prometheus to
@ -306,80 +319,11 @@ func histogramToCustomBucketsHistogram(p pmetric.HistogramDataPoint) (prompb.His
} }
} }
return h, annots, nil return h, annots, nil
} }
func convertHistogramBucketsToNHCBLayout(buckets []uint64) ([]prompb.BucketSpan, []int64) { func getBucketOffset(buckets []uint64) (offset int) {
if len(buckets) == 0 {
return nil, nil
}
var (
spans []prompb.BucketSpan
deltas []int64
count int64
prevCount int64
)
appendDelta := func(count int64) {
spans[len(spans)-1].Length++
deltas = append(deltas, count-prevCount)
prevCount = count
}
var offset int
for offset < len(buckets) && buckets[offset] == 0 { for offset < len(buckets) && buckets[offset] == 0 {
offset++ offset++
} }
bucketCounts := buckets[offset:] return offset
bucketIdx := offset + 1
spans = append(spans, prompb.BucketSpan{
Offset: int32(offset),
Length: 0,
})
for i := 0; i < len(bucketCounts); i++ {
nextBucketIdx := offset + i + 1
if bucketIdx == nextBucketIdx { // We have not collected enough buckets to merge yet.
count += int64(bucketCounts[i])
continue
}
if count == 0 {
count = int64(bucketCounts[i])
continue
}
iDelta := nextBucketIdx - bucketIdx - 1
if iDelta > 2 {
spans = append(spans, prompb.BucketSpan{
Offset: int32(iDelta),
Length: 0,
})
} else {
// Either no gap or a gap <= 2
for j := int32(0); j < int32(iDelta); j++ {
appendDelta(0)
}
}
appendDelta(count)
count = int64(bucketCounts[i])
bucketIdx = nextBucketIdx
}
iDelta := int32(len(bucketCounts)+offset-1) + 1 - int32(bucketIdx)
if iDelta > 2 {
spans = append(spans, prompb.BucketSpan{
Offset: iDelta,
Length: 0,
})
} else {
for j := int32(0); j < iDelta; j++ {
appendDelta(0)
}
}
appendDelta(count)
return spans, deltas
} }

View file

@ -380,7 +380,7 @@ func TestConvertBucketsLayout(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
for scaleDown, wantLayout := range tt.wantLayout { for scaleDown, wantLayout := range tt.wantLayout {
t.Run(fmt.Sprintf("%s-scaleby-%d", tt.name, scaleDown), func(t *testing.T) { t.Run(fmt.Sprintf("%s-scaleby-%d", tt.name, scaleDown), func(t *testing.T) {
gotSpans, gotDeltas := convertBucketsLayout(tt.buckets(), scaleDown) gotSpans, gotDeltas := convertBucketsLayout(tt.buckets().BucketCounts().AsRaw(), tt.buckets().Offset(), scaleDown, true)
assert.Equal(t, wantLayout.wantSpans, gotSpans) assert.Equal(t, wantLayout.wantSpans, gotSpans)
assert.Equal(t, wantLayout.wantDeltas, gotDeltas) assert.Equal(t, wantLayout.wantDeltas, gotDeltas)
}) })
@ -410,7 +410,7 @@ func BenchmarkConvertBucketLayout(b *testing.B) {
} }
b.Run(fmt.Sprintf("gap %d", scenario.gap), func(b *testing.B) { b.Run(fmt.Sprintf("gap %d", scenario.gap), func(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
convertBucketsLayout(buckets, 0) convertBucketsLayout(buckets.BucketCounts().AsRaw(), buckets.Offset(), 0, true)
} }
}) })
} }
@ -781,7 +781,7 @@ func TestPrometheusConverter_addExponentialHistogramDataPoints(t *testing.T) {
} }
} }
func TestConvertHistogramBucketsToNHCBLayout(t *testing.T) { func TestConvertExplicitHistogramBucketsToNHCBLayout(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
buckets []uint64 buckets []uint64
@ -876,7 +876,12 @@ func TestConvertHistogramBucketsToNHCBLayout(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
gotSpans, gotDeltas := convertHistogramBucketsToNHCBLayout(tt.buckets)
buckets := tt.buckets
offset := getBucketOffset(buckets)
bucketCounts := buckets[offset:]
gotSpans, gotDeltas := convertBucketsLayout(bucketCounts, int32(offset), 0, false)
assert.Equal(t, tt.wantLayout.wantSpans, gotSpans) assert.Equal(t, tt.wantLayout.wantSpans, gotSpans)
assert.Equal(t, tt.wantLayout.wantDeltas, gotDeltas) assert.Equal(t, tt.wantLayout.wantDeltas, gotDeltas)
}) })
@ -904,7 +909,8 @@ func BenchmarkConvertHistogramBucketsToNHCBLayout(b *testing.B) {
} }
b.Run(fmt.Sprintf("gap %d", scenario.gap), func(b *testing.B) { b.Run(fmt.Sprintf("gap %d", scenario.gap), func(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
convertHistogramBucketsToNHCBLayout(buckets) offset := getBucketOffset(buckets)
convertBucketsLayout(buckets, int32(offset), 0, false)
} }
}) })
} }
@ -965,12 +971,11 @@ func TestHistogramToCustomBucketsHistogram(t *testing.T) {
} }
}, },
}, },
// TODO: add tests for error messages
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
validateHistogramCount(t, tt.hist()) validateHistogramCount(t, tt.hist())
got, annots, err := histogramToCustomBucketsHistogram(tt.hist()) got, annots, err := explicitHistogramToCustomBucketsHistogram(tt.hist())
if tt.wantErrMessage != "" { if tt.wantErrMessage != "" {
assert.ErrorContains(t, err, tt.wantErrMessage) assert.ErrorContains(t, err, tt.wantErrMessage)
return return