Pair programming with Manik, Arthur and Daniel.

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2024-08-07 15:02:57 +01:00
parent 65e2afd6a9
commit 0479c46670
3 changed files with 46 additions and 23 deletions

View file

@ -447,7 +447,13 @@ func (p *OpenMetricsParser) Next() (Entry, error) {
} }
p.series = p.l.b[p.start:p.l.i] p.series = p.l.b[p.start:p.l.i]
return p.parseMetricSuffix(p.nextToken()) if err := p.parseSeriesEndOfLine(p.nextToken()); err != nil {
return EntryInvalid, err
}
if p.skipCTSeries && p.isCreatedSeries() {
return p.Next()
}
return EntrySeries, nil
case tMName: case tMName:
p.offsets = append(p.offsets, p.start, p.l.i) p.offsets = append(p.offsets, p.start, p.l.i)
p.series = p.l.b[p.start:p.l.i] p.series = p.l.b[p.start:p.l.i]
@ -462,15 +468,13 @@ func (p *OpenMetricsParser) Next() (Entry, error) {
t2 = p.nextToken() t2 = p.nextToken()
} }
suffixEntry, err := p.parseMetricSuffix(t2) if err := p.parseSeriesEndOfLine(t2); err != nil {
if err != nil { return EntryInvalid, err
return suffixEntry, err
} }
if p.skipCTSeries && p.isCreatedSeries() { if p.skipCTSeries && p.isCreatedSeries() {
return p.Next() return p.Next()
} }
return suffixEntry, err return EntrySeries, nil
default: default:
err = p.parseError("expected a valid start token", t) err = p.parseError("expected a valid start token", t)
} }
@ -601,52 +605,53 @@ func (p *OpenMetricsParser) isCreatedSeries() bool {
return false return false
} }
// parseMetricSuffix parses the end of the line after the metric name and // parseSeriesEndOfLine parses the series end of the line (value, optional
// labels. It starts parsing with the provided token. // timestamp, commentary, etc.) after the metric name and labels.
func (p *OpenMetricsParser) parseMetricSuffix(t token) (Entry, error) { // It starts parsing with the provided token.
func (p *OpenMetricsParser) parseSeriesEndOfLine(t token) error {
if p.offsets[0] == -1 { if p.offsets[0] == -1 {
return EntryInvalid, fmt.Errorf("metric name not set while parsing: %q", p.l.b[p.start:p.l.i]) return fmt.Errorf("metric name not set while parsing: %q", p.l.b[p.start:p.l.i])
} }
var err error var err error
p.val, err = p.getFloatValue(t, "metric") p.val, err = p.getFloatValue(t, "metric")
if err != nil { if err != nil {
return EntryInvalid, err return err
} }
p.hasTS = false p.hasTS = false
switch t2 := p.nextToken(); t2 { switch t2 := p.nextToken(); t2 {
case tEOF: case tEOF:
return EntryInvalid, errors.New("data does not end with # EOF") return errors.New("data does not end with # EOF")
case tLinebreak: case tLinebreak:
break break
case tComment: case tComment:
if err := p.parseComment(); err != nil { if err := p.parseComment(); err != nil {
return EntryInvalid, err return err
} }
case tTimestamp: case tTimestamp:
p.hasTS = true p.hasTS = true
var ts float64 var ts float64
// A float is enough to hold what we need for millisecond resolution. // A float is enough to hold what we need for millisecond resolution.
if ts, err = parseFloat(yoloString(p.l.buf()[1:])); err != nil { if ts, err = parseFloat(yoloString(p.l.buf()[1:])); err != nil {
return EntryInvalid, fmt.Errorf("%w while parsing: %q", err, p.l.b[p.start:p.l.i]) return fmt.Errorf("%w while parsing: %q", err, p.l.b[p.start:p.l.i])
} }
if math.IsNaN(ts) || math.IsInf(ts, 0) { if math.IsNaN(ts) || math.IsInf(ts, 0) {
return EntryInvalid, fmt.Errorf("invalid timestamp %f", ts) return fmt.Errorf("invalid timestamp %f", ts)
} }
p.ts = int64(ts * 1000) p.ts = int64(ts * 1000)
switch t3 := p.nextToken(); t3 { switch t3 := p.nextToken(); t3 {
case tLinebreak: case tLinebreak:
case tComment: case tComment:
if err := p.parseComment(); err != nil { if err := p.parseComment(); err != nil {
return EntryInvalid, err return err
} }
default: default:
return EntryInvalid, p.parseError("expected next entry after timestamp", t3) return p.parseError("expected next entry after timestamp", t3)
} }
} }
return EntrySeries, nil return nil
} }
func (p *OpenMetricsParser) getFloatValue(t token, after string) (float64, error) { func (p *OpenMetricsParser) getFloatValue(t token, after string) (float64, error) {

View file

@ -25,6 +25,8 @@ import (
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
) )
func int64p(x int64) *int64 { return &x }
func TestOpenMetricsParse(t *testing.T) { func TestOpenMetricsParse(t *testing.T) {
input := `# HELP go_gc_duration_seconds A summary of the GC invocation durations. input := `# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary # TYPE go_gc_duration_seconds summary
@ -92,8 +94,6 @@ fizz_created 17.0`
input += "\nnull_byte_metric{a=\"abc\x00\"} 1" input += "\nnull_byte_metric{a=\"abc\x00\"} 1"
input += "\n# EOF\n" input += "\n# EOF\n"
int64p := func(x int64) *int64 { return &x }
exp := []expectedParse{ exp := []expectedParse{
{ {
m: "go_gc_duration_seconds", m: "go_gc_duration_seconds",
@ -331,7 +331,7 @@ fizz_created 17.0`
} }
p := NewOpenMetricsParser([]byte(input), labels.NewSymbolTable(), WithOMParserCTSeriesSkipped()) p := NewOpenMetricsParser([]byte(input), labels.NewSymbolTable(), WithOMParserCTSeriesSkipped())
checkParseResults(t, p, exp) checkParseResultsWithCT(t, p, exp, true)
} }
func TestUTF8OpenMetricsParse(t *testing.T) { func TestUTF8OpenMetricsParse(t *testing.T) {
@ -346,6 +346,7 @@ func TestUTF8OpenMetricsParse(t *testing.T) {
# UNIT "go.gc_duration_seconds" seconds # UNIT "go.gc_duration_seconds" seconds
{"go.gc_duration_seconds",quantile="0"} 4.9351e-05 {"go.gc_duration_seconds",quantile="0"} 4.9351e-05
{"go.gc_duration_seconds",quantile="0.25"} 7.424100000000001e-05 {"go.gc_duration_seconds",quantile="0.25"} 7.424100000000001e-05
{"go.gc_duration_seconds_created"} 12313
{"go.gc_duration_seconds",quantile="0.5",a="b"} 8.3835e-05 {"go.gc_duration_seconds",quantile="0.5",a="b"} 8.3835e-05
{"http.status",q="0.9",a="b"} 8.3835e-05 {"http.status",q="0.9",a="b"} 8.3835e-05
{"http.status",q="0.9",a="b"} 8.3835e-05 {"http.status",q="0.9",a="b"} 8.3835e-05
@ -369,10 +370,12 @@ func TestUTF8OpenMetricsParse(t *testing.T) {
m: `{"go.gc_duration_seconds",quantile="0"}`, m: `{"go.gc_duration_seconds",quantile="0"}`,
v: 4.9351e-05, v: 4.9351e-05,
lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0"), lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0"),
ct: int64p(12313),
}, { }, {
m: `{"go.gc_duration_seconds",quantile="0.25"}`, m: `{"go.gc_duration_seconds",quantile="0.25"}`,
v: 7.424100000000001e-05, v: 7.424100000000001e-05,
lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0.25"), lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0.25"),
ct: int64p(12313),
}, { }, {
m: `{"go.gc_duration_seconds",quantile="0.5",a="b"}`, m: `{"go.gc_duration_seconds",quantile="0.5",a="b"}`,
v: 8.3835e-05, v: 8.3835e-05,
@ -401,8 +404,8 @@ choices}`, "strange©™\n'quoted' \"name\"", "6"),
}, },
} }
p := NewOpenMetricsParser([]byte(input), labels.NewSymbolTable()) p := NewOpenMetricsParser([]byte(input), labels.NewSymbolTable(), WithOMParserCTSeriesSkipped())
checkParseResults(t, p, exp) checkParseResultsWithCT(t, p, exp, true)
} }
func TestOpenMetricsParseErrors(t *testing.T) { func TestOpenMetricsParseErrors(t *testing.T) {

View file

@ -18,6 +18,7 @@ import (
"errors" "errors"
"io" "io"
"os" "os"
"strings"
"testing" "testing"
"github.com/klauspost/compress/gzip" "github.com/klauspost/compress/gzip"
@ -189,6 +190,10 @@ testmetric{label="\"bar\""} 1`
} }
func checkParseResults(t *testing.T, p Parser, exp []expectedParse) { func checkParseResults(t *testing.T, p Parser, exp []expectedParse) {
checkParseResultsWithCT(t, p, exp, false)
}
func checkParseResultsWithCT(t *testing.T, p Parser, exp []expectedParse, ctLinesRemoved bool) {
i := 0 i := 0
var res labels.Labels var res labels.Labels
@ -206,6 +211,16 @@ func checkParseResults(t *testing.T, p Parser, exp []expectedParse) {
p.Metric(&res) p.Metric(&res)
if ctLinesRemoved {
// Are CT series skipped?
_, typ := p.Type()
if typ == model.MetricTypeCounter || typ == model.MetricTypeSummary || typ == model.MetricTypeHistogram {
if strings.HasSuffix(res.Get(labels.MetricName), "_created") {
t.Fatalf("we exped created lines skipped")
}
}
}
require.Equal(t, exp[i].m, string(m)) require.Equal(t, exp[i].m, string(m))
require.Equal(t, exp[i].t, ts) require.Equal(t, exp[i].t, ts)
require.Equal(t, exp[i].v, v) require.Equal(t, exp[i].v, v)