This commit is contained in:
Manik Rana 2024-09-19 23:17:12 +05:30 committed by GitHub
commit 2bbea06709
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 118 additions and 47 deletions

View file

@ -80,7 +80,7 @@ type Parser interface {
// //
// This function always returns a valid parser, but might additionally // This function always returns a valid parser, but might additionally
// return an error if the content type cannot be parsed. // return an error if the content type cannot be parsed.
func New(b []byte, contentType string, parseClassicHistograms bool, st *labels.SymbolTable) (Parser, error) { func New(b []byte, contentType string, parseClassicHistograms, skipOMCTSeries bool, st *labels.SymbolTable) (Parser, error) {
if contentType == "" { if contentType == "" {
return NewPromParser(b, st), nil return NewPromParser(b, st), nil
} }
@ -91,7 +91,10 @@ func New(b []byte, contentType string, parseClassicHistograms bool, st *labels.S
} }
switch mediaType { switch mediaType {
case "application/openmetrics-text": case "application/openmetrics-text":
return NewOpenMetricsParser(b, st), nil opts := func(o *openMetricsParserOptions) {
o.SkipCTSeries = skipOMCTSeries
}
return NewOpenMetricsParser(b, st, opts), nil
case "application/vnd.google.protobuf": case "application/vnd.google.protobuf":
return NewProtobufParser(b, parseClassicHistograms, st), nil return NewProtobufParser(b, parseClassicHistograms, st), nil
default: default:

View file

@ -93,7 +93,7 @@ func TestNewParser(t *testing.T) {
tt := tt // Copy to local variable before going parallel. tt := tt // Copy to local variable before going parallel.
t.Parallel() t.Parallel()
p, err := New([]byte{}, tt.contentType, false, labels.NewSymbolTable()) p, err := New([]byte{}, tt.contentType, false, false, labels.NewSymbolTable())
tt.validateParser(t, p) tt.validateParser(t, p)
if tt.err == "" { if tt.err == "" {
require.NoError(t, err) require.NoError(t, err)

View file

@ -252,7 +252,7 @@ func (p *OpenMetricsParser) Exemplar(e *exemplar.Exemplar) bool {
// CreatedTimestamp returns the created timestamp for a current Metric if exists or nil. // CreatedTimestamp returns the created timestamp for a current Metric if exists or nil.
// NOTE(Maniktherana): Might use additional CPU/mem resources due to deep copy of parser required for peeking given 1.0 OM specification on _created series. // NOTE(Maniktherana): Might use additional CPU/mem resources due to deep copy of parser required for peeking given 1.0 OM specification on _created series.
func (p *OpenMetricsParser) CreatedTimestamp() *int64 { func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
if !TypeRequiresCT(p.mtype) { if !typeRequiresCT(p.mtype) {
// Not a CT supported metric type, fast path. // Not a CT supported metric type, fast path.
return nil return nil
} }
@ -302,8 +302,8 @@ func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
} }
} }
// TypeRequiresCT returns true if the metric type requires a _created timestamp. // typeRequiresCT returns true if the metric type requires a _created timestamp.
func TypeRequiresCT(t model.MetricType) bool { func typeRequiresCT(t model.MetricType) bool {
switch t { switch t {
case model.MetricTypeCounter, model.MetricTypeSummary, model.MetricTypeHistogram: case model.MetricTypeCounter, model.MetricTypeSummary, model.MetricTypeHistogram:
return true return true
@ -594,7 +594,7 @@ func (p *OpenMetricsParser) isCreatedSeries() bool {
var newLbs labels.Labels var newLbs labels.Labels
p.Metric(&newLbs) p.Metric(&newLbs)
name := newLbs.Get(model.MetricNameLabel) name := newLbs.Get(model.MetricNameLabel)
if TypeRequiresCT(p.mtype) && strings.HasSuffix(name, "_created") { if typeRequiresCT(p.mtype) && strings.HasSuffix(name, "_created") {
return true return true
} }
return false return false

View file

@ -699,7 +699,7 @@ func TestOpenMetricsParseErrors(t *testing.T) {
} }
for i, c := range cases { for i, c := range cases {
p := NewOpenMetricsParser([]byte(c.input), labels.NewSymbolTable()) p := NewOpenMetricsParser([]byte(c.input), labels.NewSymbolTable(), WithOMParserCTSeriesSkipped())
var err error var err error
for err == nil { for err == nil {
_, err = p.Next() _, err = p.Next()
@ -764,7 +764,7 @@ func TestOMNullByteHandling(t *testing.T) {
} }
for i, c := range cases { for i, c := range cases {
p := NewOpenMetricsParser([]byte(c.input), labels.NewSymbolTable()) p := NewOpenMetricsParser([]byte(c.input), labels.NewSymbolTable(), WithOMParserCTSeriesSkipped())
var err error var err error
for err == nil { for err == nil {
_, err = p.Next() _, err = p.Next()

View file

@ -214,7 +214,7 @@ func checkParseResultsWithCT(t *testing.T, p Parser, exp []expectedParse, ctLine
if ctLinesRemoved { if ctLinesRemoved {
// Are CT series skipped? // Are CT series skipped?
_, typ := p.Type() _, typ := p.Type()
if TypeRequiresCT(typ) && strings.HasSuffix(res.Get(labels.MetricName), "_created") { if typeRequiresCT(typ) && strings.HasSuffix(res.Get(labels.MetricName), "_created") {
t.Fatalf("we exped created lines skipped") t.Fatalf("we exped created lines skipped")
} }
} }

View file

@ -61,17 +61,13 @@ const (
var symbolTable = labels.NewSymbolTable() var symbolTable = labels.NewSymbolTable()
func fuzzParseMetricWithContentType(in []byte, contentType string) int { func fuzzParseMetricWithContentType(in []byte, contentType string) int {
p, warning := textparse.New(in, contentType, false, symbolTable) p, warning := textparse.New(in, contentType, false, false, symbolTable)
if warning != nil { if warning != nil {
// An invalid content type is being passed, which should not happen // An invalid content type is being passed, which should not happen
// in this context. // in this context.
panic(warning) panic(warning)
} }
if contentType == "application/openmetrics-text" {
p = textparse.NewOpenMetricsParser(in, symbolTable)
}
var err error var err error
for { for {
_, err = p.Next() _, err = p.Next()

View file

@ -725,34 +725,88 @@ scrape_configs:
func TestManagerCTZeroIngestion(t *testing.T) { func TestManagerCTZeroIngestion(t *testing.T) {
const mName = "expected_counter" const mName = "expected_counter"
type exp struct {
value float64
ts int64
}
for _, tc := range []struct { for _, tc := range []struct {
name string name string
counterSample *dto.Counter counterSampleProto *dto.Counter
counterSampleText string
enableCTZeroIngestion bool enableCTZeroIngestion bool
exp []exp
typ string
}{ }{
{ {
name: "disabled with CT on counter", name: "Protobuf disabled with CT on counter",
counterSample: &dto.Counter{ counterSampleProto: &dto.Counter{
Value: proto.Float64(1.0), Value: proto.Float64(1.0),
// Timestamp does not matter as long as it exists in this test. // Timestamp does not matter as long as it exists in this test.
CreatedTimestamp: timestamppb.Now(), CreatedTimestamp: timestamppb.Now(),
}, },
typ: "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited",
}, },
{ {
name: "enabled with CT on counter", name: "Protobuf enabled with CT on counter",
counterSample: &dto.Counter{ counterSampleProto: &dto.Counter{
Value: proto.Float64(1.0), Value: proto.Float64(1.0),
// Timestamp does not matter as long as it exists in this test. // Timestamp does not matter as long as it exists in this test.
CreatedTimestamp: timestamppb.Now(), CreatedTimestamp: timestamppb.Now(),
}, },
enableCTZeroIngestion: true, enableCTZeroIngestion: true,
typ: "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited",
}, },
{ {
name: "enabled without CT on counter", name: "Protobuf enabled without CT on counter",
counterSample: &dto.Counter{ counterSampleProto: &dto.Counter{
Value: proto.Float64(1.0), Value: proto.Float64(1.0),
}, },
enableCTZeroIngestion: true, enableCTZeroIngestion: true,
typ: "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited",
},
{
name: "OMText disabled with CT on counter",
counterSampleText: `# TYPE expected_counter counter
expected_counter 17.0 1520879607.789
expected_counter_created 1000
# EOF`,
exp: []exp{{
value: 17.0,
ts: 1520879607789,
}},
typ: "application/openmetrics-text; version=1.0.0; charset=utf-8",
},
{
name: "OMText enabled with CT on counter",
counterSampleText: `# TYPE expected_counter counter
expected_counter 17.0 1520879607.789
expected_counter_created 1000
# EOF`,
enableCTZeroIngestion: true,
exp: []exp{
{
value: 0.0,
ts: 1000,
},
{
value: 17.0,
ts: 1520879607789,
},
},
typ: "application/openmetrics-text; version=1.0.0; charset=utf-8",
},
{
name: "OMText enabled without CT on counter",
counterSampleText: `# TYPE expected_counter counter
expected_counter 17.0 1520879607.789
# EOF`,
enableCTZeroIngestion: true,
exp: []exp{{
value: 17.0,
ts: 1520879607789,
}},
typ: "application/openmetrics-text; version=1.0.0; charset=utf-8",
}, },
} { } {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
@ -772,30 +826,38 @@ func TestManagerCTZeroIngestion(t *testing.T) {
require.NoError(t, scrapeManager.ApplyConfig(&config.Config{ require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
GlobalConfig: config.GlobalConfig{ GlobalConfig: config.GlobalConfig{
// Disable regular scrapes. // Disable regular scrapes.
ScrapeInterval: model.Duration(9999 * time.Minute), ScrapeInterval: model.Duration(9999 * time.Minute),
ScrapeTimeout: model.Duration(5 * time.Second), ScrapeTimeout: model.Duration(5 * time.Second),
// Ensure the proto is chosen. We need proto as it's the only protocol ScrapeProtocols: []config.ScrapeProtocol{config.OpenMetricsText1_0_0, config.PrometheusProto},
// with the CT parsing support.
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto},
}, },
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
})) }))
once := sync.Once{} once := sync.Once{}
// Prepare the sample to be ingested.
var toWrite []byte
switch tc.typ {
case "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited":
ctrType := dto.MetricType_COUNTER
toWrite = protoMarshalDelimited(t, &dto.MetricFamily{
Name: proto.String(mName),
Type: &ctrType,
Metric: []*dto.Metric{{Counter: tc.counterSampleProto}},
})
case "application/openmetrics-text; version=1.0.0; charset=utf-8":
toWrite = []byte(tc.counterSampleText)
}
// Start fake HTTP target to that allow one scrape only. // Start fake HTTP target to that allow one scrape only.
server := httptest.NewServer( server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fail := true fail := true
once.Do(func() { once.Do(func() {
fail = false fail = false
w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`) w.Header().Set("Content-Type", tc.typ)
ctrType := dto.MetricType_COUNTER w.Write(toWrite)
w.Write(protoMarshalDelimited(t, &dto.MetricFamily{
Name: proto.String(mName),
Type: &ctrType,
Metric: []*dto.Metric{{Counter: tc.counterSample}},
}))
}) })
if fail { if fail {
@ -835,6 +897,7 @@ func TestManagerCTZeroIngestion(t *testing.T) {
got = append(got, f.f) got = append(got, f.f)
} }
} }
if len(app.resultFloats) > 0 { if len(app.resultFloats) > 0 {
return nil return nil
} }
@ -842,18 +905,27 @@ func TestManagerCTZeroIngestion(t *testing.T) {
}), "after 1 minute") }), "after 1 minute")
scrapeManager.Stop() scrapeManager.Stop()
// Check for zero samples, assuming we only injected always one sample. switch tc.typ {
// Did it contain CT to inject? If yes, was CT zero enabled? case "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited":
if tc.counterSample.CreatedTimestamp.IsValid() && tc.enableCTZeroIngestion { // Check for zero samples, assuming we only injected always one sample.
require.Len(t, got, 2) // Did it contain CT to inject? If yes, was CT zero enabled?
require.Equal(t, 0.0, got[0]) if tc.counterSampleProto.CreatedTimestamp.IsValid() && tc.enableCTZeroIngestion {
require.Equal(t, tc.counterSample.GetValue(), got[1]) require.Len(t, got, 2)
return require.Equal(t, 0.0, got[0])
} require.Equal(t, tc.counterSampleProto.GetValue(), got[1])
return
}
// Expect only one, valid sample. // Expect only one, valid sample.
require.Len(t, got, 1) require.Len(t, got, 1)
require.Equal(t, tc.counterSample.GetValue(), got[0]) require.Equal(t, tc.counterSampleProto.GetValue(), got[0])
case "application/openmetrics-text; version=1.0.0; charset=utf-8":
require.Len(t, got, len(tc.exp))
for i, e := range tc.exp {
require.Equal(t, e.value, got[i])
}
}
}) })
} }
} }

View file

@ -1539,7 +1539,7 @@ type appendErrors struct {
} }
func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) { func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
p, err := textparse.New(b, contentType, sl.scrapeClassicHistograms, sl.symbolTable) p, err := textparse.New(b, contentType, sl.scrapeClassicHistograms, sl.enableCTZeroIngestion, sl.symbolTable)
if err != nil { if err != nil {
level.Debug(sl.l).Log( level.Debug(sl.l).Log(
"msg", "Invalid content type on scrape, using prometheus parser as fallback.", "msg", "Invalid content type on scrape, using prometheus parser as fallback.",

View file

@ -1525,7 +1525,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
fakeRef := storage.SeriesRef(1) fakeRef := storage.SeriesRef(1)
expValue := float64(1) expValue := float64(1)
metric := []byte(`metric{n="1"} 1`) metric := []byte(`metric{n="1"} 1`)
p, warning := textparse.New(metric, "", false, labels.NewSymbolTable()) p, warning := textparse.New(metric, "", false, false, labels.NewSymbolTable())
require.NoError(t, warning) require.NoError(t, warning)
var lset labels.Labels var lset labels.Labels