From 0e5072b87385a322f97bdc9c1d993cc0082a1ee7 Mon Sep 17 00:00:00 2001 From: Jeanette Tan Date: Wed, 3 Jul 2024 17:56:48 +0800 Subject: [PATCH] keep only 1 nhcb in memory at at time Signed-off-by: Jeanette Tan --- model/textparse/nhcbparse.go | 87 ++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 49 deletions(-) diff --git a/model/textparse/nhcbparse.go b/model/textparse/nhcbparse.go index 1b595351d..165fd272f 100644 --- a/model/textparse/nhcbparse.go +++ b/model/textparse/nhcbparse.go @@ -43,8 +43,8 @@ type NhcbParser struct { buf []byte - nhcbLabels map[uint64]labels.Labels - nhcbBuilder map[uint64]convertnhcb.TempHistogram + lsetNhcb labels.Labels + tempNhcb convertnhcb.TempHistogram } func NewNhcbParser(p Parser, keepClassicHistograms bool) Parser { @@ -52,8 +52,7 @@ func NewNhcbParser(p Parser, keepClassicHistograms bool) Parser { parser: p, keepClassicHistograms: keepClassicHistograms, buf: make([]byte, 0, 1024), - nhcbLabels: make(map[uint64]labels.Labels), - nhcbBuilder: make(map[uint64]convertnhcb.TempHistogram), + tempNhcb: convertnhcb.NewTempHistogram(), } } @@ -97,8 +96,7 @@ func (p *NhcbParser) CreatedTimestamp() *int64 { func (p *NhcbParser) Next() (Entry, error) { et, err := p.parser.Next() if errors.Is(err, io.EOF) { - if len(p.nhcbBuilder) > 0 { - p.processNhcb() + if p.processNhcb(p.tempNhcb) { return EntryHistogram, nil } return EntryInvalid, err @@ -123,18 +121,18 @@ func (p *NhcbParser) handleClassicHistogramSeries(lset labels.Labels) bool { case strings.HasSuffix(mName, "_bucket") && lset.Has(labels.BucketLabel): le, err := strconv.ParseFloat(lset.Get(labels.BucketLabel), 64) if err == nil && !math.IsNaN(le) { - processClassicHistogramSeries(lset, "_bucket", p.nhcbLabels, p.nhcbBuilder, func(hist *convertnhcb.TempHistogram) { + p.processClassicHistogramSeries(lset, "_bucket", func(hist *convertnhcb.TempHistogram) { hist.BucketCounts[le] = p.value }) return true } case strings.HasSuffix(mName, "_count"): - processClassicHistogramSeries(lset, "_count", p.nhcbLabels, p.nhcbBuilder, func(hist *convertnhcb.TempHistogram) { + p.processClassicHistogramSeries(lset, "_count", func(hist *convertnhcb.TempHistogram) { hist.Count = p.value }) return true case strings.HasSuffix(mName, "_sum"): - processClassicHistogramSeries(lset, "_sum", p.nhcbLabels, p.nhcbBuilder, func(hist *convertnhcb.TempHistogram) { + p.processClassicHistogramSeries(lset, "_sum", func(hist *convertnhcb.TempHistogram) { hist.Sum = p.value }) return true @@ -142,47 +140,38 @@ func (p *NhcbParser) handleClassicHistogramSeries(lset labels.Labels) bool { return false } -func processClassicHistogramSeries(lset labels.Labels, suffix string, nhcbLabels map[uint64]labels.Labels, nhcbBuilder map[uint64]convertnhcb.TempHistogram, updateHist func(*convertnhcb.TempHistogram)) { - m2 := convertnhcb.GetHistogramMetricBase(lset, suffix) - m2hash := m2.Hash() - nhcbLabels[m2hash] = m2 - th, exists := nhcbBuilder[m2hash] - if !exists { - th = convertnhcb.NewTempHistogram() - } - updateHist(&th) - nhcbBuilder[m2hash] = th +func (p *NhcbParser) processClassicHistogramSeries(lset labels.Labels, suffix string, updateHist func(*convertnhcb.TempHistogram)) { + p.lsetNhcb = convertnhcb.GetHistogramMetricBase(lset, suffix) + updateHist(&p.tempNhcb) } -func (p *NhcbParser) processNhcb() { - for hash, th := range p.nhcbBuilder { - lset, ok := p.nhcbLabels[hash] - if !ok { - continue - } - ub := make([]float64, 0, len(th.BucketCounts)) - for b := range th.BucketCounts { - ub = append(ub, b) - } - upperBounds, hBase := convertnhcb.ProcessUpperBoundsAndCreateBaseHistogram(ub, false) - fhBase := hBase.ToFloat(nil) - h, fh := convertnhcb.ConvertHistogramWrapper(th, upperBounds, hBase, fhBase) - if h != nil { - if err := h.Validate(); err != nil { - panic("histogram validation failed") - } - p.h = h - p.fh = nil - } else if fh != nil { - if err := fh.Validate(); err != nil { - panic("histogram validation failed") - } - p.h = nil - p.fh = fh - } - p.bytes = lset.Bytes(p.buf) - p.lset = lset - p.metricString = lset.String() +func (p *NhcbParser) processNhcb(th convertnhcb.TempHistogram) bool { + if len(th.BucketCounts) == 0 { + return false } - p.nhcbBuilder = map[uint64]convertnhcb.TempHistogram{} + ub := make([]float64, 0, len(th.BucketCounts)) + for b := range th.BucketCounts { + ub = append(ub, b) + } + upperBounds, hBase := convertnhcb.ProcessUpperBoundsAndCreateBaseHistogram(ub, false) + fhBase := hBase.ToFloat(nil) + h, fh := convertnhcb.ConvertHistogramWrapper(th, upperBounds, hBase, fhBase) + if h != nil { + if err := h.Validate(); err != nil { + return false + } + p.h = h + p.fh = nil + } else if fh != nil { + if err := fh.Validate(); err != nil { + return false + } + p.h = nil + p.fh = fh + } + p.bytes = p.lsetNhcb.Bytes(p.buf) + p.lset = p.lsetNhcb + p.metricString = p.lsetNhcb.String() + p.tempNhcb = convertnhcb.NewTempHistogram() + return true }