From 2182b832711586f8d8a4c34f5820ea9265d818b6 Mon Sep 17 00:00:00 2001 From: George Krajcsovits Date: Thu, 24 Oct 2024 07:38:58 +0200 Subject: [PATCH] feat(nhcb): implement created timestamp handling (#15198) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Call through to the underlaying parser if we are not in a histogram and the entry is a series or exponential native histogram. Otherwise store and retrieve CT for NHCB. * fix(omparser): losing exemplars when CT is parsed Fixes: #15137 Ignore exemplars while peeking ahead during CT parsing. Simplify state reset with defer(). Signed-off-by: György Krajcsovits --- model/textparse/interface_test.go | 6 ++-- model/textparse/nhcbparse.go | 36 ++++++++++++++-------- model/textparse/nhcbparse_test.go | 46 ++++++++++++++++++----------- model/textparse/openmetricsparse.go | 44 +++++++++++++++++++-------- 4 files changed, 88 insertions(+), 44 deletions(-) diff --git a/model/textparse/interface_test.go b/model/textparse/interface_test.go index 6136fbc915..72c8284f2d 100644 --- a/model/textparse/interface_test.go +++ b/model/textparse/interface_test.go @@ -239,13 +239,13 @@ func testParse(t *testing.T, p Parser) (ret []parsedEntry) { } p.Metric(&got.lset) - for e := (exemplar.Exemplar{}); p.Exemplar(&e); { - got.es = append(got.es, e) - } // Parser reuses int pointer. if ct := p.CreatedTimestamp(); ct != nil { got.ct = int64p(*ct) } + for e := (exemplar.Exemplar{}); p.Exemplar(&e); { + got.es = append(got.es, e) + } case EntryType: m, got.typ = p.Type() got.m = string(m) diff --git a/model/textparse/nhcbparse.go b/model/textparse/nhcbparse.go index 22384f1ec8..eab9fa7e6e 100644 --- a/model/textparse/nhcbparse.go +++ b/model/textparse/nhcbparse.go @@ -84,6 +84,7 @@ type NHCBParser struct { fhNHCB *histogram.FloatHistogram lsetNHCB labels.Labels exemplars []exemplar.Exemplar + ctNHCB *int64 metricStringNHCB string // Collates values from the classic histogram series to build @@ -92,6 +93,7 @@ type NHCBParser struct { tempNHCB convertnhcb.TempHistogram tempExemplars []exemplar.Exemplar tempExemplarCount int + tempCT *int64 // Remembers the last base histogram metric name (assuming it's // a classic histogram) so we can tell if the next float series @@ -159,6 +161,16 @@ func (p *NHCBParser) Exemplar(ex *exemplar.Exemplar) bool { } func (p *NHCBParser) CreatedTimestamp() *int64 { + switch p.state { + case stateStart: + if p.entry == EntrySeries || p.entry == EntryHistogram { + return p.parser.CreatedTimestamp() + } + case stateCollecting: + return p.parser.CreatedTimestamp() + case stateEmitting: + return p.ctNHCB + } return nil } @@ -174,22 +186,20 @@ func (p *NHCBParser) Next() (Entry, error) { } return p.entry, p.err } - et, err := p.parser.Next() - if err != nil { - if errors.Is(err, io.EOF) && p.processNHCB() { - p.entry = et - p.err = err + + p.entry, p.err = p.parser.Next() + if p.err != nil { + if errors.Is(p.err, io.EOF) && p.processNHCB() { return EntryHistogram, nil } - return EntryInvalid, err + return EntryInvalid, p.err } - switch et { + switch p.entry { case EntrySeries: p.bytes, p.ts, p.value = p.parser.Series() p.metricString = p.parser.Metric(&p.lset) // Check the label set to see if we can continue or need to emit the NHCB. if p.compareLabels() && p.processNHCB() { - p.entry = et return EntryHistogram, nil } isNHCB := p.handleClassicHistogramSeries(p.lset) @@ -197,7 +207,7 @@ func (p *NHCBParser) Next() (Entry, error) { // Do not return the classic histogram series if it was converted to NHCB and we are not keeping classic histograms. return p.Next() } - return et, err + return p.entry, p.err case EntryHistogram: p.bytes, p.ts, p.h, p.fh = p.parser.Histogram() p.metricString = p.parser.Metric(&p.lset) @@ -205,10 +215,9 @@ func (p *NHCBParser) Next() (Entry, error) { p.bName, p.typ = p.parser.Type() } if p.processNHCB() { - p.entry = et return EntryHistogram, nil } - return et, err + return p.entry, p.err } // Return true if labels have changed and we should emit the NHCB. @@ -274,8 +283,9 @@ func (p *NHCBParser) handleClassicHistogramSeries(lset labels.Labels) bool { func (p *NHCBParser) processClassicHistogramSeries(lset labels.Labels, suffix string, updateHist func(*convertnhcb.TempHistogram)) { if p.state != stateCollecting { p.storeBaseLabels() + p.tempCT = p.parser.CreatedTimestamp() + p.state = stateCollecting } - p.state = stateCollecting p.tempLsetNHCB = convertnhcb.GetHistogramMetricBase(lset, suffix) p.storeExemplars() updateHist(&p.tempNHCB) @@ -337,7 +347,9 @@ func (p *NHCBParser) processNHCB() bool { p.bytesNHCB = []byte(p.metricStringNHCB) p.lsetNHCB = p.tempLsetNHCB p.swapExemplars() + p.ctNHCB = p.tempCT p.tempNHCB = convertnhcb.NewTempHistogram() p.state = stateEmitting + p.tempCT = nil return true } diff --git a/model/textparse/nhcbparse_test.go b/model/textparse/nhcbparse_test.go index 80b65fd225..1ead2e30e2 100644 --- a/model/textparse/nhcbparse_test.go +++ b/model/textparse/nhcbparse_test.go @@ -292,14 +292,14 @@ foobar{quantile="0.99"} 150.1` lset: labels.FromStrings("__name__", "foo_total"), t: int64p(1520879607789), es: []exemplar.Exemplar{{Labels: labels.FromStrings("id", "counter-test"), Value: 5}}, - // TODO(krajorama): ct: int64p(1520872607123), + ct: int64p(1520872607123), }, { m: `foo_total{a="b"}`, v: 17.0, lset: labels.FromStrings("__name__", "foo_total", "a", "b"), t: int64p(1520879607789), es: []exemplar.Exemplar{{Labels: labels.FromStrings("id", "counter-test"), Value: 5}}, - // TODO(krajorama): ct: int64p(1520872607123), + ct: int64p(1520872607123), }, { m: "bar", help: "Summary with CT at the end, making sure we find CT even if it's multiple lines a far", @@ -310,22 +310,22 @@ foobar{quantile="0.99"} 150.1` m: "bar_count", v: 17.0, lset: labels.FromStrings("__name__", "bar_count"), - // TODO(krajorama): ct: int64p(1520872608124), + ct: int64p(1520872608124), }, { m: "bar_sum", v: 324789.3, lset: labels.FromStrings("__name__", "bar_sum"), - // TODO(krajorama): ct: int64p(1520872608124), + ct: int64p(1520872608124), }, { m: `bar{quantile="0.95"}`, v: 123.7, lset: labels.FromStrings("__name__", "bar", "quantile", "0.95"), - // TODO(krajorama): ct: int64p(1520872608124), + ct: int64p(1520872608124), }, { m: `bar{quantile="0.99"}`, v: 150.0, lset: labels.FromStrings("__name__", "bar", "quantile", "0.99"), - // TODO(krajorama): ct: int64p(1520872608124), + ct: int64p(1520872608124), }, { m: "baz", help: "Histogram with the same objective as above's summary", @@ -343,7 +343,7 @@ foobar{quantile="0.99"} 150.1` CustomValues: []float64{0.0}, // We do not store the +Inf boundary. }, lset: labels.FromStrings("__name__", "baz"), - // TODO(krajorama): ct: int64p(1520872609125), + ct: int64p(1520872609125), }, { m: "fizz_created", help: "Gauge which shouldn't be parsed as CT", @@ -371,7 +371,7 @@ foobar{quantile="0.99"} 150.1` CustomValues: []float64{0.0}, // We do not store the +Inf boundary. }, lset: labels.FromStrings("__name__", "something"), - // TODO(krajorama): ct: int64p(1520430001000), + ct: int64p(1520430001000), }, { m: `something{a="b"}`, shs: &histogram.Histogram{ @@ -383,7 +383,7 @@ foobar{quantile="0.99"} 150.1` CustomValues: []float64{0.0}, // We do not store the +Inf boundary. }, lset: labels.FromStrings("__name__", "something", "a", "b"), - // TODO(krajorama): ct: int64p(1520430002000), + ct: int64p(1520430002000), }, { m: "yum", help: "Summary with _created between sum and quantiles", @@ -394,22 +394,22 @@ foobar{quantile="0.99"} 150.1` m: `yum_count`, v: 20, lset: labels.FromStrings("__name__", "yum_count"), - // TODO(krajorama): ct: int64p(1520430003000), + ct: int64p(1520430003000), }, { m: `yum_sum`, v: 324789.5, lset: labels.FromStrings("__name__", "yum_sum"), - // TODO(krajorama): ct: int64p(1520430003000), + ct: int64p(1520430003000), }, { m: `yum{quantile="0.95"}`, v: 123.7, lset: labels.FromStrings("__name__", "yum", "quantile", "0.95"), - // TODO(krajorama): ct: int64p(1520430003000), + ct: int64p(1520430003000), }, { m: `yum{quantile="0.99"}`, v: 150.0, lset: labels.FromStrings("__name__", "yum", "quantile", "0.99"), - // TODO(krajorama): ct: int64p(1520430003000), + ct: int64p(1520430003000), }, { m: "foobar", help: "Summary with _created as the first line", @@ -420,22 +420,22 @@ foobar{quantile="0.99"} 150.1` m: `foobar_count`, v: 21, lset: labels.FromStrings("__name__", "foobar_count"), - // TODO(krajorama): ct: int64p(1520430004000), + ct: int64p(1520430004000), }, { m: `foobar_sum`, v: 324789.6, lset: labels.FromStrings("__name__", "foobar_sum"), - // TODO(krajorama): ct: int64p(1520430004000), + ct: int64p(1520430004000), }, { m: `foobar{quantile="0.95"}`, v: 123.8, lset: labels.FromStrings("__name__", "foobar", "quantile", "0.95"), - // TODO(krajorama): ct: int64p(1520430004000), + ct: int64p(1520430004000), }, { m: `foobar{quantile="0.99"}`, v: 150.1, lset: labels.FromStrings("__name__", "foobar", "quantile", "0.99"), - // TODO(krajorama): ct: int64p(1520430004000), + ct: int64p(1520430004000), }, { m: "metric", help: "foo\x00bar", @@ -555,42 +555,49 @@ func TestNHCBParserProtoBufParser_NoNHCBWhenExponential(t *testing.T) { }, lset: labels.FromStrings("__name__", "test_histogram"), t: int64p(1234568), + ct: int64p(1000), }, { m: "test_histogram_count", v: 175, lset: labels.FromStrings("__name__", "test_histogram_count"), t: int64p(1234568), + ct: int64p(1000), }, { m: "test_histogram_sum", v: 0.0008280461746287094, lset: labels.FromStrings("__name__", "test_histogram_sum"), t: int64p(1234568), + ct: int64p(1000), }, { m: "test_histogram_bucket\xffle\xff-0.0004899999999999998", v: 2, lset: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0004899999999999998"), t: int64p(1234568), + ct: int64p(1000), }, { m: "test_histogram_bucket\xffle\xff-0.0003899999999999998", v: 4, lset: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0003899999999999998"), t: int64p(1234568), + ct: int64p(1000), }, { m: "test_histogram_bucket\xffle\xff-0.0002899999999999998", v: 16, lset: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0002899999999999998"), t: int64p(1234568), + ct: int64p(1000), }, { m: "test_histogram_bucket\xffle\xff+Inf", v: 175, lset: labels.FromStrings("__name__", "test_histogram_bucket", "le", "+Inf"), t: int64p(1234568), + ct: int64p(1000), }, { // TODO(krajorama): optimize: this should not be here. In case there's @@ -609,6 +616,7 @@ func TestNHCBParserProtoBufParser_NoNHCBWhenExponential(t *testing.T) { }, lset: labels.FromStrings("__name__", "test_histogram"), t: int64p(1234568), + ct: int64p(1000), }, } got := testParse(t, p) @@ -621,6 +629,10 @@ help: "Test histogram with classic and exponential buckets." type: HISTOGRAM metric: < histogram: < + created_timestamp: < + seconds: 1 + nanos: 1 + > sample_count: 175 sample_sum: 0.0008280461746287094 bucket: < diff --git a/model/textparse/openmetricsparse.go b/model/textparse/openmetricsparse.go index 70c24d9ec6..3ae9c7ddfc 100644 --- a/model/textparse/openmetricsparse.go +++ b/model/textparse/openmetricsparse.go @@ -102,6 +102,8 @@ type OpenMetricsParser struct { // Created timestamp parsing state. ct int64 ctHashSet uint64 + // ignoreExemplar instructs the parser to not overwrite exemplars (to keep them while peeking ahead). + ignoreExemplar bool // visitedMFName is the metric family name of the last visited metric when peeking ahead // for _created series during the execution of the CreatedTimestamp method. visitedMFName []byte @@ -296,6 +298,14 @@ func (p *OpenMetricsParser) CreatedTimestamp() *int64 { p.skipCTSeries = false + p.ignoreExemplar = true + savedStart := p.start + defer func() { + p.ignoreExemplar = false + p.start = savedStart + p.l = resetLexer + }() + for { eType, err := p.Next() if err != nil { @@ -303,12 +313,12 @@ func (p *OpenMetricsParser) CreatedTimestamp() *int64 { // This might result in partial scrape with wrong/missing CT, but only // spec improvement would help. // TODO: Make sure OM 1.1/2.0 pass CT via metadata or exemplar-like to avoid this. - p.resetCTParseValues(resetLexer) + p.resetCTParseValues() return nil } if eType != EntrySeries { // Assume we hit different family, no CT line found. - p.resetCTParseValues(resetLexer) + p.resetCTParseValues() return nil } @@ -322,14 +332,14 @@ func (p *OpenMetricsParser) CreatedTimestamp() *int64 { peekedHash := p.seriesHash(&buf, peekedName[:len(peekedName)-8]) if peekedHash != currHash { // Found CT line for a different series, for our series no CT. - p.resetCTParseValues(resetLexer) + p.resetCTParseValues() return nil } // All timestamps in OpenMetrics are Unix Epoch in seconds. Convert to milliseconds. // https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#timestamps ct := int64(p.val * 1000.0) - p.setCTParseValues(ct, currHash, currName, true, resetLexer) + p.setCTParseValues(ct, currHash, currName, true) return &ct } } @@ -371,17 +381,15 @@ func (p *OpenMetricsParser) seriesHash(offsetsArr *[]byte, metricFamilyName []by // setCTParseValues sets the parser to the state after CreatedTimestamp method was called and CT was found. // This is useful to prevent re-parsing the same series again and early return the CT value. -func (p *OpenMetricsParser) setCTParseValues(ct int64, ctHashSet uint64, mfName []byte, skipCTSeries bool, resetLexer *openMetricsLexer) { +func (p *OpenMetricsParser) setCTParseValues(ct int64, ctHashSet uint64, mfName []byte, skipCTSeries bool) { p.ct = ct - p.l = resetLexer p.ctHashSet = ctHashSet p.visitedMFName = mfName p.skipCTSeries = skipCTSeries // Do we need to set it? } // resetCtParseValues resets the parser to the state before CreatedTimestamp method was called. -func (p *OpenMetricsParser) resetCTParseValues(resetLexer *openMetricsLexer) { - p.l = resetLexer +func (p *OpenMetricsParser) resetCTParseValues() { p.ctHashSet = 0 p.skipCTSeries = true } @@ -417,10 +425,12 @@ func (p *OpenMetricsParser) Next() (Entry, error) { p.start = p.l.i p.offsets = p.offsets[:0] - p.eOffsets = p.eOffsets[:0] - p.exemplar = p.exemplar[:0] - p.exemplarVal = 0 - p.hasExemplarTs = false + if !p.ignoreExemplar { + p.eOffsets = p.eOffsets[:0] + p.exemplar = p.exemplar[:0] + p.exemplarVal = 0 + p.hasExemplarTs = false + } switch t := p.nextToken(); t { case tEOFWord: @@ -545,6 +555,16 @@ func (p *OpenMetricsParser) Next() (Entry, error) { func (p *OpenMetricsParser) parseComment() error { var err error + + if p.ignoreExemplar { + for t := p.nextToken(); t != tLinebreak; t = p.nextToken() { + if t == tEOF { + return errors.New("data does not end with # EOF") + } + } + return nil + } + // Parse the labels. p.eOffsets, err = p.parseLVals(p.eOffsets, true) if err != nil {