mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
implement basic conversion of classic to nhcb in scrape
Signed-off-by: Jeanette Tan <jeanette.tan@grafana.com>
This commit is contained in:
parent
3d54bcc018
commit
62e7f0438d
|
@ -617,6 +617,8 @@ type ScrapeConfig struct {
|
||||||
ScrapeProtocols []ScrapeProtocol `yaml:"scrape_protocols,omitempty"`
|
ScrapeProtocols []ScrapeProtocol `yaml:"scrape_protocols,omitempty"`
|
||||||
// Whether to scrape a classic histogram that is also exposed as a native histogram.
|
// Whether to scrape a classic histogram that is also exposed as a native histogram.
|
||||||
ScrapeClassicHistograms bool `yaml:"scrape_classic_histograms,omitempty"`
|
ScrapeClassicHistograms bool `yaml:"scrape_classic_histograms,omitempty"`
|
||||||
|
// Whether to convert a scraped classic histogram into a native histogram with custom buckets.
|
||||||
|
ConvertClassicHistograms bool `yaml:"convert_classic_histograms,omitempty"`
|
||||||
// The HTTP resource path on which to fetch metrics from targets.
|
// The HTTP resource path on which to fetch metrics from targets.
|
||||||
MetricsPath string `yaml:"metrics_path,omitempty"`
|
MetricsPath string `yaml:"metrics_path,omitempty"`
|
||||||
// The URL scheme with which to fetch metrics from targets.
|
// The URL scheme with which to fetch metrics from targets.
|
||||||
|
|
|
@ -39,6 +39,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/promql/parser/posrange"
|
"github.com/prometheus/prometheus/promql/parser/posrange"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
"github.com/prometheus/prometheus/util/almost"
|
"github.com/prometheus/prometheus/util/almost"
|
||||||
|
"github.com/prometheus/prometheus/util/convertnhcb"
|
||||||
"github.com/prometheus/prometheus/util/teststorage"
|
"github.com/prometheus/prometheus/util/teststorage"
|
||||||
"github.com/prometheus/prometheus/util/testutil"
|
"github.com/prometheus/prometheus/util/testutil"
|
||||||
)
|
)
|
||||||
|
@ -460,43 +461,22 @@ func (cmd *loadCmd) append(a storage.Appender) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getHistogramMetricBase(m labels.Labels, suffix string) (labels.Labels, uint64) {
|
|
||||||
mName := m.Get(labels.MetricName)
|
|
||||||
baseM := labels.NewBuilder(m).
|
|
||||||
Set(labels.MetricName, strings.TrimSuffix(mName, suffix)).
|
|
||||||
Del(labels.BucketLabel).
|
|
||||||
Labels()
|
|
||||||
hash := baseM.Hash()
|
|
||||||
return baseM, hash
|
|
||||||
}
|
|
||||||
|
|
||||||
type tempHistogramWrapper struct {
|
type tempHistogramWrapper struct {
|
||||||
metric labels.Labels
|
metric labels.Labels
|
||||||
upperBounds []float64
|
upperBounds []float64
|
||||||
histogramByTs map[int64]tempHistogram
|
histogramByTs map[int64]convertnhcb.TempHistogram
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTempHistogramWrapper() tempHistogramWrapper {
|
func newTempHistogramWrapper() tempHistogramWrapper {
|
||||||
return tempHistogramWrapper{
|
return tempHistogramWrapper{
|
||||||
upperBounds: []float64{},
|
upperBounds: []float64{},
|
||||||
histogramByTs: map[int64]tempHistogram{},
|
histogramByTs: map[int64]convertnhcb.TempHistogram{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type tempHistogram struct {
|
func processClassicHistogramSeries(m labels.Labels, suffix string, histogramMap map[uint64]tempHistogramWrapper, smpls []promql.Sample, updateHistogramWrapper func(*tempHistogramWrapper), updateHistogram func(*convertnhcb.TempHistogram, float64)) {
|
||||||
bucketCounts map[float64]float64
|
m2 := convertnhcb.GetHistogramMetricBase(m, suffix)
|
||||||
count float64
|
m2hash := m2.Hash()
|
||||||
sum float64
|
|
||||||
}
|
|
||||||
|
|
||||||
func newTempHistogram() tempHistogram {
|
|
||||||
return tempHistogram{
|
|
||||||
bucketCounts: map[float64]float64{},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func processClassicHistogramSeries(m labels.Labels, suffix string, histogramMap map[uint64]tempHistogramWrapper, smpls []promql.Sample, updateHistogramWrapper func(*tempHistogramWrapper), updateHistogram func(*tempHistogram, float64)) {
|
|
||||||
m2, m2hash := getHistogramMetricBase(m, suffix)
|
|
||||||
histogramWrapper, exists := histogramMap[m2hash]
|
histogramWrapper, exists := histogramMap[m2hash]
|
||||||
if !exists {
|
if !exists {
|
||||||
histogramWrapper = newTempHistogramWrapper()
|
histogramWrapper = newTempHistogramWrapper()
|
||||||
|
@ -511,7 +491,7 @@ func processClassicHistogramSeries(m labels.Labels, suffix string, histogramMap
|
||||||
}
|
}
|
||||||
histogram, exists := histogramWrapper.histogramByTs[s.T]
|
histogram, exists := histogramWrapper.histogramByTs[s.T]
|
||||||
if !exists {
|
if !exists {
|
||||||
histogram = newTempHistogram()
|
histogram = convertnhcb.NewTempHistogram()
|
||||||
}
|
}
|
||||||
updateHistogram(&histogram, s.F)
|
updateHistogram(&histogram, s.F)
|
||||||
histogramWrapper.histogramByTs[s.T] = histogram
|
histogramWrapper.histogramByTs[s.T] = histogram
|
||||||
|
@ -519,34 +499,6 @@ func processClassicHistogramSeries(m labels.Labels, suffix string, histogramMap
|
||||||
histogramMap[m2hash] = histogramWrapper
|
histogramMap[m2hash] = histogramWrapper
|
||||||
}
|
}
|
||||||
|
|
||||||
func processUpperBoundsAndCreateBaseHistogram(upperBounds0 []float64) ([]float64, *histogram.FloatHistogram) {
|
|
||||||
sort.Float64s(upperBounds0)
|
|
||||||
upperBounds := make([]float64, 0, len(upperBounds0))
|
|
||||||
prevLE := math.Inf(-1)
|
|
||||||
for _, le := range upperBounds0 {
|
|
||||||
if le != prevLE { // deduplicate
|
|
||||||
upperBounds = append(upperBounds, le)
|
|
||||||
prevLE = le
|
|
||||||
}
|
|
||||||
}
|
|
||||||
var customBounds []float64
|
|
||||||
if upperBounds[len(upperBounds)-1] == math.Inf(1) {
|
|
||||||
customBounds = upperBounds[:len(upperBounds)-1]
|
|
||||||
} else {
|
|
||||||
customBounds = upperBounds
|
|
||||||
}
|
|
||||||
return upperBounds, &histogram.FloatHistogram{
|
|
||||||
Count: 0,
|
|
||||||
Sum: 0,
|
|
||||||
Schema: histogram.CustomBucketsSchema,
|
|
||||||
PositiveSpans: []histogram.Span{
|
|
||||||
{Offset: 0, Length: uint32(len(upperBounds))},
|
|
||||||
},
|
|
||||||
PositiveBuckets: make([]float64, len(upperBounds)),
|
|
||||||
CustomValues: customBounds,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If classic histograms are defined, convert them into native histograms with custom
|
// If classic histograms are defined, convert them into native histograms with custom
|
||||||
// bounds and append the defined time series to the storage.
|
// bounds and append the defined time series to the storage.
|
||||||
func (cmd *loadCmd) appendCustomHistogram(a storage.Appender) error {
|
func (cmd *loadCmd) appendCustomHistogram(a storage.Appender) error {
|
||||||
|
@ -565,16 +517,16 @@ func (cmd *loadCmd) appendCustomHistogram(a storage.Appender) error {
|
||||||
}
|
}
|
||||||
processClassicHistogramSeries(m, "_bucket", histogramMap, smpls, func(histogramWrapper *tempHistogramWrapper) {
|
processClassicHistogramSeries(m, "_bucket", histogramMap, smpls, func(histogramWrapper *tempHistogramWrapper) {
|
||||||
histogramWrapper.upperBounds = append(histogramWrapper.upperBounds, le)
|
histogramWrapper.upperBounds = append(histogramWrapper.upperBounds, le)
|
||||||
}, func(histogram *tempHistogram, f float64) {
|
}, func(histogram *convertnhcb.TempHistogram, f float64) {
|
||||||
histogram.bucketCounts[le] = f
|
histogram.BucketCounts[le] = f
|
||||||
})
|
})
|
||||||
case strings.HasSuffix(mName, "_count"):
|
case strings.HasSuffix(mName, "_count"):
|
||||||
processClassicHistogramSeries(m, "_count", histogramMap, smpls, nil, func(histogram *tempHistogram, f float64) {
|
processClassicHistogramSeries(m, "_count", histogramMap, smpls, nil, func(histogram *convertnhcb.TempHistogram, f float64) {
|
||||||
histogram.count = f
|
histogram.Count = f
|
||||||
})
|
})
|
||||||
case strings.HasSuffix(mName, "_sum"):
|
case strings.HasSuffix(mName, "_sum"):
|
||||||
processClassicHistogramSeries(m, "_sum", histogramMap, smpls, nil, func(histogram *tempHistogram, f float64) {
|
processClassicHistogramSeries(m, "_sum", histogramMap, smpls, nil, func(histogram *convertnhcb.TempHistogram, f float64) {
|
||||||
histogram.sum = f
|
histogram.Sum = f
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -582,30 +534,14 @@ func (cmd *loadCmd) appendCustomHistogram(a storage.Appender) error {
|
||||||
// Convert the collated classic histogram data into native histograms
|
// Convert the collated classic histogram data into native histograms
|
||||||
// with custom bounds and append them to the storage.
|
// with custom bounds and append them to the storage.
|
||||||
for _, histogramWrapper := range histogramMap {
|
for _, histogramWrapper := range histogramMap {
|
||||||
upperBounds, fhBase := processUpperBoundsAndCreateBaseHistogram(histogramWrapper.upperBounds)
|
upperBounds, fhBase := convertnhcb.ProcessUpperBoundsAndCreateBaseHistogram(histogramWrapper.upperBounds)
|
||||||
samples := make([]promql.Sample, 0, len(histogramWrapper.histogramByTs))
|
samples := make([]promql.Sample, 0, len(histogramWrapper.histogramByTs))
|
||||||
for t, histogram := range histogramWrapper.histogramByTs {
|
for t, histogram := range histogramWrapper.histogramByTs {
|
||||||
fh := fhBase.Copy()
|
fh := convertnhcb.ConvertHistogramWrapper(histogram, upperBounds, fhBase)
|
||||||
var prevCount, total float64
|
if err := fh.Validate(); err != nil {
|
||||||
for i, le := range upperBounds {
|
|
||||||
currCount, exists := histogram.bucketCounts[le]
|
|
||||||
if !exists {
|
|
||||||
currCount = 0
|
|
||||||
}
|
|
||||||
count := currCount - prevCount
|
|
||||||
fh.PositiveBuckets[i] = count
|
|
||||||
total += count
|
|
||||||
prevCount = currCount
|
|
||||||
}
|
|
||||||
fh.Sum = histogram.sum
|
|
||||||
if histogram.count != 0 {
|
|
||||||
total = histogram.count
|
|
||||||
}
|
|
||||||
fh.Count = total
|
|
||||||
s := promql.Sample{T: t, H: fh.Compact(0)}
|
|
||||||
if err := s.H.Validate(); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
s := promql.Sample{T: t, H: fh}
|
||||||
samples = append(samples, s)
|
samples = append(samples, s)
|
||||||
}
|
}
|
||||||
sort.Slice(samples, func(i, j int) bool { return samples[i].T < samples[j].T })
|
sort.Slice(samples, func(i, j int) bool { return samples[i].T < samples[j].T })
|
||||||
|
|
|
@ -47,6 +47,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/model/timestamp"
|
"github.com/prometheus/prometheus/model/timestamp"
|
||||||
"github.com/prometheus/prometheus/model/value"
|
"github.com/prometheus/prometheus/model/value"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
"github.com/prometheus/prometheus/util/convertnhcb"
|
||||||
"github.com/prometheus/prometheus/util/pool"
|
"github.com/prometheus/prometheus/util/pool"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -111,6 +112,7 @@ type scrapeLoopOptions struct {
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
scrapeClassicHistograms bool
|
scrapeClassicHistograms bool
|
||||||
|
convertClassicHistograms bool
|
||||||
|
|
||||||
mrc []*relabel.Config
|
mrc []*relabel.Config
|
||||||
cache *scrapeCache
|
cache *scrapeCache
|
||||||
|
@ -178,6 +180,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
|
||||||
opts.interval,
|
opts.interval,
|
||||||
opts.timeout,
|
opts.timeout,
|
||||||
opts.scrapeClassicHistograms,
|
opts.scrapeClassicHistograms,
|
||||||
|
opts.convertClassicHistograms,
|
||||||
options.EnableNativeHistogramsIngestion,
|
options.EnableNativeHistogramsIngestion,
|
||||||
options.EnableCreatedTimestampZeroIngestion,
|
options.EnableCreatedTimestampZeroIngestion,
|
||||||
options.ExtraMetrics,
|
options.ExtraMetrics,
|
||||||
|
@ -440,6 +443,7 @@ func (sp *scrapePool) sync(targets []*Target) {
|
||||||
trackTimestampsStaleness = sp.config.TrackTimestampsStaleness
|
trackTimestampsStaleness = sp.config.TrackTimestampsStaleness
|
||||||
mrc = sp.config.MetricRelabelConfigs
|
mrc = sp.config.MetricRelabelConfigs
|
||||||
scrapeClassicHistograms = sp.config.ScrapeClassicHistograms
|
scrapeClassicHistograms = sp.config.ScrapeClassicHistograms
|
||||||
|
convertClassicHistograms = sp.config.ConvertClassicHistograms
|
||||||
)
|
)
|
||||||
|
|
||||||
sp.targetMtx.Lock()
|
sp.targetMtx.Lock()
|
||||||
|
@ -476,6 +480,7 @@ func (sp *scrapePool) sync(targets []*Target) {
|
||||||
interval: interval,
|
interval: interval,
|
||||||
timeout: timeout,
|
timeout: timeout,
|
||||||
scrapeClassicHistograms: scrapeClassicHistograms,
|
scrapeClassicHistograms: scrapeClassicHistograms,
|
||||||
|
convertClassicHistograms: convertClassicHistograms,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.setForcedError(err)
|
l.setForcedError(err)
|
||||||
|
@ -828,6 +833,7 @@ type scrapeLoop struct {
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
scrapeClassicHistograms bool
|
scrapeClassicHistograms bool
|
||||||
|
convertClassicHistograms bool
|
||||||
|
|
||||||
// Feature flagged options.
|
// Feature flagged options.
|
||||||
enableNativeHistogramIngestion bool
|
enableNativeHistogramIngestion bool
|
||||||
|
@ -881,6 +887,9 @@ type scrapeCache struct {
|
||||||
metadata map[string]*metaEntry
|
metadata map[string]*metaEntry
|
||||||
|
|
||||||
metrics *scrapeMetrics
|
metrics *scrapeMetrics
|
||||||
|
|
||||||
|
nhcbLabels map[uint64]labels.Labels
|
||||||
|
nhcbBuilder map[uint64]convertnhcb.TempHistogram
|
||||||
}
|
}
|
||||||
|
|
||||||
// metaEntry holds meta information about a metric.
|
// metaEntry holds meta information about a metric.
|
||||||
|
@ -904,6 +913,8 @@ func newScrapeCache(metrics *scrapeMetrics) *scrapeCache {
|
||||||
seriesPrev: map[uint64]labels.Labels{},
|
seriesPrev: map[uint64]labels.Labels{},
|
||||||
metadata: map[string]*metaEntry{},
|
metadata: map[string]*metaEntry{},
|
||||||
metrics: metrics,
|
metrics: metrics,
|
||||||
|
nhcbLabels: map[uint64]labels.Labels{},
|
||||||
|
nhcbBuilder: map[uint64]convertnhcb.TempHistogram{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1107,6 +1118,11 @@ func (c *scrapeCache) LengthMetadata() int {
|
||||||
return len(c.metadata)
|
return len(c.metadata)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *scrapeCache) resetNhcb() {
|
||||||
|
c.nhcbLabels = map[uint64]labels.Labels{}
|
||||||
|
c.nhcbBuilder = map[uint64]convertnhcb.TempHistogram{}
|
||||||
|
}
|
||||||
|
|
||||||
func newScrapeLoop(ctx context.Context,
|
func newScrapeLoop(ctx context.Context,
|
||||||
sc scraper,
|
sc scraper,
|
||||||
l log.Logger,
|
l log.Logger,
|
||||||
|
@ -1127,6 +1143,7 @@ func newScrapeLoop(ctx context.Context,
|
||||||
interval time.Duration,
|
interval time.Duration,
|
||||||
timeout time.Duration,
|
timeout time.Duration,
|
||||||
scrapeClassicHistograms bool,
|
scrapeClassicHistograms bool,
|
||||||
|
convertClassicHistograms bool,
|
||||||
enableNativeHistogramIngestion bool,
|
enableNativeHistogramIngestion bool,
|
||||||
enableCTZeroIngestion bool,
|
enableCTZeroIngestion bool,
|
||||||
reportExtraMetrics bool,
|
reportExtraMetrics bool,
|
||||||
|
@ -1180,6 +1197,7 @@ func newScrapeLoop(ctx context.Context,
|
||||||
interval: interval,
|
interval: interval,
|
||||||
timeout: timeout,
|
timeout: timeout,
|
||||||
scrapeClassicHistograms: scrapeClassicHistograms,
|
scrapeClassicHistograms: scrapeClassicHistograms,
|
||||||
|
convertClassicHistograms: convertClassicHistograms,
|
||||||
enableNativeHistogramIngestion: enableNativeHistogramIngestion,
|
enableNativeHistogramIngestion: enableNativeHistogramIngestion,
|
||||||
enableCTZeroIngestion: enableCTZeroIngestion,
|
enableCTZeroIngestion: enableCTZeroIngestion,
|
||||||
reportExtraMetrics: reportExtraMetrics,
|
reportExtraMetrics: reportExtraMetrics,
|
||||||
|
@ -1641,6 +1659,27 @@ loop:
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ref, err = app.Append(ref, lset, t, val)
|
ref, err = app.Append(ref, lset, t, val)
|
||||||
|
|
||||||
|
if sl.convertClassicHistograms {
|
||||||
|
mName := lset.Get(labels.MetricName)
|
||||||
|
switch {
|
||||||
|
case strings.HasSuffix(mName, "_bucket") && lset.Has(labels.BucketLabel):
|
||||||
|
le, err := strconv.ParseFloat(lset.Get(labels.BucketLabel), 64)
|
||||||
|
if err == nil && !math.IsNaN(le) {
|
||||||
|
processClassicHistogramSeries(lset, "_bucket", sl.cache, func(hist *convertnhcb.TempHistogram) {
|
||||||
|
hist.BucketCounts[le] = val
|
||||||
|
})
|
||||||
|
}
|
||||||
|
case strings.HasSuffix(mName, "_count"):
|
||||||
|
processClassicHistogramSeries(lset, "_count", sl.cache, func(hist *convertnhcb.TempHistogram) {
|
||||||
|
hist.Count = val
|
||||||
|
})
|
||||||
|
case strings.HasSuffix(mName, "_sum"):
|
||||||
|
processClassicHistogramSeries(lset, "_sum", sl.cache, func(hist *convertnhcb.TempHistogram) {
|
||||||
|
hist.Sum = val
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1762,9 +1801,46 @@ loop:
|
||||||
return err == nil
|
return err == nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if sl.convertClassicHistograms {
|
||||||
|
for hash, th := range sl.cache.nhcbBuilder {
|
||||||
|
lset, ok := sl.cache.nhcbLabels[hash]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ub := make([]float64, 0, len(th.BucketCounts))
|
||||||
|
for b := range th.BucketCounts {
|
||||||
|
ub = append(ub, b)
|
||||||
|
}
|
||||||
|
upperBounds, fhBase := convertnhcb.ProcessUpperBoundsAndCreateBaseHistogram(ub)
|
||||||
|
fh := convertnhcb.ConvertHistogramWrapper(th, upperBounds, fhBase)
|
||||||
|
if err := fh.Validate(); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// fmt.Printf("FINAL lset: %s, timestamp: %v, val: %v\n", lset, defTime, fh)
|
||||||
|
_, err = app.AppendHistogram(0, lset, defTime, nil, fh)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sl.cache.resetNhcb()
|
||||||
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func processClassicHistogramSeries(lset labels.Labels, suffix string, cache *scrapeCache, updateHist func(*convertnhcb.TempHistogram)) {
|
||||||
|
m2 := convertnhcb.GetHistogramMetricBase(lset, suffix)
|
||||||
|
m2hash := m2.Hash()
|
||||||
|
cache.nhcbLabels[m2hash] = m2
|
||||||
|
th, exists := cache.nhcbBuilder[m2hash]
|
||||||
|
if !exists {
|
||||||
|
th = convertnhcb.NewTempHistogram()
|
||||||
|
}
|
||||||
|
updateHist(&th)
|
||||||
|
cache.nhcbBuilder[m2hash] = th
|
||||||
|
}
|
||||||
|
|
||||||
// Adds samples to the appender, checking the error, and then returns the # of samples added,
|
// Adds samples to the appender, checking the error, and then returns the # of samples added,
|
||||||
// whether the caller should continue to process more samples, and any sample or bucket limit errors.
|
// whether the caller should continue to process more samples, and any sample or bucket limit errors.
|
||||||
func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {
|
func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {
|
||||||
|
|
|
@ -679,6 +679,7 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app
|
||||||
false,
|
false,
|
||||||
false,
|
false,
|
||||||
false,
|
false,
|
||||||
|
false,
|
||||||
nil,
|
nil,
|
||||||
false,
|
false,
|
||||||
newTestScrapeMetrics(t),
|
newTestScrapeMetrics(t),
|
||||||
|
@ -821,6 +822,7 @@ func TestScrapeLoopRun(t *testing.T) {
|
||||||
false,
|
false,
|
||||||
false,
|
false,
|
||||||
false,
|
false,
|
||||||
|
false,
|
||||||
nil,
|
nil,
|
||||||
false,
|
false,
|
||||||
scrapeMetrics,
|
scrapeMetrics,
|
||||||
|
@ -965,6 +967,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
|
||||||
false,
|
false,
|
||||||
false,
|
false,
|
||||||
false,
|
false,
|
||||||
|
false,
|
||||||
nil,
|
nil,
|
||||||
false,
|
false,
|
||||||
scrapeMetrics,
|
scrapeMetrics,
|
||||||
|
@ -3366,6 +3369,106 @@ test_summary_count 199
|
||||||
checkValues("quantile", expectedQuantileValues, series)
|
checkValues("quantile", expectedQuantileValues, series)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Testing whether we can automatically convert scraped classic histograms into native histograms with custom buckets.
|
||||||
|
func TestConvertClassicHistograms(t *testing.T) {
|
||||||
|
simpleStorage := teststorage.New(t)
|
||||||
|
defer simpleStorage.Close()
|
||||||
|
|
||||||
|
config := &config.ScrapeConfig{
|
||||||
|
JobName: "test",
|
||||||
|
SampleLimit: 100,
|
||||||
|
Scheme: "http",
|
||||||
|
ScrapeInterval: model.Duration(100 * time.Millisecond),
|
||||||
|
ScrapeTimeout: model.Duration(100 * time.Millisecond),
|
||||||
|
ConvertClassicHistograms: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
metricsText := `
|
||||||
|
# HELP test_histogram This is a histogram with default buckets
|
||||||
|
# TYPE test_histogram histogram
|
||||||
|
test_histogram_bucket{address="0.0.0.0",port="5001",le="0.005"} 0
|
||||||
|
test_histogram_bucket{address="0.0.0.0",port="5001",le="0.01"} 0
|
||||||
|
test_histogram_bucket{address="0.0.0.0",port="5001",le="0.025"} 0
|
||||||
|
test_histogram_bucket{address="0.0.0.0",port="5001",le="0.05"} 0
|
||||||
|
test_histogram_bucket{address="0.0.0.0",port="5001",le="0.1"} 0
|
||||||
|
test_histogram_bucket{address="0.0.0.0",port="5001",le="0.25"} 0
|
||||||
|
test_histogram_bucket{address="0.0.0.0",port="5001",le="0.5"} 0
|
||||||
|
test_histogram_bucket{address="0.0.0.0",port="5001",le="1"} 0
|
||||||
|
test_histogram_bucket{address="0.0.0.0",port="5001",le="2.5"} 0
|
||||||
|
test_histogram_bucket{address="0.0.0.0",port="5001",le="5"} 0
|
||||||
|
test_histogram_bucket{address="0.0.0.0",port="5001",le="10"} 1
|
||||||
|
test_histogram_bucket{address="0.0.0.0",port="5001",le="+Inf"} 1
|
||||||
|
test_histogram_sum{address="0.0.0.0",port="5001"} 10
|
||||||
|
test_histogram_count{address="0.0.0.0",port="5001"} 1
|
||||||
|
`
|
||||||
|
|
||||||
|
// The expected "le" values do not have the trailing ".0".
|
||||||
|
expectedLeValues := []string{"0.005", "0.01", "0.025", "0.05", "0.1", "0.25", "0.5", "1", "2.5", "5", "10", "+Inf"}
|
||||||
|
|
||||||
|
scrapeCount := 0
|
||||||
|
scraped := make(chan bool)
|
||||||
|
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
fmt.Fprint(w, metricsText)
|
||||||
|
scrapeCount++
|
||||||
|
if scrapeCount > 2 {
|
||||||
|
close(scraped)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer sp.stop()
|
||||||
|
|
||||||
|
testURL, err := url.Parse(ts.URL)
|
||||||
|
require.NoError(t, err)
|
||||||
|
sp.Sync([]*targetgroup.Group{
|
||||||
|
{
|
||||||
|
Targets: []model.LabelSet{{model.AddressLabel: model.LabelValue(testURL.Host)}},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.Len(t, sp.ActiveTargets(), 1)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("target was not scraped")
|
||||||
|
case <-scraped:
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
q, err := simpleStorage.Querier(time.Time{}.UnixNano(), time.Now().UnixNano())
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer q.Close()
|
||||||
|
|
||||||
|
checkValues := func(labelName string, expectedValues []string, series storage.SeriesSet) {
|
||||||
|
foundLeValues := map[string]bool{}
|
||||||
|
|
||||||
|
for series.Next() {
|
||||||
|
s := series.At()
|
||||||
|
v := s.Labels().Get(labelName)
|
||||||
|
require.NotContains(t, foundLeValues, v, "duplicate label value found")
|
||||||
|
foundLeValues[v] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, len(expectedValues), len(foundLeValues), "number of label values not as expected")
|
||||||
|
for _, v := range expectedValues {
|
||||||
|
require.Contains(t, foundLeValues, v, "label value not found")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "test_histogram_bucket"))
|
||||||
|
checkValues("le", expectedLeValues, series)
|
||||||
|
|
||||||
|
series = q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "test_histogram"))
|
||||||
|
count := 0
|
||||||
|
for series.Next() {
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
require.Equal(t, 1, count, "number of series not as expected")
|
||||||
|
}
|
||||||
|
|
||||||
func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrapeForTimestampedMetrics(t *testing.T) {
|
func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrapeForTimestampedMetrics(t *testing.T) {
|
||||||
appender := &collectResultAppender{}
|
appender := &collectResultAppender{}
|
||||||
var (
|
var (
|
||||||
|
|
92
util/convertnhcb/convertnhcb.go
Normal file
92
util/convertnhcb/convertnhcb.go
Normal file
|
@ -0,0 +1,92 @@
|
||||||
|
// Copyright 2024 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package convertnhcb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TempHistogram struct {
|
||||||
|
BucketCounts map[float64]float64
|
||||||
|
Count float64
|
||||||
|
Sum float64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTempHistogram() TempHistogram {
|
||||||
|
return TempHistogram{
|
||||||
|
BucketCounts: map[float64]float64{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ProcessUpperBoundsAndCreateBaseHistogram(upperBounds0 []float64) ([]float64, *histogram.FloatHistogram) {
|
||||||
|
sort.Float64s(upperBounds0)
|
||||||
|
upperBounds := make([]float64, 0, len(upperBounds0))
|
||||||
|
prevLE := math.Inf(-1)
|
||||||
|
for _, le := range upperBounds0 {
|
||||||
|
if le != prevLE { // deduplicate
|
||||||
|
upperBounds = append(upperBounds, le)
|
||||||
|
prevLE = le
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var customBounds []float64
|
||||||
|
if upperBounds[len(upperBounds)-1] == math.Inf(1) {
|
||||||
|
customBounds = upperBounds[:len(upperBounds)-1]
|
||||||
|
} else {
|
||||||
|
customBounds = upperBounds
|
||||||
|
}
|
||||||
|
return upperBounds, &histogram.FloatHistogram{
|
||||||
|
Count: 0,
|
||||||
|
Sum: 0,
|
||||||
|
Schema: histogram.CustomBucketsSchema,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: uint32(len(upperBounds))},
|
||||||
|
},
|
||||||
|
PositiveBuckets: make([]float64, len(upperBounds)),
|
||||||
|
CustomValues: customBounds,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ConvertHistogramWrapper(hist TempHistogram, upperBounds []float64, fhBase *histogram.FloatHistogram) *histogram.FloatHistogram {
|
||||||
|
fh := fhBase.Copy()
|
||||||
|
var prevCount, total float64
|
||||||
|
for i, le := range upperBounds {
|
||||||
|
currCount, exists := hist.BucketCounts[le]
|
||||||
|
if !exists {
|
||||||
|
currCount = 0
|
||||||
|
}
|
||||||
|
count := currCount - prevCount
|
||||||
|
fh.PositiveBuckets[i] = count
|
||||||
|
total += count
|
||||||
|
prevCount = currCount
|
||||||
|
}
|
||||||
|
fh.Sum = hist.Sum
|
||||||
|
if hist.Count != 0 {
|
||||||
|
total = hist.Count
|
||||||
|
}
|
||||||
|
fh.Count = total
|
||||||
|
return fh.Compact(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetHistogramMetricBase(m labels.Labels, suffix string) labels.Labels {
|
||||||
|
mName := m.Get(labels.MetricName)
|
||||||
|
return labels.NewBuilder(m).
|
||||||
|
Set(labels.MetricName, strings.TrimSuffix(mName, suffix)).
|
||||||
|
Del(labels.BucketLabel).
|
||||||
|
Labels()
|
||||||
|
}
|
Loading…
Reference in a new issue