Fix not checking all labels before deciding to store NHCB

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
György Krajcsovits 2024-10-07 15:41:54 +02:00
parent 2a3aa500e9
commit 6bebeaf41b
2 changed files with 119 additions and 22 deletions

View file

@ -68,11 +68,11 @@ type NHCBParser struct {
// Remembers the last native histogram name so we can ignore // Remembers the last native histogram name so we can ignore
// conversions to NHCB when the name is the same. // conversions to NHCB when the name is the same.
lastNativeHistName string lastNativeHistLabels labels.Labels
// Remembers the last base histogram metric name (assuming it's // Remembers the last base histogram metric name (assuming it's
// a classic histogram) so we can tell if the next float series // a classic histogram) so we can tell if the next float series
// is part of the same classic histogram. // is part of the same classic histogram.
lastBaseHistName string lastBaseHistLabels labels.Labels
} }
func NewNHCBParser(p Parser, keepClassicHistograms bool) Parser { func NewNHCBParser(p Parser, keepClassicHistograms bool) Parser {
@ -132,7 +132,7 @@ func (p *NHCBParser) Next() (Entry, error) {
if p.justInsertedNHCB { if p.justInsertedNHCB {
p.justInsertedNHCB = false p.justInsertedNHCB = false
if p.entry == EntrySeries { if p.entry == EntrySeries {
if isNHCB := p.handleClassicHistogramSeries(p.lset); isNHCB && !p.keepClassicHistograms { if !p.keepClassicHistograms && p.handleClassicHistogramSeries(p.lset) {
return p.Next() return p.Next()
} }
} }
@ -151,12 +151,34 @@ func (p *NHCBParser) Next() (Entry, error) {
case EntrySeries: case EntrySeries:
p.bytes, p.ts, p.value = p.parser.Series() p.bytes, p.ts, p.value = p.parser.Series()
p.metricString = p.parser.Metric(&p.lset) p.metricString = p.parser.Metric(&p.lset)
histBaseName := convertnhcb.GetHistogramMetricBaseName(p.lset.Get(labels.MetricName)) // Check the label set to see if we can continue or need to emit the NHCB.
if histBaseName == p.lastNativeHistName { shouldInsertNHCB := false
break if len(p.lastBaseHistLabels) > 0 {
InnerCompare:
for _, l := range p.lset {
if l.Name == labels.MetricName {
baseName := convertnhcb.GetHistogramMetricBaseName(l.Value)
if baseName != p.lastBaseHistLabels.Get(labels.MetricName) {
p.storeBaseLabels()
shouldInsertNHCB = true
break InnerCompare
}
continue InnerCompare
}
if l.Name == labels.BucketLabel {
// Ignore.
continue InnerCompare
}
if l.Value != p.lastBaseHistLabels.Get(l.Name) {
// Different label value.
p.storeBaseLabels()
shouldInsertNHCB = true
break InnerCompare
}
}
} else {
p.storeBaseLabels()
} }
shouldInsertNHCB := p.lastBaseHistName != "" && p.lastBaseHistName != histBaseName
p.lastBaseHistName = histBaseName
if shouldInsertNHCB && p.processNHCB() { if shouldInsertNHCB && p.processNHCB() {
p.entry = et p.entry = et
return EntryHistogram, nil return EntryHistogram, nil
@ -168,7 +190,7 @@ func (p *NHCBParser) Next() (Entry, error) {
case EntryHistogram: case EntryHistogram:
p.bytes, p.ts, p.h, p.fh = p.parser.Histogram() p.bytes, p.ts, p.h, p.fh = p.parser.Histogram()
p.metricString = p.parser.Metric(&p.lset) p.metricString = p.parser.Metric(&p.lset)
p.lastNativeHistName = p.lset.Get(labels.MetricName) p.lastNativeHistLabels.CopyFrom(p.lset)
case EntryType: case EntryType:
p.bName, p.typ = p.parser.Type() p.bName, p.typ = p.parser.Type()
} }
@ -179,6 +201,23 @@ func (p *NHCBParser) Next() (Entry, error) {
return et, err return et, err
} }
// Save the label set of the classic histogram without suffix and bucket `le` label.
func (p *NHCBParser) storeBaseLabels() {
builder := labels.Builder{}
for _, l := range p.lset {
if l.Name == labels.MetricName {
builder.Set(l.Name, convertnhcb.GetHistogramMetricBaseName(l.Value))
continue
}
if l.Name == labels.BucketLabel {
// Ignore.
continue
}
builder.Set(l.Name, l.Value)
}
p.lastBaseHistLabels = builder.Labels()
}
// handleClassicHistogramSeries collates the classic histogram series to be converted to NHCB // handleClassicHistogramSeries collates the classic histogram series to be converted to NHCB
// if it is actually a classic histogram series (and not a normal float series) and if there // if it is actually a classic histogram series (and not a normal float series) and if there
// isn't already a native histogram with the same name (assuming it is always processed // isn't already a native histogram with the same name (assuming it is always processed
@ -188,6 +227,7 @@ func (p *NHCBParser) handleClassicHistogramSeries(lset labels.Labels) bool {
return false return false
} }
mName := lset.Get(labels.MetricName) mName := lset.Get(labels.MetricName)
// Sanity check to ensure that the TYPE metadata entry name is the same as the base name.
if convertnhcb.GetHistogramMetricBaseName(mName) != string(p.bName) { if convertnhcb.GetHistogramMetricBaseName(mName) != string(p.bName) {
return false return false
} }

View file

@ -352,19 +352,18 @@ foobar{quantile="0.99"} 150.1`
m: "something", m: "something",
typ: model.MetricTypeHistogram, typ: model.MetricTypeHistogram,
}, { }, {
// TODO(krajorama): do not miss the first histogram. m: `something{}`,
// m: `something{}`, shs: &histogram.Histogram{
// shs: &histogram.Histogram{ Schema: -53, // Custom buckets.
// Schema: -53, // Custom buckets. Count: 18,
// Count: 18, Sum: 324789.4,
// Sum: 324789.4, PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}},
// PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}}, PositiveBuckets: []int64{1, 16},
// PositiveBuckets: []int64{1, 16}, CustomValues: []float64{0.0}, // We do not store the +Inf boundary.
// CustomValues: []float64{0.0}, // We do not store the +Inf boundary. },
// }, lset: labels.FromStrings("__name__", "something"),
// lset: labels.FromStrings("__name__", "something"), // TODO(krajorama): ct: int64p(1520430001000),
// // TODO(krajorama): ct: int64p(1520430001000), }, {
// }, {
m: `something{a="b"}`, m: `something{a="b"}`,
shs: &histogram.Histogram{ shs: &histogram.Histogram{
Schema: -53, // Custom buckets. Schema: -53, // Custom buckets.
@ -443,3 +442,61 @@ foobar{quantile="0.99"} 150.1`
got := testParse(t, p) got := testParse(t, p)
requireEntries(t, exp, got) requireEntries(t, exp, got)
} }
func TestNhcbParserMultiHOnOpenMetricsParser(t *testing.T) {
// The input is taken originally from TestOpenMetricsParse, with additional tests for the NHCBParser.
input := `# HELP something Histogram with _created between buckets and summary
# TYPE something histogram
something_count 18
something_sum 324789.4
something_created 1520430001
something_bucket{le="0.0"} 1
something_bucket{le="+Inf"} 18
something_count{a="b"} 9
something_sum{a="b"} 42123.0
something_bucket{a="b",le="0.0"} 8
something_bucket{a="b",le="+Inf"} 9
something_created{a="b"} 1520430002
# EOF
`
exp := []parsedEntry{
{
m: "something",
help: "Histogram with _created between buckets and summary",
}, {
m: "something",
typ: model.MetricTypeHistogram,
}, {
m: `something{}`,
shs: &histogram.Histogram{
Schema: -53, // Custom buckets.
Count: 18,
Sum: 324789.4,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}},
PositiveBuckets: []int64{1, 16},
CustomValues: []float64{0.0}, // We do not store the +Inf boundary.
},
lset: labels.FromStrings("__name__", "something"),
// TODO(krajorama): ct: int64p(1520430001000),
}, {
m: `something{a="b"}`,
shs: &histogram.Histogram{
Schema: -53, // Custom buckets.
Count: 9,
Sum: 42123.0,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}},
PositiveBuckets: []int64{8, -7},
CustomValues: []float64{0.0}, // We do not store the +Inf boundary.
},
lset: labels.FromStrings("__name__", "something", "a", "b"),
// TODO(krajorama): ct: int64p(1520430001000),
},
}
p := NewOpenMetricsParser([]byte(input), labels.NewSymbolTable(), WithOMParserCTSeriesSkipped())
p = NewNHCBParser(p, false)
got := testParse(t, p)
requireEntries(t, exp, got)
}