mirror of
https://github.com/prometheus/prometheus.git
synced 2024-09-19 23:37:31 -07:00
Compare commits
18 commits
2bbea06709
...
2ad47b1e6b
Author | SHA1 | Date | |
---|---|---|---|
2ad47b1e6b | |||
9536ff4ecb | |||
8c070dcc41 | |||
31a1b4bf7f | |||
808d920415 | |||
8f088845d1 | |||
5f7bcff912 | |||
c58a173382 | |||
7905fbbae0 | |||
7a3daa80a7 | |||
18f95cc994 | |||
bec708eda8 | |||
fd2a2b1b9e | |||
0caaa96206 | |||
3de6bffae7 | |||
d14eb733e8 | |||
82064d42b8 | |||
c4052ba23d |
|
@ -80,7 +80,7 @@ type Parser interface {
|
|||
//
|
||||
// This function always returns a valid parser, but might additionally
|
||||
// return an error if the content type cannot be parsed.
|
||||
func New(b []byte, contentType string, parseClassicHistograms bool, st *labels.SymbolTable) (Parser, error) {
|
||||
func New(b []byte, contentType string, parseClassicHistograms, skipOMCTSeries bool, st *labels.SymbolTable) (Parser, error) {
|
||||
if contentType == "" {
|
||||
return NewPromParser(b, st), nil
|
||||
}
|
||||
|
@ -91,7 +91,10 @@ func New(b []byte, contentType string, parseClassicHistograms bool, st *labels.S
|
|||
}
|
||||
switch mediaType {
|
||||
case "application/openmetrics-text":
|
||||
return NewOpenMetricsParser(b, st), nil
|
||||
opts := func(o *openMetricsParserOptions) {
|
||||
o.SkipCTSeries = skipOMCTSeries
|
||||
}
|
||||
return NewOpenMetricsParser(b, st, opts), nil
|
||||
case "application/vnd.google.protobuf":
|
||||
return NewProtobufParser(b, parseClassicHistograms, st), nil
|
||||
default:
|
||||
|
|
|
@ -93,7 +93,7 @@ func TestNewParser(t *testing.T) {
|
|||
tt := tt // Copy to local variable before going parallel.
|
||||
t.Parallel()
|
||||
|
||||
p, err := New([]byte{}, tt.contentType, false, labels.NewSymbolTable())
|
||||
p, err := New([]byte{}, tt.contentType, false, false, labels.NewSymbolTable())
|
||||
tt.validateParser(t, p)
|
||||
if tt.err == "" {
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -252,7 +252,7 @@ func (p *OpenMetricsParser) Exemplar(e *exemplar.Exemplar) bool {
|
|||
// CreatedTimestamp returns the created timestamp for a current Metric if exists or nil.
|
||||
// NOTE(Maniktherana): Might use additional CPU/mem resources due to deep copy of parser required for peeking given 1.0 OM specification on _created series.
|
||||
func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
|
||||
if !TypeRequiresCT(p.mtype) {
|
||||
if !typeRequiresCT(p.mtype) {
|
||||
// Not a CT supported metric type, fast path.
|
||||
return nil
|
||||
}
|
||||
|
@ -302,8 +302,8 @@ func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
|
|||
}
|
||||
}
|
||||
|
||||
// TypeRequiresCT returns true if the metric type requires a _created timestamp.
|
||||
func TypeRequiresCT(t model.MetricType) bool {
|
||||
// typeRequiresCT returns true if the metric type requires a _created timestamp.
|
||||
func typeRequiresCT(t model.MetricType) bool {
|
||||
switch t {
|
||||
case model.MetricTypeCounter, model.MetricTypeSummary, model.MetricTypeHistogram:
|
||||
return true
|
||||
|
@ -594,7 +594,7 @@ func (p *OpenMetricsParser) isCreatedSeries() bool {
|
|||
var newLbs labels.Labels
|
||||
p.Metric(&newLbs)
|
||||
name := newLbs.Get(model.MetricNameLabel)
|
||||
if TypeRequiresCT(p.mtype) && strings.HasSuffix(name, "_created") {
|
||||
if typeRequiresCT(p.mtype) && strings.HasSuffix(name, "_created") {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
|
|
@ -699,7 +699,7 @@ func TestOpenMetricsParseErrors(t *testing.T) {
|
|||
}
|
||||
|
||||
for i, c := range cases {
|
||||
p := NewOpenMetricsParser([]byte(c.input), labels.NewSymbolTable())
|
||||
p := NewOpenMetricsParser([]byte(c.input), labels.NewSymbolTable(), WithOMParserCTSeriesSkipped())
|
||||
var err error
|
||||
for err == nil {
|
||||
_, err = p.Next()
|
||||
|
@ -764,7 +764,7 @@ func TestOMNullByteHandling(t *testing.T) {
|
|||
}
|
||||
|
||||
for i, c := range cases {
|
||||
p := NewOpenMetricsParser([]byte(c.input), labels.NewSymbolTable())
|
||||
p := NewOpenMetricsParser([]byte(c.input), labels.NewSymbolTable(), WithOMParserCTSeriesSkipped())
|
||||
var err error
|
||||
for err == nil {
|
||||
_, err = p.Next()
|
||||
|
|
|
@ -214,7 +214,7 @@ func checkParseResultsWithCT(t *testing.T, p Parser, exp []expectedParse, ctLine
|
|||
if ctLinesRemoved {
|
||||
// Are CT series skipped?
|
||||
_, typ := p.Type()
|
||||
if TypeRequiresCT(typ) && strings.HasSuffix(res.Get(labels.MetricName), "_created") {
|
||||
if typeRequiresCT(typ) && strings.HasSuffix(res.Get(labels.MetricName), "_created") {
|
||||
t.Fatalf("we exped created lines skipped")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,17 +61,13 @@ const (
|
|||
var symbolTable = labels.NewSymbolTable()
|
||||
|
||||
func fuzzParseMetricWithContentType(in []byte, contentType string) int {
|
||||
p, warning := textparse.New(in, contentType, false, symbolTable)
|
||||
p, warning := textparse.New(in, contentType, false, false, symbolTable)
|
||||
if warning != nil {
|
||||
// An invalid content type is being passed, which should not happen
|
||||
// in this context.
|
||||
panic(warning)
|
||||
}
|
||||
|
||||
if contentType == "application/openmetrics-text" {
|
||||
p = textparse.NewOpenMetricsParser(in, symbolTable)
|
||||
}
|
||||
|
||||
var err error
|
||||
for {
|
||||
_, err = p.Next()
|
||||
|
|
|
@ -725,34 +725,88 @@ scrape_configs:
|
|||
func TestManagerCTZeroIngestion(t *testing.T) {
|
||||
const mName = "expected_counter"
|
||||
|
||||
type exp struct {
|
||||
value float64
|
||||
ts int64
|
||||
}
|
||||
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
counterSample *dto.Counter
|
||||
counterSampleProto *dto.Counter
|
||||
counterSampleText string
|
||||
enableCTZeroIngestion bool
|
||||
exp []exp
|
||||
typ string
|
||||
}{
|
||||
{
|
||||
name: "disabled with CT on counter",
|
||||
counterSample: &dto.Counter{
|
||||
name: "Protobuf disabled with CT on counter",
|
||||
counterSampleProto: &dto.Counter{
|
||||
Value: proto.Float64(1.0),
|
||||
// Timestamp does not matter as long as it exists in this test.
|
||||
CreatedTimestamp: timestamppb.Now(),
|
||||
},
|
||||
typ: "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited",
|
||||
},
|
||||
{
|
||||
name: "enabled with CT on counter",
|
||||
counterSample: &dto.Counter{
|
||||
name: "Protobuf enabled with CT on counter",
|
||||
counterSampleProto: &dto.Counter{
|
||||
Value: proto.Float64(1.0),
|
||||
// Timestamp does not matter as long as it exists in this test.
|
||||
CreatedTimestamp: timestamppb.Now(),
|
||||
},
|
||||
enableCTZeroIngestion: true,
|
||||
typ: "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited",
|
||||
},
|
||||
{
|
||||
name: "enabled without CT on counter",
|
||||
counterSample: &dto.Counter{
|
||||
name: "Protobuf enabled without CT on counter",
|
||||
counterSampleProto: &dto.Counter{
|
||||
Value: proto.Float64(1.0),
|
||||
},
|
||||
enableCTZeroIngestion: true,
|
||||
typ: "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited",
|
||||
},
|
||||
{
|
||||
name: "OMText disabled with CT on counter",
|
||||
counterSampleText: `# TYPE expected_counter counter
|
||||
expected_counter 17.0 1520879607.789
|
||||
expected_counter_created 1000
|
||||
# EOF`,
|
||||
exp: []exp{{
|
||||
value: 17.0,
|
||||
ts: 1520879607789,
|
||||
}},
|
||||
typ: "application/openmetrics-text; version=1.0.0; charset=utf-8",
|
||||
},
|
||||
{
|
||||
name: "OMText enabled with CT on counter",
|
||||
counterSampleText: `# TYPE expected_counter counter
|
||||
expected_counter 17.0 1520879607.789
|
||||
expected_counter_created 1000
|
||||
# EOF`,
|
||||
enableCTZeroIngestion: true,
|
||||
exp: []exp{
|
||||
{
|
||||
value: 0.0,
|
||||
ts: 1000,
|
||||
},
|
||||
{
|
||||
value: 17.0,
|
||||
ts: 1520879607789,
|
||||
},
|
||||
},
|
||||
typ: "application/openmetrics-text; version=1.0.0; charset=utf-8",
|
||||
},
|
||||
{
|
||||
name: "OMText enabled without CT on counter",
|
||||
counterSampleText: `# TYPE expected_counter counter
|
||||
expected_counter 17.0 1520879607.789
|
||||
# EOF`,
|
||||
enableCTZeroIngestion: true,
|
||||
exp: []exp{{
|
||||
value: 17.0,
|
||||
ts: 1520879607789,
|
||||
}},
|
||||
typ: "application/openmetrics-text; version=1.0.0; charset=utf-8",
|
||||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
|
@ -772,30 +826,38 @@ func TestManagerCTZeroIngestion(t *testing.T) {
|
|||
require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
|
||||
GlobalConfig: config.GlobalConfig{
|
||||
// Disable regular scrapes.
|
||||
ScrapeInterval: model.Duration(9999 * time.Minute),
|
||||
ScrapeTimeout: model.Duration(5 * time.Second),
|
||||
// Ensure the proto is chosen. We need proto as it's the only protocol
|
||||
// with the CT parsing support.
|
||||
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto},
|
||||
ScrapeInterval: model.Duration(9999 * time.Minute),
|
||||
ScrapeTimeout: model.Duration(5 * time.Second),
|
||||
ScrapeProtocols: []config.ScrapeProtocol{config.OpenMetricsText1_0_0, config.PrometheusProto},
|
||||
},
|
||||
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
|
||||
}))
|
||||
|
||||
once := sync.Once{}
|
||||
|
||||
// Prepare the sample to be ingested.
|
||||
var toWrite []byte
|
||||
switch tc.typ {
|
||||
case "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited":
|
||||
ctrType := dto.MetricType_COUNTER
|
||||
toWrite = protoMarshalDelimited(t, &dto.MetricFamily{
|
||||
Name: proto.String(mName),
|
||||
Type: &ctrType,
|
||||
Metric: []*dto.Metric{{Counter: tc.counterSampleProto}},
|
||||
})
|
||||
case "application/openmetrics-text; version=1.0.0; charset=utf-8":
|
||||
toWrite = []byte(tc.counterSampleText)
|
||||
}
|
||||
|
||||
// Start fake HTTP target to that allow one scrape only.
|
||||
server := httptest.NewServer(
|
||||
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
fail := true
|
||||
once.Do(func() {
|
||||
fail = false
|
||||
w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`)
|
||||
w.Header().Set("Content-Type", tc.typ)
|
||||
|
||||
ctrType := dto.MetricType_COUNTER
|
||||
w.Write(protoMarshalDelimited(t, &dto.MetricFamily{
|
||||
Name: proto.String(mName),
|
||||
Type: &ctrType,
|
||||
Metric: []*dto.Metric{{Counter: tc.counterSample}},
|
||||
}))
|
||||
w.Write(toWrite)
|
||||
})
|
||||
|
||||
if fail {
|
||||
|
@ -835,6 +897,7 @@ func TestManagerCTZeroIngestion(t *testing.T) {
|
|||
got = append(got, f.f)
|
||||
}
|
||||
}
|
||||
|
||||
if len(app.resultFloats) > 0 {
|
||||
return nil
|
||||
}
|
||||
|
@ -842,18 +905,27 @@ func TestManagerCTZeroIngestion(t *testing.T) {
|
|||
}), "after 1 minute")
|
||||
scrapeManager.Stop()
|
||||
|
||||
// Check for zero samples, assuming we only injected always one sample.
|
||||
// Did it contain CT to inject? If yes, was CT zero enabled?
|
||||
if tc.counterSample.CreatedTimestamp.IsValid() && tc.enableCTZeroIngestion {
|
||||
require.Len(t, got, 2)
|
||||
require.Equal(t, 0.0, got[0])
|
||||
require.Equal(t, tc.counterSample.GetValue(), got[1])
|
||||
return
|
||||
}
|
||||
switch tc.typ {
|
||||
case "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited":
|
||||
// Check for zero samples, assuming we only injected always one sample.
|
||||
// Did it contain CT to inject? If yes, was CT zero enabled?
|
||||
if tc.counterSampleProto.CreatedTimestamp.IsValid() && tc.enableCTZeroIngestion {
|
||||
require.Len(t, got, 2)
|
||||
require.Equal(t, 0.0, got[0])
|
||||
require.Equal(t, tc.counterSampleProto.GetValue(), got[1])
|
||||
return
|
||||
}
|
||||
|
||||
// Expect only one, valid sample.
|
||||
require.Len(t, got, 1)
|
||||
require.Equal(t, tc.counterSample.GetValue(), got[0])
|
||||
// Expect only one, valid sample.
|
||||
require.Len(t, got, 1)
|
||||
require.Equal(t, tc.counterSampleProto.GetValue(), got[0])
|
||||
|
||||
case "application/openmetrics-text; version=1.0.0; charset=utf-8":
|
||||
require.Len(t, got, len(tc.exp))
|
||||
for i, e := range tc.exp {
|
||||
require.Equal(t, e.value, got[i])
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1539,7 +1539,7 @@ type appendErrors struct {
|
|||
}
|
||||
|
||||
func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
|
||||
p, err := textparse.New(b, contentType, sl.scrapeClassicHistograms, sl.symbolTable)
|
||||
p, err := textparse.New(b, contentType, sl.scrapeClassicHistograms, sl.enableCTZeroIngestion, sl.symbolTable)
|
||||
if err != nil {
|
||||
level.Debug(sl.l).Log(
|
||||
"msg", "Invalid content type on scrape, using prometheus parser as fallback.",
|
||||
|
|
|
@ -1525,7 +1525,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
|
|||
fakeRef := storage.SeriesRef(1)
|
||||
expValue := float64(1)
|
||||
metric := []byte(`metric{n="1"} 1`)
|
||||
p, warning := textparse.New(metric, "", false, labels.NewSymbolTable())
|
||||
p, warning := textparse.New(metric, "", false, false, labels.NewSymbolTable())
|
||||
require.NoError(t, warning)
|
||||
|
||||
var lset labels.Labels
|
||||
|
|
Loading…
Reference in a new issue