don't use cache for nhcb maps

Signed-off-by: Jeanette Tan <jeanette.tan@grafana.com>
This commit is contained in:
Jeanette Tan 2024-07-03 17:56:48 +08:00 committed by György Krajcsovits
parent 0a321fe4d8
commit 02d5abf60e

View file

@ -887,9 +887,6 @@ type scrapeCache struct {
metadata map[string]*metaEntry
metrics *scrapeMetrics
nhcbLabels map[uint64]labels.Labels
nhcbBuilder map[uint64]convertnhcb.TempHistogram
}
// metaEntry holds meta information about a metric.
@ -913,8 +910,6 @@ func newScrapeCache(metrics *scrapeMetrics) *scrapeCache {
seriesPrev: map[uint64]labels.Labels{},
metadata: map[string]*metaEntry{},
metrics: metrics,
nhcbLabels: map[uint64]labels.Labels{},
nhcbBuilder: map[uint64]convertnhcb.TempHistogram{},
}
}
@ -1118,11 +1113,6 @@ func (c *scrapeCache) LengthMetadata() int {
return len(c.metadata)
}
func (c *scrapeCache) resetNhcb() {
c.nhcbLabels = map[uint64]labels.Labels{}
c.nhcbBuilder = map[uint64]convertnhcb.TempHistogram{}
}
func newScrapeLoop(ctx context.Context,
sc scraper,
l log.Logger,
@ -1500,6 +1490,8 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
e exemplar.Exemplar // escapes to heap so hoisted out of loop
meta metadata.Metadata
metadataChanged bool
nhcbLabels map[uint64]labels.Labels
nhcbBuilder map[uint64]convertnhcb.TempHistogram
)
exemplars := make([]exemplar.Exemplar, 1)
@ -1529,6 +1521,11 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
// Take an appender with limits.
app = appender(app, sl.sampleLimit, sl.bucketLimit, sl.maxSchema)
if sl.convertClassicHistograms {
nhcbLabels = make(map[uint64]labels.Labels)
nhcbBuilder = make(map[uint64]convertnhcb.TempHistogram)
}
defer func() {
if err != nil {
return
@ -1666,16 +1663,16 @@ loop:
case strings.HasSuffix(mName, "_bucket") && lset.Has(labels.BucketLabel):
le, err := strconv.ParseFloat(lset.Get(labels.BucketLabel), 64)
if err == nil && !math.IsNaN(le) {
processClassicHistogramSeries(lset, "_bucket", sl.cache, func(hist *convertnhcb.TempHistogram) {
processClassicHistogramSeries(lset, "_bucket", nhcbLabels, nhcbBuilder, func(hist *convertnhcb.TempHistogram) {
hist.BucketCounts[le] = val
})
}
case strings.HasSuffix(mName, "_count"):
processClassicHistogramSeries(lset, "_count", sl.cache, func(hist *convertnhcb.TempHistogram) {
processClassicHistogramSeries(lset, "_count", nhcbLabels, nhcbBuilder, func(hist *convertnhcb.TempHistogram) {
hist.Count = val
})
case strings.HasSuffix(mName, "_sum"):
processClassicHistogramSeries(lset, "_sum", sl.cache, func(hist *convertnhcb.TempHistogram) {
processClassicHistogramSeries(lset, "_sum", nhcbLabels, nhcbBuilder, func(hist *convertnhcb.TempHistogram) {
hist.Sum = val
})
}
@ -1802,52 +1799,48 @@ loop:
})
}
if sl.convertClassicHistograms {
for hash, th := range sl.cache.nhcbBuilder {
lset, ok := sl.cache.nhcbLabels[hash]
if !ok {
for hash, th := range nhcbBuilder {
lset, ok := nhcbLabels[hash]
if !ok {
continue
}
ub := make([]float64, 0, len(th.BucketCounts))
for b := range th.BucketCounts {
ub = append(ub, b)
}
upperBounds, hBase := convertnhcb.ProcessUpperBoundsAndCreateBaseHistogram(ub, false)
fhBase := hBase.ToFloat(nil)
h, fh := convertnhcb.ConvertHistogramWrapper(th, upperBounds, hBase, fhBase)
if h != nil {
if err := h.Validate(); err != nil {
continue
}
ub := make([]float64, 0, len(th.BucketCounts))
for b := range th.BucketCounts {
ub = append(ub, b)
if _, err = app.AppendHistogram(0, lset, defTime, h, nil); err != nil {
continue
}
upperBounds, hBase := convertnhcb.ProcessUpperBoundsAndCreateBaseHistogram(ub, false)
fhBase := hBase.ToFloat(nil)
h, fh := convertnhcb.ConvertHistogramWrapper(th, upperBounds, hBase, fhBase)
// fmt.Printf("FINAL lset: %s, timestamp: %v, val: %v\n", lset, defTime, fh)
if h != nil {
if err := h.Validate(); err != nil {
continue
}
if _, err = app.AppendHistogram(0, lset, defTime, h, nil); err != nil {
continue
}
} else if fh != nil {
if err := fh.Validate(); err != nil {
continue
}
if _, err = app.AppendHistogram(0, lset, defTime, nil, fh); err != nil {
continue
}
} else if fh != nil {
if err := fh.Validate(); err != nil {
continue
}
if _, err = app.AppendHistogram(0, lset, defTime, nil, fh); err != nil {
continue
}
}
sl.cache.resetNhcb()
}
return
}
func processClassicHistogramSeries(lset labels.Labels, suffix string, cache *scrapeCache, updateHist func(*convertnhcb.TempHistogram)) {
func processClassicHistogramSeries(lset labels.Labels, suffix string, nhcbLabels map[uint64]labels.Labels, nhcbBuilder map[uint64]convertnhcb.TempHistogram, updateHist func(*convertnhcb.TempHistogram)) {
m2 := convertnhcb.GetHistogramMetricBase(lset, suffix)
m2hash := m2.Hash()
cache.nhcbLabels[m2hash] = m2
th, exists := cache.nhcbBuilder[m2hash]
nhcbLabels[m2hash] = m2
th, exists := nhcbBuilder[m2hash]
if !exists {
th = convertnhcb.NewTempHistogram()
}
updateHist(&th)
cache.nhcbBuilder[m2hash] = th
nhcbBuilder[m2hash] = th
}
// Adds samples to the appender, checking the error, and then returns the # of samples added,