keep only 1 nhcb in memory at at time

Signed-off-by: Jeanette Tan <jeanette.tan@grafana.com>
This commit is contained in:
Jeanette Tan 2024-07-03 17:56:48 +08:00 committed by György Krajcsovits
parent 172d4f2405
commit 0e5072b873

View file

@ -43,8 +43,8 @@ type NhcbParser struct {
buf []byte buf []byte
nhcbLabels map[uint64]labels.Labels lsetNhcb labels.Labels
nhcbBuilder map[uint64]convertnhcb.TempHistogram tempNhcb convertnhcb.TempHistogram
} }
func NewNhcbParser(p Parser, keepClassicHistograms bool) Parser { func NewNhcbParser(p Parser, keepClassicHistograms bool) Parser {
@ -52,8 +52,7 @@ func NewNhcbParser(p Parser, keepClassicHistograms bool) Parser {
parser: p, parser: p,
keepClassicHistograms: keepClassicHistograms, keepClassicHistograms: keepClassicHistograms,
buf: make([]byte, 0, 1024), buf: make([]byte, 0, 1024),
nhcbLabels: make(map[uint64]labels.Labels), tempNhcb: convertnhcb.NewTempHistogram(),
nhcbBuilder: make(map[uint64]convertnhcb.TempHistogram),
} }
} }
@ -97,8 +96,7 @@ func (p *NhcbParser) CreatedTimestamp() *int64 {
func (p *NhcbParser) Next() (Entry, error) { func (p *NhcbParser) Next() (Entry, error) {
et, err := p.parser.Next() et, err := p.parser.Next()
if errors.Is(err, io.EOF) { if errors.Is(err, io.EOF) {
if len(p.nhcbBuilder) > 0 { if p.processNhcb(p.tempNhcb) {
p.processNhcb()
return EntryHistogram, nil return EntryHistogram, nil
} }
return EntryInvalid, err return EntryInvalid, err
@ -123,18 +121,18 @@ func (p *NhcbParser) handleClassicHistogramSeries(lset labels.Labels) bool {
case strings.HasSuffix(mName, "_bucket") && lset.Has(labels.BucketLabel): case strings.HasSuffix(mName, "_bucket") && lset.Has(labels.BucketLabel):
le, err := strconv.ParseFloat(lset.Get(labels.BucketLabel), 64) le, err := strconv.ParseFloat(lset.Get(labels.BucketLabel), 64)
if err == nil && !math.IsNaN(le) { if err == nil && !math.IsNaN(le) {
processClassicHistogramSeries(lset, "_bucket", p.nhcbLabels, p.nhcbBuilder, func(hist *convertnhcb.TempHistogram) { p.processClassicHistogramSeries(lset, "_bucket", func(hist *convertnhcb.TempHistogram) {
hist.BucketCounts[le] = p.value hist.BucketCounts[le] = p.value
}) })
return true return true
} }
case strings.HasSuffix(mName, "_count"): case strings.HasSuffix(mName, "_count"):
processClassicHistogramSeries(lset, "_count", p.nhcbLabels, p.nhcbBuilder, func(hist *convertnhcb.TempHistogram) { p.processClassicHistogramSeries(lset, "_count", func(hist *convertnhcb.TempHistogram) {
hist.Count = p.value hist.Count = p.value
}) })
return true return true
case strings.HasSuffix(mName, "_sum"): case strings.HasSuffix(mName, "_sum"):
processClassicHistogramSeries(lset, "_sum", p.nhcbLabels, p.nhcbBuilder, func(hist *convertnhcb.TempHistogram) { p.processClassicHistogramSeries(lset, "_sum", func(hist *convertnhcb.TempHistogram) {
hist.Sum = p.value hist.Sum = p.value
}) })
return true return true
@ -142,47 +140,38 @@ func (p *NhcbParser) handleClassicHistogramSeries(lset labels.Labels) bool {
return false return false
} }
func processClassicHistogramSeries(lset labels.Labels, suffix string, nhcbLabels map[uint64]labels.Labels, nhcbBuilder map[uint64]convertnhcb.TempHistogram, updateHist func(*convertnhcb.TempHistogram)) { func (p *NhcbParser) processClassicHistogramSeries(lset labels.Labels, suffix string, updateHist func(*convertnhcb.TempHistogram)) {
m2 := convertnhcb.GetHistogramMetricBase(lset, suffix) p.lsetNhcb = convertnhcb.GetHistogramMetricBase(lset, suffix)
m2hash := m2.Hash() updateHist(&p.tempNhcb)
nhcbLabels[m2hash] = m2
th, exists := nhcbBuilder[m2hash]
if !exists {
th = convertnhcb.NewTempHistogram()
}
updateHist(&th)
nhcbBuilder[m2hash] = th
} }
func (p *NhcbParser) processNhcb() { func (p *NhcbParser) processNhcb(th convertnhcb.TempHistogram) bool {
for hash, th := range p.nhcbBuilder { if len(th.BucketCounts) == 0 {
lset, ok := p.nhcbLabels[hash] return false
if !ok {
continue
}
ub := make([]float64, 0, len(th.BucketCounts))
for b := range th.BucketCounts {
ub = append(ub, b)
}
upperBounds, hBase := convertnhcb.ProcessUpperBoundsAndCreateBaseHistogram(ub, false)
fhBase := hBase.ToFloat(nil)
h, fh := convertnhcb.ConvertHistogramWrapper(th, upperBounds, hBase, fhBase)
if h != nil {
if err := h.Validate(); err != nil {
panic("histogram validation failed")
}
p.h = h
p.fh = nil
} else if fh != nil {
if err := fh.Validate(); err != nil {
panic("histogram validation failed")
}
p.h = nil
p.fh = fh
}
p.bytes = lset.Bytes(p.buf)
p.lset = lset
p.metricString = lset.String()
} }
p.nhcbBuilder = map[uint64]convertnhcb.TempHistogram{} ub := make([]float64, 0, len(th.BucketCounts))
for b := range th.BucketCounts {
ub = append(ub, b)
}
upperBounds, hBase := convertnhcb.ProcessUpperBoundsAndCreateBaseHistogram(ub, false)
fhBase := hBase.ToFloat(nil)
h, fh := convertnhcb.ConvertHistogramWrapper(th, upperBounds, hBase, fhBase)
if h != nil {
if err := h.Validate(); err != nil {
return false
}
p.h = h
p.fh = nil
} else if fh != nil {
if err := fh.Validate(); err != nil {
return false
}
p.h = nil
p.fh = fh
}
p.bytes = p.lsetNhcb.Bytes(p.buf)
p.lset = p.lsetNhcb
p.metricString = p.lsetNhcb.String()
p.tempNhcb = convertnhcb.NewTempHistogram()
return true
} }