mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-14 01:24:04 -08:00
Merge pull request #15220 from prometheus/nhcb-scrape-optimize
Some checks failed
buf.build / lint and publish (push) Has been cancelled
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (0) (push) Has been cancelled
CI / Build Prometheus for common architectures (1) (push) Has been cancelled
CI / Build Prometheus for common architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (0) (push) Has been cancelled
CI / Build Prometheus for all architectures (1) (push) Has been cancelled
CI / Build Prometheus for all architectures (10) (push) Has been cancelled
CI / Build Prometheus for all architectures (11) (push) Has been cancelled
CI / Build Prometheus for all architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (3) (push) Has been cancelled
CI / Build Prometheus for all architectures (4) (push) Has been cancelled
CI / Build Prometheus for all architectures (5) (push) Has been cancelled
CI / Build Prometheus for all architectures (6) (push) Has been cancelled
CI / Build Prometheus for all architectures (7) (push) Has been cancelled
CI / Build Prometheus for all architectures (8) (push) Has been cancelled
CI / Build Prometheus for all architectures (9) (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
Scorecards supply-chain security / Scorecards analysis (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled
Some checks failed
buf.build / lint and publish (push) Has been cancelled
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (0) (push) Has been cancelled
CI / Build Prometheus for common architectures (1) (push) Has been cancelled
CI / Build Prometheus for common architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (0) (push) Has been cancelled
CI / Build Prometheus for all architectures (1) (push) Has been cancelled
CI / Build Prometheus for all architectures (10) (push) Has been cancelled
CI / Build Prometheus for all architectures (11) (push) Has been cancelled
CI / Build Prometheus for all architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (3) (push) Has been cancelled
CI / Build Prometheus for all architectures (4) (push) Has been cancelled
CI / Build Prometheus for all architectures (5) (push) Has been cancelled
CI / Build Prometheus for all architectures (6) (push) Has been cancelled
CI / Build Prometheus for all architectures (7) (push) Has been cancelled
CI / Build Prometheus for all architectures (8) (push) Has been cancelled
CI / Build Prometheus for all architectures (9) (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
Scorecards supply-chain security / Scorecards analysis (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled
perf(nhcb): scrape optimize
This commit is contained in:
commit
76432aaf4e
|
@ -283,18 +283,18 @@ func (p *NHCBParser) handleClassicHistogramSeries(lset labels.Labels) bool {
|
||||||
le, err := strconv.ParseFloat(lset.Get(labels.BucketLabel), 64)
|
le, err := strconv.ParseFloat(lset.Get(labels.BucketLabel), 64)
|
||||||
if err == nil && !math.IsNaN(le) {
|
if err == nil && !math.IsNaN(le) {
|
||||||
p.processClassicHistogramSeries(lset, "_bucket", func(hist *convertnhcb.TempHistogram) {
|
p.processClassicHistogramSeries(lset, "_bucket", func(hist *convertnhcb.TempHistogram) {
|
||||||
hist.BucketCounts[le] = p.value
|
_ = hist.SetBucketCount(le, p.value)
|
||||||
})
|
})
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
case strings.HasSuffix(mName, "_count"):
|
case strings.HasSuffix(mName, "_count"):
|
||||||
p.processClassicHistogramSeries(lset, "_count", func(hist *convertnhcb.TempHistogram) {
|
p.processClassicHistogramSeries(lset, "_count", func(hist *convertnhcb.TempHistogram) {
|
||||||
hist.Count = p.value
|
_ = hist.SetCount(p.value)
|
||||||
})
|
})
|
||||||
return true
|
return true
|
||||||
case strings.HasSuffix(mName, "_sum"):
|
case strings.HasSuffix(mName, "_sum"):
|
||||||
p.processClassicHistogramSeries(lset, "_sum", func(hist *convertnhcb.TempHistogram) {
|
p.processClassicHistogramSeries(lset, "_sum", func(hist *convertnhcb.TempHistogram) {
|
||||||
hist.Sum = p.value
|
_ = hist.SetSum(p.value)
|
||||||
})
|
})
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -306,8 +306,8 @@ func (p *NHCBParser) processClassicHistogramSeries(lset labels.Labels, suffix st
|
||||||
p.storeClassicLabels()
|
p.storeClassicLabels()
|
||||||
p.tempCT = p.parser.CreatedTimestamp()
|
p.tempCT = p.parser.CreatedTimestamp()
|
||||||
p.state = stateCollecting
|
p.state = stateCollecting
|
||||||
}
|
|
||||||
p.tempLsetNHCB = convertnhcb.GetHistogramMetricBase(lset, suffix)
|
p.tempLsetNHCB = convertnhcb.GetHistogramMetricBase(lset, suffix)
|
||||||
|
}
|
||||||
p.storeExemplars()
|
p.storeExemplars()
|
||||||
updateHist(&p.tempNHCB)
|
updateHist(&p.tempNHCB)
|
||||||
}
|
}
|
||||||
|
@ -335,7 +335,6 @@ func (p *NHCBParser) nextExemplarPtr() *exemplar.Exemplar {
|
||||||
func (p *NHCBParser) swapExemplars() {
|
func (p *NHCBParser) swapExemplars() {
|
||||||
p.exemplars = p.tempExemplars[:p.tempExemplarCount]
|
p.exemplars = p.tempExemplars[:p.tempExemplarCount]
|
||||||
p.tempExemplars = p.tempExemplars[:0]
|
p.tempExemplars = p.tempExemplars[:0]
|
||||||
p.tempExemplarCount = 0
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// processNHCB converts the collated classic histogram series to NHCB and caches the info
|
// processNHCB converts the collated classic histogram series to NHCB and caches the info
|
||||||
|
@ -344,13 +343,8 @@ func (p *NHCBParser) processNHCB() bool {
|
||||||
if p.state != stateCollecting {
|
if p.state != stateCollecting {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
ub := make([]float64, 0, len(p.tempNHCB.BucketCounts))
|
h, fh, err := p.tempNHCB.Convert()
|
||||||
for b := range p.tempNHCB.BucketCounts {
|
if err == nil {
|
||||||
ub = append(ub, b)
|
|
||||||
}
|
|
||||||
upperBounds, hBase := convertnhcb.ProcessUpperBoundsAndCreateBaseHistogram(ub, false)
|
|
||||||
fhBase := hBase.ToFloat(nil)
|
|
||||||
h, fh := convertnhcb.NewHistogram(p.tempNHCB, upperBounds, hBase, fhBase)
|
|
||||||
if h != nil {
|
if h != nil {
|
||||||
if err := h.Validate(); err != nil {
|
if err := h.Validate(); err != nil {
|
||||||
return false
|
return false
|
||||||
|
@ -369,8 +363,12 @@ func (p *NHCBParser) processNHCB() bool {
|
||||||
p.lsetNHCB = p.tempLsetNHCB
|
p.lsetNHCB = p.tempLsetNHCB
|
||||||
p.swapExemplars()
|
p.swapExemplars()
|
||||||
p.ctNHCB = p.tempCT
|
p.ctNHCB = p.tempCT
|
||||||
p.tempNHCB = convertnhcb.NewTempHistogram()
|
|
||||||
p.state = stateEmitting
|
p.state = stateEmitting
|
||||||
p.tempCT = nil
|
} else {
|
||||||
return true
|
p.state = stateStart
|
||||||
|
}
|
||||||
|
p.tempNHCB.Reset()
|
||||||
|
p.tempExemplarCount = 0
|
||||||
|
p.tempCT = nil
|
||||||
|
return err == nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -500,8 +500,8 @@ something_bucket{a="b",le="+Inf"} 9 # {id="something-test"} 2e100 123.000
|
||||||
Schema: histogram.CustomBucketsSchema,
|
Schema: histogram.CustomBucketsSchema,
|
||||||
Count: 9,
|
Count: 9,
|
||||||
Sum: 42123.0,
|
Sum: 42123.0,
|
||||||
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}, {Offset: 1, Length: 1}},
|
PositiveSpans: []histogram.Span{{Offset: 0, Length: 3}},
|
||||||
PositiveBuckets: []int64{8, -7},
|
PositiveBuckets: []int64{8, -8, 1},
|
||||||
CustomValues: []float64{0.0, 1.0}, // We do not store the +Inf boundary.
|
CustomValues: []float64{0.0, 1.0}, // We do not store the +Inf boundary.
|
||||||
},
|
},
|
||||||
lset: labels.FromStrings("__name__", "something", "a", "b"),
|
lset: labels.FromStrings("__name__", "something", "a", "b"),
|
||||||
|
@ -937,3 +937,48 @@ test_histogram1_bucket{le="-0.0003899999999999998"} 4 1234568
|
||||||
test_histogram1_bucket{le="-0.0002899999999999998"} 16 1234568
|
test_histogram1_bucket{le="-0.0002899999999999998"} 16 1234568
|
||||||
test_histogram1_bucket{le="+Inf"} 175 1234568`
|
test_histogram1_bucket{le="+Inf"} 175 1234568`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNHCBParserErrorHandling(t *testing.T) {
|
||||||
|
input := `# HELP something Histogram with non cumulative buckets
|
||||||
|
# TYPE something histogram
|
||||||
|
something_count 18
|
||||||
|
something_sum 324789.4
|
||||||
|
something_created 1520430001
|
||||||
|
something_bucket{le="0.0"} 18
|
||||||
|
something_bucket{le="+Inf"} 1
|
||||||
|
something_count{a="b"} 9
|
||||||
|
something_sum{a="b"} 42123
|
||||||
|
something_created{a="b"} 1520430002
|
||||||
|
something_bucket{a="b",le="0.0"} 1
|
||||||
|
something_bucket{a="b",le="+Inf"} 9
|
||||||
|
# EOF`
|
||||||
|
exp := []parsedEntry{
|
||||||
|
{
|
||||||
|
m: "something",
|
||||||
|
help: "Histogram with non cumulative buckets",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
m: "something",
|
||||||
|
typ: model.MetricTypeHistogram,
|
||||||
|
},
|
||||||
|
// The parser should skip the series with non-cumulative buckets.
|
||||||
|
{
|
||||||
|
m: `something{a="b"}`,
|
||||||
|
shs: &histogram.Histogram{
|
||||||
|
Schema: histogram.CustomBucketsSchema,
|
||||||
|
Count: 9,
|
||||||
|
Sum: 42123.0,
|
||||||
|
PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}},
|
||||||
|
PositiveBuckets: []int64{1, 7},
|
||||||
|
CustomValues: []float64{0.0}, // We do not store the +Inf boundary.
|
||||||
|
},
|
||||||
|
lset: labels.FromStrings("__name__", "something", "a", "b"),
|
||||||
|
ct: int64p(1520430002000),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
p := NewOpenMetricsParser([]byte(input), labels.NewSymbolTable(), WithOMParserCTSeriesSkipped())
|
||||||
|
p = NewNHCBParser(p, labels.NewSymbolTable(), false)
|
||||||
|
got := testParse(t, p)
|
||||||
|
requireEntries(t, exp, got)
|
||||||
|
}
|
||||||
|
|
|
@ -482,18 +482,16 @@ func (cmd *loadCmd) append(a storage.Appender) error {
|
||||||
|
|
||||||
type tempHistogramWrapper struct {
|
type tempHistogramWrapper struct {
|
||||||
metric labels.Labels
|
metric labels.Labels
|
||||||
upperBounds []float64
|
|
||||||
histogramByTs map[int64]convertnhcb.TempHistogram
|
histogramByTs map[int64]convertnhcb.TempHistogram
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTempHistogramWrapper() tempHistogramWrapper {
|
func newTempHistogramWrapper() tempHistogramWrapper {
|
||||||
return tempHistogramWrapper{
|
return tempHistogramWrapper{
|
||||||
upperBounds: []float64{},
|
|
||||||
histogramByTs: map[int64]convertnhcb.TempHistogram{},
|
histogramByTs: map[int64]convertnhcb.TempHistogram{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func processClassicHistogramSeries(m labels.Labels, suffix string, histogramMap map[uint64]tempHistogramWrapper, smpls []promql.Sample, updateHistogramWrapper func(*tempHistogramWrapper), updateHistogram func(*convertnhcb.TempHistogram, float64)) {
|
func processClassicHistogramSeries(m labels.Labels, suffix string, histogramMap map[uint64]tempHistogramWrapper, smpls []promql.Sample, updateHistogram func(*convertnhcb.TempHistogram, float64)) {
|
||||||
m2 := convertnhcb.GetHistogramMetricBase(m, suffix)
|
m2 := convertnhcb.GetHistogramMetricBase(m, suffix)
|
||||||
m2hash := m2.Hash()
|
m2hash := m2.Hash()
|
||||||
histogramWrapper, exists := histogramMap[m2hash]
|
histogramWrapper, exists := histogramMap[m2hash]
|
||||||
|
@ -501,9 +499,6 @@ func processClassicHistogramSeries(m labels.Labels, suffix string, histogramMap
|
||||||
histogramWrapper = newTempHistogramWrapper()
|
histogramWrapper = newTempHistogramWrapper()
|
||||||
}
|
}
|
||||||
histogramWrapper.metric = m2
|
histogramWrapper.metric = m2
|
||||||
if updateHistogramWrapper != nil {
|
|
||||||
updateHistogramWrapper(&histogramWrapper)
|
|
||||||
}
|
|
||||||
for _, s := range smpls {
|
for _, s := range smpls {
|
||||||
if s.H != nil {
|
if s.H != nil {
|
||||||
continue
|
continue
|
||||||
|
@ -534,18 +529,16 @@ func (cmd *loadCmd) appendCustomHistogram(a storage.Appender) error {
|
||||||
if err != nil || math.IsNaN(le) {
|
if err != nil || math.IsNaN(le) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
processClassicHistogramSeries(m, "_bucket", histogramMap, smpls, func(histogramWrapper *tempHistogramWrapper) {
|
processClassicHistogramSeries(m, "_bucket", histogramMap, smpls, func(histogram *convertnhcb.TempHistogram, f float64) {
|
||||||
histogramWrapper.upperBounds = append(histogramWrapper.upperBounds, le)
|
_ = histogram.SetBucketCount(le, f)
|
||||||
}, func(histogram *convertnhcb.TempHistogram, f float64) {
|
|
||||||
histogram.BucketCounts[le] = f
|
|
||||||
})
|
})
|
||||||
case strings.HasSuffix(mName, "_count"):
|
case strings.HasSuffix(mName, "_count"):
|
||||||
processClassicHistogramSeries(m, "_count", histogramMap, smpls, nil, func(histogram *convertnhcb.TempHistogram, f float64) {
|
processClassicHistogramSeries(m, "_count", histogramMap, smpls, func(histogram *convertnhcb.TempHistogram, f float64) {
|
||||||
histogram.Count = f
|
_ = histogram.SetCount(f)
|
||||||
})
|
})
|
||||||
case strings.HasSuffix(mName, "_sum"):
|
case strings.HasSuffix(mName, "_sum"):
|
||||||
processClassicHistogramSeries(m, "_sum", histogramMap, smpls, nil, func(histogram *convertnhcb.TempHistogram, f float64) {
|
processClassicHistogramSeries(m, "_sum", histogramMap, smpls, func(histogram *convertnhcb.TempHistogram, f float64) {
|
||||||
histogram.Sum = f
|
_ = histogram.SetSum(f)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -553,11 +546,12 @@ func (cmd *loadCmd) appendCustomHistogram(a storage.Appender) error {
|
||||||
// Convert the collated classic histogram data into native histograms
|
// Convert the collated classic histogram data into native histograms
|
||||||
// with custom bounds and append them to the storage.
|
// with custom bounds and append them to the storage.
|
||||||
for _, histogramWrapper := range histogramMap {
|
for _, histogramWrapper := range histogramMap {
|
||||||
upperBounds, hBase := convertnhcb.ProcessUpperBoundsAndCreateBaseHistogram(histogramWrapper.upperBounds, true)
|
|
||||||
fhBase := hBase.ToFloat(nil)
|
|
||||||
samples := make([]promql.Sample, 0, len(histogramWrapper.histogramByTs))
|
samples := make([]promql.Sample, 0, len(histogramWrapper.histogramByTs))
|
||||||
for t, histogram := range histogramWrapper.histogramByTs {
|
for t, histogram := range histogramWrapper.histogramByTs {
|
||||||
h, fh := convertnhcb.NewHistogram(histogram, upperBounds, hBase, fhBase)
|
h, fh, err := histogram.Convert()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
if fh == nil {
|
if fh == nil {
|
||||||
if err := h.Validate(); err != nil {
|
if err := h.Validate(); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
package convertnhcb
|
package convertnhcb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"sort"
|
"sort"
|
||||||
|
@ -23,129 +24,212 @@ import (
|
||||||
"github.com/prometheus/prometheus/model/labels"
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
errNegativeBucketCount = errors.New("bucket count must be non-negative")
|
||||||
|
errNegativeCount = errors.New("count must be non-negative")
|
||||||
|
errCountMismatch = errors.New("count mismatch")
|
||||||
|
errCountNotCumulative = errors.New("count is not cumulative")
|
||||||
|
)
|
||||||
|
|
||||||
|
type tempHistogramBucket struct {
|
||||||
|
le float64
|
||||||
|
count float64
|
||||||
|
}
|
||||||
|
|
||||||
// TempHistogram is used to collect information about classic histogram
|
// TempHistogram is used to collect information about classic histogram
|
||||||
// samples incrementally before creating a histogram.Histogram or
|
// samples incrementally before creating a histogram.Histogram or
|
||||||
// histogram.FloatHistogram based on the values collected.
|
// histogram.FloatHistogram based on the values collected.
|
||||||
type TempHistogram struct {
|
type TempHistogram struct {
|
||||||
BucketCounts map[float64]float64
|
buckets []tempHistogramBucket
|
||||||
Count float64
|
count float64
|
||||||
Sum float64
|
sum float64
|
||||||
HasFloat bool
|
err error
|
||||||
|
hasCount bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTempHistogram creates a new TempHistogram to
|
// NewTempHistogram creates a new TempHistogram to
|
||||||
// collect information about classic histogram samples.
|
// collect information about classic histogram samples.
|
||||||
func NewTempHistogram() TempHistogram {
|
func NewTempHistogram() TempHistogram {
|
||||||
return TempHistogram{
|
return TempHistogram{
|
||||||
BucketCounts: map[float64]float64{},
|
buckets: make([]tempHistogramBucket, 0, 10),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h TempHistogram) getIntBucketCounts() (map[float64]int64, error) {
|
func (h TempHistogram) Err() error {
|
||||||
bucketCounts := map[float64]int64{}
|
return h.err
|
||||||
for le, count := range h.BucketCounts {
|
|
||||||
intCount := int64(math.Round(count))
|
|
||||||
if float64(intCount) != count {
|
|
||||||
return nil, fmt.Errorf("bucket count %f for le %g is not an integer", count, le)
|
|
||||||
}
|
|
||||||
bucketCounts[le] = intCount
|
|
||||||
}
|
|
||||||
return bucketCounts, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProcessUpperBoundsAndCreateBaseHistogram prepares an integer native
|
func (h *TempHistogram) Reset() {
|
||||||
// histogram with custom buckets based on the provided upper bounds.
|
h.buckets = h.buckets[:0]
|
||||||
// Everything is set except the bucket counts.
|
h.count = 0
|
||||||
// The sorted upper bounds are also returned.
|
h.sum = 0
|
||||||
func ProcessUpperBoundsAndCreateBaseHistogram(upperBounds0 []float64, needsDedup bool) ([]float64, *histogram.Histogram) {
|
h.err = nil
|
||||||
sort.Float64s(upperBounds0)
|
h.hasCount = false
|
||||||
var upperBounds []float64
|
}
|
||||||
if needsDedup {
|
|
||||||
upperBounds = make([]float64, 0, len(upperBounds0))
|
func (h *TempHistogram) SetBucketCount(boundary, count float64) error {
|
||||||
prevLE := math.Inf(-1)
|
if h.err != nil {
|
||||||
for _, le := range upperBounds0 {
|
return h.err
|
||||||
if le != prevLE {
|
}
|
||||||
upperBounds = append(upperBounds, le)
|
if count < 0 {
|
||||||
prevLE = le
|
h.err = fmt.Errorf("%w: le=%g, count=%g", errNegativeBucketCount, boundary, count)
|
||||||
|
return h.err
|
||||||
|
}
|
||||||
|
// Assume that the elements are added in order.
|
||||||
|
switch {
|
||||||
|
case len(h.buckets) == 0:
|
||||||
|
h.buckets = append(h.buckets, tempHistogramBucket{le: boundary, count: count})
|
||||||
|
case h.buckets[len(h.buckets)-1].le < boundary:
|
||||||
|
// Happy case is "<".
|
||||||
|
if count < h.buckets[len(h.buckets)-1].count {
|
||||||
|
h.err = fmt.Errorf("%w: %g < %g", errCountNotCumulative, count, h.buckets[len(h.buckets)-1].count)
|
||||||
|
return h.err
|
||||||
|
}
|
||||||
|
h.buckets = append(h.buckets, tempHistogramBucket{le: boundary, count: count})
|
||||||
|
case h.buckets[len(h.buckets)-1].le == boundary:
|
||||||
|
// Ignore this, as it is a duplicate sample.
|
||||||
|
default:
|
||||||
|
// Find the correct position to insert.
|
||||||
|
i := sort.Search(len(h.buckets), func(i int) bool {
|
||||||
|
return h.buckets[i].le >= boundary
|
||||||
|
})
|
||||||
|
if h.buckets[i].le == boundary {
|
||||||
|
// Ignore this, as it is a duplicate sample.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if i > 0 && count < h.buckets[i-1].count {
|
||||||
|
h.err = fmt.Errorf("%w: %g < %g", errCountNotCumulative, count, h.buckets[i-1].count)
|
||||||
|
return h.err
|
||||||
|
}
|
||||||
|
if count > h.buckets[i].count {
|
||||||
|
h.err = fmt.Errorf("%w: %g > %g", errCountNotCumulative, count, h.buckets[i].count)
|
||||||
|
return h.err
|
||||||
|
}
|
||||||
|
// Insert at the correct position unless duplicate.
|
||||||
|
h.buckets = append(h.buckets, tempHistogramBucket{})
|
||||||
|
copy(h.buckets[i+1:], h.buckets[i:])
|
||||||
|
h.buckets[i] = tempHistogramBucket{le: boundary, count: count}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *TempHistogram) SetCount(count float64) error {
|
||||||
|
if h.err != nil {
|
||||||
|
return h.err
|
||||||
|
}
|
||||||
|
if count < 0 {
|
||||||
|
h.err = fmt.Errorf("%w: count=%g", errNegativeCount, count)
|
||||||
|
return h.err
|
||||||
|
}
|
||||||
|
h.count = count
|
||||||
|
h.hasCount = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *TempHistogram) SetSum(sum float64) error {
|
||||||
|
if h.err != nil {
|
||||||
|
return h.err
|
||||||
|
}
|
||||||
|
h.sum = sum
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h TempHistogram) Convert() (*histogram.Histogram, *histogram.FloatHistogram, error) {
|
||||||
|
if h.err != nil {
|
||||||
|
return nil, nil, h.err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(h.buckets) == 0 || h.buckets[len(h.buckets)-1].le != math.Inf(1) {
|
||||||
|
// No +Inf bucket.
|
||||||
|
if !h.hasCount && len(h.buckets) > 0 {
|
||||||
|
// No count either, so set count to the last known bucket's count.
|
||||||
|
h.count = h.buckets[len(h.buckets)-1].count
|
||||||
|
}
|
||||||
|
// Let the last bucket be +Inf with the overall count.
|
||||||
|
h.buckets = append(h.buckets, tempHistogramBucket{le: math.Inf(1), count: h.count})
|
||||||
|
}
|
||||||
|
|
||||||
|
if !h.hasCount {
|
||||||
|
h.count = h.buckets[len(h.buckets)-1].count
|
||||||
|
h.hasCount = true
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, b := range h.buckets {
|
||||||
|
intCount := int64(math.Round(b.count))
|
||||||
|
if b.count != float64(intCount) {
|
||||||
|
return h.convertToFloatHistogram()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
upperBounds = upperBounds0
|
intCount := uint64(math.Round(h.count))
|
||||||
|
if h.count != float64(intCount) {
|
||||||
|
return h.convertToFloatHistogram()
|
||||||
}
|
}
|
||||||
var customBounds []float64
|
return h.convertToIntegerHistogram(intCount)
|
||||||
if upperBounds[len(upperBounds)-1] == math.Inf(1) {
|
|
||||||
customBounds = upperBounds[:len(upperBounds)-1]
|
|
||||||
} else {
|
|
||||||
customBounds = upperBounds
|
|
||||||
}
|
}
|
||||||
return upperBounds, &histogram.Histogram{
|
|
||||||
Count: 0,
|
func (h TempHistogram) convertToIntegerHistogram(count uint64) (*histogram.Histogram, *histogram.FloatHistogram, error) {
|
||||||
Sum: 0,
|
rh := &histogram.Histogram{
|
||||||
Schema: histogram.CustomBucketsSchema,
|
Schema: histogram.CustomBucketsSchema,
|
||||||
PositiveSpans: []histogram.Span{
|
Count: count,
|
||||||
{Offset: 0, Length: uint32(len(upperBounds))},
|
Sum: h.sum,
|
||||||
},
|
PositiveSpans: []histogram.Span{{Length: uint32(len(h.buckets))}},
|
||||||
PositiveBuckets: make([]int64, len(upperBounds)),
|
PositiveBuckets: make([]int64, len(h.buckets)),
|
||||||
CustomValues: customBounds,
|
}
|
||||||
|
|
||||||
|
if len(h.buckets) > 1 {
|
||||||
|
rh.CustomValues = make([]float64, len(h.buckets)-1) // Not storing the last +Inf bucket.
|
||||||
|
}
|
||||||
|
|
||||||
|
prevCount := int64(0)
|
||||||
|
prevDelta := int64(0)
|
||||||
|
for i, b := range h.buckets {
|
||||||
|
// delta is the actual bucket count as the input is cumulative.
|
||||||
|
delta := int64(b.count) - prevCount
|
||||||
|
rh.PositiveBuckets[i] = delta - prevDelta
|
||||||
|
prevCount = int64(b.count)
|
||||||
|
prevDelta = delta
|
||||||
|
if b.le != math.Inf(1) {
|
||||||
|
rh.CustomValues[i] = b.le
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHistogram fills the bucket counts in the provided histogram.Histogram
|
if count != uint64(h.buckets[len(h.buckets)-1].count) {
|
||||||
// or histogram.FloatHistogram based on the provided temporary histogram and
|
h.err = fmt.Errorf("%w: count=%d != le=%g count=%g", errCountMismatch, count, h.buckets[len(h.buckets)-1].le, h.buckets[len(h.buckets)-1].count)
|
||||||
// upper bounds.
|
return nil, nil, h.err
|
||||||
func NewHistogram(histogram TempHistogram, upperBounds []float64, hBase *histogram.Histogram, fhBase *histogram.FloatHistogram) (*histogram.Histogram, *histogram.FloatHistogram) {
|
|
||||||
intBucketCounts, err := histogram.getIntBucketCounts()
|
|
||||||
if err != nil {
|
|
||||||
return nil, newFloatHistogram(histogram, upperBounds, histogram.BucketCounts, fhBase)
|
|
||||||
}
|
|
||||||
return newIntegerHistogram(histogram, upperBounds, intBucketCounts, hBase), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newIntegerHistogram(histogram TempHistogram, upperBounds []float64, bucketCounts map[float64]int64, hBase *histogram.Histogram) *histogram.Histogram {
|
return rh.Compact(2), nil, nil
|
||||||
h := hBase.Copy()
|
|
||||||
absBucketCounts := make([]int64, len(h.PositiveBuckets))
|
|
||||||
var prevCount, total int64
|
|
||||||
for i, le := range upperBounds {
|
|
||||||
currCount, exists := bucketCounts[le]
|
|
||||||
if !exists {
|
|
||||||
currCount = 0
|
|
||||||
}
|
|
||||||
count := currCount - prevCount
|
|
||||||
absBucketCounts[i] = count
|
|
||||||
total += count
|
|
||||||
prevCount = currCount
|
|
||||||
}
|
|
||||||
h.PositiveBuckets[0] = absBucketCounts[0]
|
|
||||||
for i := 1; i < len(h.PositiveBuckets); i++ {
|
|
||||||
h.PositiveBuckets[i] = absBucketCounts[i] - absBucketCounts[i-1]
|
|
||||||
}
|
|
||||||
h.Sum = histogram.Sum
|
|
||||||
if histogram.Count != 0 {
|
|
||||||
total = int64(histogram.Count)
|
|
||||||
}
|
|
||||||
h.Count = uint64(total)
|
|
||||||
return h.Compact(0)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newFloatHistogram(histogram TempHistogram, upperBounds []float64, bucketCounts map[float64]float64, fhBase *histogram.FloatHistogram) *histogram.FloatHistogram {
|
func (h TempHistogram) convertToFloatHistogram() (*histogram.Histogram, *histogram.FloatHistogram, error) {
|
||||||
fh := fhBase.Copy()
|
rh := &histogram.FloatHistogram{
|
||||||
var prevCount, total float64
|
Schema: histogram.CustomBucketsSchema,
|
||||||
for i, le := range upperBounds {
|
Count: h.count,
|
||||||
currCount, exists := bucketCounts[le]
|
Sum: h.sum,
|
||||||
if !exists {
|
PositiveSpans: []histogram.Span{{Length: uint32(len(h.buckets))}},
|
||||||
currCount = 0
|
PositiveBuckets: make([]float64, len(h.buckets)),
|
||||||
}
|
}
|
||||||
count := currCount - prevCount
|
|
||||||
fh.PositiveBuckets[i] = count
|
if len(h.buckets) > 1 {
|
||||||
total += count
|
rh.CustomValues = make([]float64, len(h.buckets)-1) // Not storing the last +Inf bucket.
|
||||||
prevCount = currCount
|
|
||||||
}
|
}
|
||||||
fh.Sum = histogram.Sum
|
|
||||||
if histogram.Count != 0 {
|
prevCount := 0.0
|
||||||
total = histogram.Count
|
for i, b := range h.buckets {
|
||||||
|
rh.PositiveBuckets[i] = b.count - prevCount
|
||||||
|
prevCount = b.count
|
||||||
|
if b.le != math.Inf(1) {
|
||||||
|
rh.CustomValues[i] = b.le
|
||||||
}
|
}
|
||||||
fh.Count = total
|
}
|
||||||
return fh.Compact(0)
|
|
||||||
|
if h.count != h.buckets[len(h.buckets)-1].count {
|
||||||
|
h.err = fmt.Errorf("%w: count=%g != le=%g count=%g", errCountMismatch, h.count, h.buckets[len(h.buckets)-1].le, h.buckets[len(h.buckets)-1].count)
|
||||||
|
return nil, nil, h.err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, rh.Compact(0), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetHistogramMetricBase(m labels.Labels, suffix string) labels.Labels {
|
func GetHistogramMetricBase(m labels.Labels, suffix string) labels.Labels {
|
||||||
|
|
189
util/convertnhcb/convertnhcb_test.go
Normal file
189
util/convertnhcb/convertnhcb_test.go
Normal file
|
@ -0,0 +1,189 @@
|
||||||
|
// Copyright 2024 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package convertnhcb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNHCBConvert(t *testing.T) {
|
||||||
|
tests := map[string]struct {
|
||||||
|
setup func() *TempHistogram
|
||||||
|
expectedErr error
|
||||||
|
expectedH *histogram.Histogram
|
||||||
|
expectedFH *histogram.FloatHistogram
|
||||||
|
}{
|
||||||
|
"empty": {
|
||||||
|
setup: func() *TempHistogram {
|
||||||
|
h := NewTempHistogram()
|
||||||
|
return &h
|
||||||
|
},
|
||||||
|
expectedH: &histogram.Histogram{
|
||||||
|
Schema: histogram.CustomBucketsSchema,
|
||||||
|
PositiveSpans: []histogram.Span{},
|
||||||
|
PositiveBuckets: []int64{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"sum only": {
|
||||||
|
setup: func() *TempHistogram {
|
||||||
|
h := NewTempHistogram()
|
||||||
|
h.SetSum(1000.25)
|
||||||
|
return &h
|
||||||
|
},
|
||||||
|
expectedH: &histogram.Histogram{
|
||||||
|
Schema: histogram.CustomBucketsSchema,
|
||||||
|
Sum: 1000.25,
|
||||||
|
PositiveSpans: []histogram.Span{},
|
||||||
|
PositiveBuckets: []int64{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"single integer bucket": {
|
||||||
|
setup: func() *TempHistogram {
|
||||||
|
h := NewTempHistogram()
|
||||||
|
h.SetSum(1000.25)
|
||||||
|
h.SetBucketCount(0.5, 1000)
|
||||||
|
return &h
|
||||||
|
},
|
||||||
|
expectedH: &histogram.Histogram{
|
||||||
|
Schema: histogram.CustomBucketsSchema,
|
||||||
|
Count: 1000,
|
||||||
|
Sum: 1000.25,
|
||||||
|
PositiveSpans: []histogram.Span{{Length: 1}},
|
||||||
|
PositiveBuckets: []int64{1000},
|
||||||
|
CustomValues: []float64{0.5},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"single float bucket": {
|
||||||
|
setup: func() *TempHistogram {
|
||||||
|
h := NewTempHistogram()
|
||||||
|
h.SetSum(1000.25)
|
||||||
|
h.SetBucketCount(0.5, 1337.42)
|
||||||
|
return &h
|
||||||
|
},
|
||||||
|
expectedFH: &histogram.FloatHistogram{
|
||||||
|
Schema: histogram.CustomBucketsSchema,
|
||||||
|
Count: 1337.42,
|
||||||
|
Sum: 1000.25,
|
||||||
|
PositiveSpans: []histogram.Span{{Length: 1}},
|
||||||
|
PositiveBuckets: []float64{1337.42},
|
||||||
|
CustomValues: []float64{0.5},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"happy case integer bucket": {
|
||||||
|
setup: func() *TempHistogram {
|
||||||
|
h := NewTempHistogram()
|
||||||
|
h.SetCount(1000)
|
||||||
|
h.SetSum(1000.25)
|
||||||
|
h.SetBucketCount(0.5, 50)
|
||||||
|
h.SetBucketCount(1.0, 950)
|
||||||
|
h.SetBucketCount(math.Inf(1), 1000)
|
||||||
|
return &h
|
||||||
|
},
|
||||||
|
expectedH: &histogram.Histogram{
|
||||||
|
Schema: histogram.CustomBucketsSchema,
|
||||||
|
Count: 1000,
|
||||||
|
Sum: 1000.25,
|
||||||
|
PositiveSpans: []histogram.Span{{Length: 3}},
|
||||||
|
PositiveBuckets: []int64{50, 850, -850},
|
||||||
|
CustomValues: []float64{0.5, 1.0},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"happy case float bucket": {
|
||||||
|
setup: func() *TempHistogram {
|
||||||
|
h := NewTempHistogram()
|
||||||
|
h.SetCount(1000)
|
||||||
|
h.SetSum(1000.25)
|
||||||
|
h.SetBucketCount(0.5, 50)
|
||||||
|
h.SetBucketCount(1.0, 950.5)
|
||||||
|
h.SetBucketCount(math.Inf(1), 1000)
|
||||||
|
return &h
|
||||||
|
},
|
||||||
|
expectedFH: &histogram.FloatHistogram{
|
||||||
|
Schema: histogram.CustomBucketsSchema,
|
||||||
|
Count: 1000,
|
||||||
|
Sum: 1000.25,
|
||||||
|
PositiveSpans: []histogram.Span{{Length: 3}},
|
||||||
|
PositiveBuckets: []float64{50, 900.5, 49.5},
|
||||||
|
CustomValues: []float64{0.5, 1.0},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"non cumulative bucket": {
|
||||||
|
setup: func() *TempHistogram {
|
||||||
|
h := NewTempHistogram()
|
||||||
|
h.SetCount(1000)
|
||||||
|
h.SetSum(1000.25)
|
||||||
|
h.SetBucketCount(0.5, 50)
|
||||||
|
h.SetBucketCount(1.0, 950)
|
||||||
|
h.SetBucketCount(math.Inf(1), 900)
|
||||||
|
return &h
|
||||||
|
},
|
||||||
|
expectedErr: errCountNotCumulative,
|
||||||
|
},
|
||||||
|
"negative count": {
|
||||||
|
setup: func() *TempHistogram {
|
||||||
|
h := NewTempHistogram()
|
||||||
|
h.SetCount(-1000)
|
||||||
|
h.SetSum(1000.25)
|
||||||
|
h.SetBucketCount(0.5, 50)
|
||||||
|
h.SetBucketCount(1.0, 950)
|
||||||
|
h.SetBucketCount(math.Inf(1), 900)
|
||||||
|
return &h
|
||||||
|
},
|
||||||
|
expectedErr: errNegativeCount,
|
||||||
|
},
|
||||||
|
"mixed order": {
|
||||||
|
setup: func() *TempHistogram {
|
||||||
|
h := NewTempHistogram()
|
||||||
|
h.SetBucketCount(0.5, 50)
|
||||||
|
h.SetBucketCount(math.Inf(1), 1000)
|
||||||
|
h.SetBucketCount(1.0, 950)
|
||||||
|
h.SetCount(1000)
|
||||||
|
h.SetSum(1000.25)
|
||||||
|
return &h
|
||||||
|
},
|
||||||
|
expectedH: &histogram.Histogram{
|
||||||
|
Schema: histogram.CustomBucketsSchema,
|
||||||
|
Count: 1000,
|
||||||
|
Sum: 1000.25,
|
||||||
|
PositiveSpans: []histogram.Span{{Length: 3}},
|
||||||
|
PositiveBuckets: []int64{50, 850, -850},
|
||||||
|
CustomValues: []float64{0.5, 1.0},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, test := range tests {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
th := test.setup()
|
||||||
|
h, fh, err := th.Convert()
|
||||||
|
if test.expectedErr != nil {
|
||||||
|
require.ErrorIs(t, err, test.expectedErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
require.Equal(t, test.expectedH, h)
|
||||||
|
if h != nil {
|
||||||
|
require.NoError(t, h.Validate())
|
||||||
|
}
|
||||||
|
require.Equal(t, test.expectedFH, fh)
|
||||||
|
if fh != nil {
|
||||||
|
require.NoError(t, fh.Validate())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue