insert nhcb parser as intermediate layer

Signed-off-by: Jeanette Tan <jeanette.tan@grafana.com>
This commit is contained in:
Jeanette Tan 2024-07-03 17:56:48 +08:00 committed by György Krajcsovits
parent f596f17024
commit 172d4f2405
2 changed files with 192 additions and 80 deletions

View file

@ -0,0 +1,188 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package textparse
import (
"errors"
"io"
"math"
"strconv"
"strings"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/util/convertnhcb"
)
type NhcbParser struct {
parser Parser
keepClassicHistograms bool
bytes []byte
ts *int64
value float64
h *histogram.Histogram
fh *histogram.FloatHistogram
lset labels.Labels
metricString string
buf []byte
nhcbLabels map[uint64]labels.Labels
nhcbBuilder map[uint64]convertnhcb.TempHistogram
}
func NewNhcbParser(p Parser, keepClassicHistograms bool) Parser {
return &NhcbParser{
parser: p,
keepClassicHistograms: keepClassicHistograms,
buf: make([]byte, 0, 1024),
nhcbLabels: make(map[uint64]labels.Labels),
nhcbBuilder: make(map[uint64]convertnhcb.TempHistogram),
}
}
func (p *NhcbParser) Series() ([]byte, *int64, float64) {
return p.bytes, p.ts, p.value
}
func (p *NhcbParser) Histogram() ([]byte, *int64, *histogram.Histogram, *histogram.FloatHistogram) {
return p.bytes, p.ts, p.h, p.fh
}
func (p *NhcbParser) Help() ([]byte, []byte) {
return p.parser.Help()
}
func (p *NhcbParser) Type() ([]byte, model.MetricType) {
return p.parser.Type()
}
func (p *NhcbParser) Unit() ([]byte, []byte) {
return p.parser.Unit()
}
func (p *NhcbParser) Comment() []byte {
return p.parser.Comment()
}
func (p *NhcbParser) Metric(l *labels.Labels) string {
*l = p.lset
return p.metricString
}
func (p *NhcbParser) Exemplar(ex *exemplar.Exemplar) bool {
return p.parser.Exemplar(ex)
}
func (p *NhcbParser) CreatedTimestamp() *int64 {
return p.parser.CreatedTimestamp()
}
func (p *NhcbParser) Next() (Entry, error) {
et, err := p.parser.Next()
if errors.Is(err, io.EOF) {
if len(p.nhcbBuilder) > 0 {
p.processNhcb()
return EntryHistogram, nil
}
return EntryInvalid, err
}
switch et {
case EntrySeries:
p.bytes, p.ts, p.value = p.parser.Series()
p.metricString = p.parser.Metric(&p.lset)
if isNhcb := p.handleClassicHistogramSeries(p.lset); isNhcb && !p.keepClassicHistograms {
return p.Next()
}
case EntryHistogram:
p.bytes, p.ts, p.h, p.fh = p.parser.Histogram()
p.metricString = p.parser.Metric(&p.lset)
}
return et, err
}
func (p *NhcbParser) handleClassicHistogramSeries(lset labels.Labels) bool {
mName := lset.Get(labels.MetricName)
switch {
case strings.HasSuffix(mName, "_bucket") && lset.Has(labels.BucketLabel):
le, err := strconv.ParseFloat(lset.Get(labels.BucketLabel), 64)
if err == nil && !math.IsNaN(le) {
processClassicHistogramSeries(lset, "_bucket", p.nhcbLabels, p.nhcbBuilder, func(hist *convertnhcb.TempHistogram) {
hist.BucketCounts[le] = p.value
})
return true
}
case strings.HasSuffix(mName, "_count"):
processClassicHistogramSeries(lset, "_count", p.nhcbLabels, p.nhcbBuilder, func(hist *convertnhcb.TempHistogram) {
hist.Count = p.value
})
return true
case strings.HasSuffix(mName, "_sum"):
processClassicHistogramSeries(lset, "_sum", p.nhcbLabels, p.nhcbBuilder, func(hist *convertnhcb.TempHistogram) {
hist.Sum = p.value
})
return true
}
return false
}
func processClassicHistogramSeries(lset labels.Labels, suffix string, nhcbLabels map[uint64]labels.Labels, nhcbBuilder map[uint64]convertnhcb.TempHistogram, updateHist func(*convertnhcb.TempHistogram)) {
m2 := convertnhcb.GetHistogramMetricBase(lset, suffix)
m2hash := m2.Hash()
nhcbLabels[m2hash] = m2
th, exists := nhcbBuilder[m2hash]
if !exists {
th = convertnhcb.NewTempHistogram()
}
updateHist(&th)
nhcbBuilder[m2hash] = th
}
func (p *NhcbParser) processNhcb() {
for hash, th := range p.nhcbBuilder {
lset, ok := p.nhcbLabels[hash]
if !ok {
continue
}
ub := make([]float64, 0, len(th.BucketCounts))
for b := range th.BucketCounts {
ub = append(ub, b)
}
upperBounds, hBase := convertnhcb.ProcessUpperBoundsAndCreateBaseHistogram(ub, false)
fhBase := hBase.ToFloat(nil)
h, fh := convertnhcb.ConvertHistogramWrapper(th, upperBounds, hBase, fhBase)
if h != nil {
if err := h.Validate(); err != nil {
panic("histogram validation failed")
}
p.h = h
p.fh = nil
} else if fh != nil {
if err := fh.Validate(); err != nil {
panic("histogram validation failed")
}
p.h = nil
p.fh = fh
}
p.bytes = lset.Bytes(p.buf)
p.lset = lset
p.metricString = lset.String()
}
p.nhcbBuilder = map[uint64]convertnhcb.TempHistogram{}
}

View file

@ -47,7 +47,6 @@ import (
"github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/convertnhcb"
"github.com/prometheus/prometheus/util/pool" "github.com/prometheus/prometheus/util/pool"
) )
@ -1473,6 +1472,9 @@ type appendErrors struct {
func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) { func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
p, err := textparse.New(b, contentType, sl.scrapeClassicHistograms, sl.symbolTable) p, err := textparse.New(b, contentType, sl.scrapeClassicHistograms, sl.symbolTable)
if sl.convertClassicHistograms {
p = textparse.NewNhcbParser(p, sl.scrapeClassicHistograms)
}
if err != nil { if err != nil {
level.Debug(sl.l).Log( level.Debug(sl.l).Log(
"msg", "Invalid content type on scrape, using prometheus parser as fallback.", "msg", "Invalid content type on scrape, using prometheus parser as fallback.",
@ -1490,8 +1492,6 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
e exemplar.Exemplar // escapes to heap so hoisted out of loop e exemplar.Exemplar // escapes to heap so hoisted out of loop
meta metadata.Metadata meta metadata.Metadata
metadataChanged bool metadataChanged bool
nhcbLabels map[uint64]labels.Labels
nhcbBuilder map[uint64]convertnhcb.TempHistogram
) )
exemplars := make([]exemplar.Exemplar, 1) exemplars := make([]exemplar.Exemplar, 1)
@ -1521,11 +1521,6 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
// Take an appender with limits. // Take an appender with limits.
app = appender(app, sl.sampleLimit, sl.bucketLimit, sl.maxSchema) app = appender(app, sl.sampleLimit, sl.bucketLimit, sl.maxSchema)
if sl.convertClassicHistograms {
nhcbLabels = make(map[uint64]labels.Labels)
nhcbBuilder = make(map[uint64]convertnhcb.TempHistogram)
}
defer func() { defer func() {
if err != nil { if err != nil {
return return
@ -1655,38 +1650,9 @@ loop:
ref, err = app.AppendHistogram(ref, lset, t, nil, fh) ref, err = app.AppendHistogram(ref, lset, t, nil, fh)
} }
} else { } else {
var skipAppendFloat bool
if sl.convertClassicHistograms {
mName := lset.Get(labels.MetricName)
if !sl.scrapeClassicHistograms {
baseMetadata, _ := sl.cache.GetMetadata(convertnhcb.GetHistogramMetricBaseName(mName))
if baseMetadata.Type == model.MetricTypeHistogram {
skipAppendFloat = true
}
}
switch {
case strings.HasSuffix(mName, "_bucket") && lset.Has(labels.BucketLabel):
le, err := strconv.ParseFloat(lset.Get(labels.BucketLabel), 64)
if err == nil && !math.IsNaN(le) {
processClassicHistogramSeries(lset, "_bucket", nhcbLabels, nhcbBuilder, func(hist *convertnhcb.TempHistogram) {
hist.BucketCounts[le] = val
})
}
case strings.HasSuffix(mName, "_count"):
processClassicHistogramSeries(lset, "_count", nhcbLabels, nhcbBuilder, func(hist *convertnhcb.TempHistogram) {
hist.Count = val
})
case strings.HasSuffix(mName, "_sum"):
processClassicHistogramSeries(lset, "_sum", nhcbLabels, nhcbBuilder, func(hist *convertnhcb.TempHistogram) {
hist.Sum = val
})
}
}
if !skipAppendFloat {
ref, err = app.Append(ref, lset, t, val) ref, err = app.Append(ref, lset, t, val)
} }
} }
}
if err == nil { if err == nil {
if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil { if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil {
@ -1806,51 +1772,9 @@ loop:
return err == nil return err == nil
}) })
} }
for hash, th := range nhcbBuilder {
lset, ok := nhcbLabels[hash]
if !ok {
continue
}
ub := make([]float64, 0, len(th.BucketCounts))
for b := range th.BucketCounts {
ub = append(ub, b)
}
upperBounds, hBase := convertnhcb.ProcessUpperBoundsAndCreateBaseHistogram(ub, false)
fhBase := hBase.ToFloat(nil)
h, fh := convertnhcb.ConvertHistogramWrapper(th, upperBounds, hBase, fhBase)
if h != nil {
if err := h.Validate(); err != nil {
continue
}
if _, err = app.AppendHistogram(0, lset, defTime, h, nil); err != nil {
continue
}
} else if fh != nil {
if err := fh.Validate(); err != nil {
continue
}
if _, err = app.AppendHistogram(0, lset, defTime, nil, fh); err != nil {
continue
}
}
}
return return
} }
func processClassicHistogramSeries(lset labels.Labels, suffix string, nhcbLabels map[uint64]labels.Labels, nhcbBuilder map[uint64]convertnhcb.TempHistogram, updateHist func(*convertnhcb.TempHistogram)) {
m2 := convertnhcb.GetHistogramMetricBase(lset, suffix)
m2hash := m2.Hash()
nhcbLabels[m2hash] = m2
th, exists := nhcbBuilder[m2hash]
if !exists {
th = convertnhcb.NewTempHistogram()
}
updateHist(&th)
nhcbBuilder[m2hash] = th
}
// Adds samples to the appender, checking the error, and then returns the # of samples added, // Adds samples to the appender, checking the error, and then returns the # of samples added,
// whether the caller should continue to process more samples, and any sample or bucket limit errors. // whether the caller should continue to process more samples, and any sample or bucket limit errors.
func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) { func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {