feat(nhcb): implement created timestamp handling (#15198)
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (0) (push) Waiting to run
CI / Build Prometheus for common architectures (1) (push) Waiting to run
CI / Build Prometheus for common architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (0) (push) Waiting to run
CI / Build Prometheus for all architectures (1) (push) Waiting to run
CI / Build Prometheus for all architectures (10) (push) Waiting to run
CI / Build Prometheus for all architectures (11) (push) Waiting to run
CI / Build Prometheus for all architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (3) (push) Waiting to run
CI / Build Prometheus for all architectures (4) (push) Waiting to run
CI / Build Prometheus for all architectures (5) (push) Waiting to run
CI / Build Prometheus for all architectures (6) (push) Waiting to run
CI / Build Prometheus for all architectures (7) (push) Waiting to run
CI / Build Prometheus for all architectures (8) (push) Waiting to run
CI / Build Prometheus for all architectures (9) (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run

Call through to the underlaying parser if we are not in a histogram
and the entry is a series or exponential native histogram. Otherwise store
and retrieve CT for NHCB.

* fix(omparser): losing exemplars when CT is parsed

Fixes: #15137
Ignore exemplars while peeking ahead during CT parsing.
Simplify state reset with defer().

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
George Krajcsovits 2024-10-24 07:38:58 +02:00 committed by GitHub
parent cccbe72514
commit 2182b83271
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 88 additions and 44 deletions

View file

@ -239,13 +239,13 @@ func testParse(t *testing.T, p Parser) (ret []parsedEntry) {
}
p.Metric(&got.lset)
for e := (exemplar.Exemplar{}); p.Exemplar(&e); {
got.es = append(got.es, e)
}
// Parser reuses int pointer.
if ct := p.CreatedTimestamp(); ct != nil {
got.ct = int64p(*ct)
}
for e := (exemplar.Exemplar{}); p.Exemplar(&e); {
got.es = append(got.es, e)
}
case EntryType:
m, got.typ = p.Type()
got.m = string(m)

View file

@ -84,6 +84,7 @@ type NHCBParser struct {
fhNHCB *histogram.FloatHistogram
lsetNHCB labels.Labels
exemplars []exemplar.Exemplar
ctNHCB *int64
metricStringNHCB string
// Collates values from the classic histogram series to build
@ -92,6 +93,7 @@ type NHCBParser struct {
tempNHCB convertnhcb.TempHistogram
tempExemplars []exemplar.Exemplar
tempExemplarCount int
tempCT *int64
// Remembers the last base histogram metric name (assuming it's
// a classic histogram) so we can tell if the next float series
@ -159,6 +161,16 @@ func (p *NHCBParser) Exemplar(ex *exemplar.Exemplar) bool {
}
func (p *NHCBParser) CreatedTimestamp() *int64 {
switch p.state {
case stateStart:
if p.entry == EntrySeries || p.entry == EntryHistogram {
return p.parser.CreatedTimestamp()
}
case stateCollecting:
return p.parser.CreatedTimestamp()
case stateEmitting:
return p.ctNHCB
}
return nil
}
@ -174,22 +186,20 @@ func (p *NHCBParser) Next() (Entry, error) {
}
return p.entry, p.err
}
et, err := p.parser.Next()
if err != nil {
if errors.Is(err, io.EOF) && p.processNHCB() {
p.entry = et
p.err = err
p.entry, p.err = p.parser.Next()
if p.err != nil {
if errors.Is(p.err, io.EOF) && p.processNHCB() {
return EntryHistogram, nil
}
return EntryInvalid, err
return EntryInvalid, p.err
}
switch et {
switch p.entry {
case EntrySeries:
p.bytes, p.ts, p.value = p.parser.Series()
p.metricString = p.parser.Metric(&p.lset)
// Check the label set to see if we can continue or need to emit the NHCB.
if p.compareLabels() && p.processNHCB() {
p.entry = et
return EntryHistogram, nil
}
isNHCB := p.handleClassicHistogramSeries(p.lset)
@ -197,7 +207,7 @@ func (p *NHCBParser) Next() (Entry, error) {
// Do not return the classic histogram series if it was converted to NHCB and we are not keeping classic histograms.
return p.Next()
}
return et, err
return p.entry, p.err
case EntryHistogram:
p.bytes, p.ts, p.h, p.fh = p.parser.Histogram()
p.metricString = p.parser.Metric(&p.lset)
@ -205,10 +215,9 @@ func (p *NHCBParser) Next() (Entry, error) {
p.bName, p.typ = p.parser.Type()
}
if p.processNHCB() {
p.entry = et
return EntryHistogram, nil
}
return et, err
return p.entry, p.err
}
// Return true if labels have changed and we should emit the NHCB.
@ -274,8 +283,9 @@ func (p *NHCBParser) handleClassicHistogramSeries(lset labels.Labels) bool {
func (p *NHCBParser) processClassicHistogramSeries(lset labels.Labels, suffix string, updateHist func(*convertnhcb.TempHistogram)) {
if p.state != stateCollecting {
p.storeBaseLabels()
p.tempCT = p.parser.CreatedTimestamp()
p.state = stateCollecting
}
p.state = stateCollecting
p.tempLsetNHCB = convertnhcb.GetHistogramMetricBase(lset, suffix)
p.storeExemplars()
updateHist(&p.tempNHCB)
@ -337,7 +347,9 @@ func (p *NHCBParser) processNHCB() bool {
p.bytesNHCB = []byte(p.metricStringNHCB)
p.lsetNHCB = p.tempLsetNHCB
p.swapExemplars()
p.ctNHCB = p.tempCT
p.tempNHCB = convertnhcb.NewTempHistogram()
p.state = stateEmitting
p.tempCT = nil
return true
}

View file

@ -292,14 +292,14 @@ foobar{quantile="0.99"} 150.1`
lset: labels.FromStrings("__name__", "foo_total"),
t: int64p(1520879607789),
es: []exemplar.Exemplar{{Labels: labels.FromStrings("id", "counter-test"), Value: 5}},
// TODO(krajorama): ct: int64p(1520872607123),
ct: int64p(1520872607123),
}, {
m: `foo_total{a="b"}`,
v: 17.0,
lset: labels.FromStrings("__name__", "foo_total", "a", "b"),
t: int64p(1520879607789),
es: []exemplar.Exemplar{{Labels: labels.FromStrings("id", "counter-test"), Value: 5}},
// TODO(krajorama): ct: int64p(1520872607123),
ct: int64p(1520872607123),
}, {
m: "bar",
help: "Summary with CT at the end, making sure we find CT even if it's multiple lines a far",
@ -310,22 +310,22 @@ foobar{quantile="0.99"} 150.1`
m: "bar_count",
v: 17.0,
lset: labels.FromStrings("__name__", "bar_count"),
// TODO(krajorama): ct: int64p(1520872608124),
ct: int64p(1520872608124),
}, {
m: "bar_sum",
v: 324789.3,
lset: labels.FromStrings("__name__", "bar_sum"),
// TODO(krajorama): ct: int64p(1520872608124),
ct: int64p(1520872608124),
}, {
m: `bar{quantile="0.95"}`,
v: 123.7,
lset: labels.FromStrings("__name__", "bar", "quantile", "0.95"),
// TODO(krajorama): ct: int64p(1520872608124),
ct: int64p(1520872608124),
}, {
m: `bar{quantile="0.99"}`,
v: 150.0,
lset: labels.FromStrings("__name__", "bar", "quantile", "0.99"),
// TODO(krajorama): ct: int64p(1520872608124),
ct: int64p(1520872608124),
}, {
m: "baz",
help: "Histogram with the same objective as above's summary",
@ -343,7 +343,7 @@ foobar{quantile="0.99"} 150.1`
CustomValues: []float64{0.0}, // We do not store the +Inf boundary.
},
lset: labels.FromStrings("__name__", "baz"),
// TODO(krajorama): ct: int64p(1520872609125),
ct: int64p(1520872609125),
}, {
m: "fizz_created",
help: "Gauge which shouldn't be parsed as CT",
@ -371,7 +371,7 @@ foobar{quantile="0.99"} 150.1`
CustomValues: []float64{0.0}, // We do not store the +Inf boundary.
},
lset: labels.FromStrings("__name__", "something"),
// TODO(krajorama): ct: int64p(1520430001000),
ct: int64p(1520430001000),
}, {
m: `something{a="b"}`,
shs: &histogram.Histogram{
@ -383,7 +383,7 @@ foobar{quantile="0.99"} 150.1`
CustomValues: []float64{0.0}, // We do not store the +Inf boundary.
},
lset: labels.FromStrings("__name__", "something", "a", "b"),
// TODO(krajorama): ct: int64p(1520430002000),
ct: int64p(1520430002000),
}, {
m: "yum",
help: "Summary with _created between sum and quantiles",
@ -394,22 +394,22 @@ foobar{quantile="0.99"} 150.1`
m: `yum_count`,
v: 20,
lset: labels.FromStrings("__name__", "yum_count"),
// TODO(krajorama): ct: int64p(1520430003000),
ct: int64p(1520430003000),
}, {
m: `yum_sum`,
v: 324789.5,
lset: labels.FromStrings("__name__", "yum_sum"),
// TODO(krajorama): ct: int64p(1520430003000),
ct: int64p(1520430003000),
}, {
m: `yum{quantile="0.95"}`,
v: 123.7,
lset: labels.FromStrings("__name__", "yum", "quantile", "0.95"),
// TODO(krajorama): ct: int64p(1520430003000),
ct: int64p(1520430003000),
}, {
m: `yum{quantile="0.99"}`,
v: 150.0,
lset: labels.FromStrings("__name__", "yum", "quantile", "0.99"),
// TODO(krajorama): ct: int64p(1520430003000),
ct: int64p(1520430003000),
}, {
m: "foobar",
help: "Summary with _created as the first line",
@ -420,22 +420,22 @@ foobar{quantile="0.99"} 150.1`
m: `foobar_count`,
v: 21,
lset: labels.FromStrings("__name__", "foobar_count"),
// TODO(krajorama): ct: int64p(1520430004000),
ct: int64p(1520430004000),
}, {
m: `foobar_sum`,
v: 324789.6,
lset: labels.FromStrings("__name__", "foobar_sum"),
// TODO(krajorama): ct: int64p(1520430004000),
ct: int64p(1520430004000),
}, {
m: `foobar{quantile="0.95"}`,
v: 123.8,
lset: labels.FromStrings("__name__", "foobar", "quantile", "0.95"),
// TODO(krajorama): ct: int64p(1520430004000),
ct: int64p(1520430004000),
}, {
m: `foobar{quantile="0.99"}`,
v: 150.1,
lset: labels.FromStrings("__name__", "foobar", "quantile", "0.99"),
// TODO(krajorama): ct: int64p(1520430004000),
ct: int64p(1520430004000),
}, {
m: "metric",
help: "foo\x00bar",
@ -555,42 +555,49 @@ func TestNHCBParserProtoBufParser_NoNHCBWhenExponential(t *testing.T) {
},
lset: labels.FromStrings("__name__", "test_histogram"),
t: int64p(1234568),
ct: int64p(1000),
},
{
m: "test_histogram_count",
v: 175,
lset: labels.FromStrings("__name__", "test_histogram_count"),
t: int64p(1234568),
ct: int64p(1000),
},
{
m: "test_histogram_sum",
v: 0.0008280461746287094,
lset: labels.FromStrings("__name__", "test_histogram_sum"),
t: int64p(1234568),
ct: int64p(1000),
},
{
m: "test_histogram_bucket\xffle\xff-0.0004899999999999998",
v: 2,
lset: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0004899999999999998"),
t: int64p(1234568),
ct: int64p(1000),
},
{
m: "test_histogram_bucket\xffle\xff-0.0003899999999999998",
v: 4,
lset: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0003899999999999998"),
t: int64p(1234568),
ct: int64p(1000),
},
{
m: "test_histogram_bucket\xffle\xff-0.0002899999999999998",
v: 16,
lset: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0002899999999999998"),
t: int64p(1234568),
ct: int64p(1000),
},
{
m: "test_histogram_bucket\xffle\xff+Inf",
v: 175,
lset: labels.FromStrings("__name__", "test_histogram_bucket", "le", "+Inf"),
t: int64p(1234568),
ct: int64p(1000),
},
{
// TODO(krajorama): optimize: this should not be here. In case there's
@ -609,6 +616,7 @@ func TestNHCBParserProtoBufParser_NoNHCBWhenExponential(t *testing.T) {
},
lset: labels.FromStrings("__name__", "test_histogram"),
t: int64p(1234568),
ct: int64p(1000),
},
}
got := testParse(t, p)
@ -621,6 +629,10 @@ help: "Test histogram with classic and exponential buckets."
type: HISTOGRAM
metric: <
histogram: <
created_timestamp: <
seconds: 1
nanos: 1
>
sample_count: 175
sample_sum: 0.0008280461746287094
bucket: <

View file

@ -102,6 +102,8 @@ type OpenMetricsParser struct {
// Created timestamp parsing state.
ct int64
ctHashSet uint64
// ignoreExemplar instructs the parser to not overwrite exemplars (to keep them while peeking ahead).
ignoreExemplar bool
// visitedMFName is the metric family name of the last visited metric when peeking ahead
// for _created series during the execution of the CreatedTimestamp method.
visitedMFName []byte
@ -296,6 +298,14 @@ func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
p.skipCTSeries = false
p.ignoreExemplar = true
savedStart := p.start
defer func() {
p.ignoreExemplar = false
p.start = savedStart
p.l = resetLexer
}()
for {
eType, err := p.Next()
if err != nil {
@ -303,12 +313,12 @@ func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
// This might result in partial scrape with wrong/missing CT, but only
// spec improvement would help.
// TODO: Make sure OM 1.1/2.0 pass CT via metadata or exemplar-like to avoid this.
p.resetCTParseValues(resetLexer)
p.resetCTParseValues()
return nil
}
if eType != EntrySeries {
// Assume we hit different family, no CT line found.
p.resetCTParseValues(resetLexer)
p.resetCTParseValues()
return nil
}
@ -322,14 +332,14 @@ func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
peekedHash := p.seriesHash(&buf, peekedName[:len(peekedName)-8])
if peekedHash != currHash {
// Found CT line for a different series, for our series no CT.
p.resetCTParseValues(resetLexer)
p.resetCTParseValues()
return nil
}
// All timestamps in OpenMetrics are Unix Epoch in seconds. Convert to milliseconds.
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#timestamps
ct := int64(p.val * 1000.0)
p.setCTParseValues(ct, currHash, currName, true, resetLexer)
p.setCTParseValues(ct, currHash, currName, true)
return &ct
}
}
@ -371,17 +381,15 @@ func (p *OpenMetricsParser) seriesHash(offsetsArr *[]byte, metricFamilyName []by
// setCTParseValues sets the parser to the state after CreatedTimestamp method was called and CT was found.
// This is useful to prevent re-parsing the same series again and early return the CT value.
func (p *OpenMetricsParser) setCTParseValues(ct int64, ctHashSet uint64, mfName []byte, skipCTSeries bool, resetLexer *openMetricsLexer) {
func (p *OpenMetricsParser) setCTParseValues(ct int64, ctHashSet uint64, mfName []byte, skipCTSeries bool) {
p.ct = ct
p.l = resetLexer
p.ctHashSet = ctHashSet
p.visitedMFName = mfName
p.skipCTSeries = skipCTSeries // Do we need to set it?
}
// resetCtParseValues resets the parser to the state before CreatedTimestamp method was called.
func (p *OpenMetricsParser) resetCTParseValues(resetLexer *openMetricsLexer) {
p.l = resetLexer
func (p *OpenMetricsParser) resetCTParseValues() {
p.ctHashSet = 0
p.skipCTSeries = true
}
@ -417,10 +425,12 @@ func (p *OpenMetricsParser) Next() (Entry, error) {
p.start = p.l.i
p.offsets = p.offsets[:0]
p.eOffsets = p.eOffsets[:0]
p.exemplar = p.exemplar[:0]
p.exemplarVal = 0
p.hasExemplarTs = false
if !p.ignoreExemplar {
p.eOffsets = p.eOffsets[:0]
p.exemplar = p.exemplar[:0]
p.exemplarVal = 0
p.hasExemplarTs = false
}
switch t := p.nextToken(); t {
case tEOFWord:
@ -545,6 +555,16 @@ func (p *OpenMetricsParser) Next() (Entry, error) {
func (p *OpenMetricsParser) parseComment() error {
var err error
if p.ignoreExemplar {
for t := p.nextToken(); t != tLinebreak; t = p.nextToken() {
if t == tEOF {
return errors.New("data does not end with # EOF")
}
}
return nil
}
// Parse the labels.
p.eOffsets, err = p.parseLVals(p.eOffsets, true)
if err != nil {