chore: merge ct tests

Signed-off-by: Manik Rana <manikrana54@gmail.com>
This commit is contained in:
Manik Rana 2024-08-30 16:49:17 +05:30
parent 7a3daa80a7
commit 7905fbbae0

View file

@ -22,7 +22,6 @@ import (
"os"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"
@ -721,34 +720,82 @@ scrape_configs:
func TestManagerCTZeroIngestion(t *testing.T) {
const mName = "expected_counter"
type exp struct {
value float64
ts int64
}
for _, tc := range []struct {
name string
counterSample *dto.Counter
counterSampleProto *dto.Counter
counterSampleText string
enableCTZeroIngestion bool
exp exp
typ string
}{
{
name: "disabled with CT on counter",
counterSample: &dto.Counter{
name: "Protobuf disabled with CT on counter",
counterSampleProto: &dto.Counter{
Value: proto.Float64(1.0),
// Timestamp does not matter as long as it exists in this test.
CreatedTimestamp: timestamppb.Now(),
},
typ: "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited",
},
{
name: "enabled with CT on counter",
counterSample: &dto.Counter{
name: "Protobuf enabled with CT on counter",
counterSampleProto: &dto.Counter{
Value: proto.Float64(1.0),
// Timestamp does not matter as long as it exists in this test.
CreatedTimestamp: timestamppb.Now(),
},
enableCTZeroIngestion: true,
typ: "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited",
},
{
name: "enabled without CT on counter",
counterSample: &dto.Counter{
name: "Protobuf enabled without CT on counter",
counterSampleProto: &dto.Counter{
Value: proto.Float64(1.0),
},
enableCTZeroIngestion: true,
typ: "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited",
},
{
name: "OMText disabled with CT on counter",
counterSampleText: `# TYPE expected_counter counter
expected_counter 17.0 1520879607.789
expected_counter_created 1000
# EOF`,
exp: exp{
value: 17.0,
ts: 1520879607789,
},
typ: "application/openmetrics-text; version=1.0.0",
},
{
name: "OMText enabled with CT on counter",
counterSampleText: `# TYPE expected_counter counter
expected_counter 17.0 1520879607.789
expected_counter_created 1000
# EOF`,
enableCTZeroIngestion: true,
exp: exp{
value: 0.0,
ts: 1000,
},
typ: "application/openmetrics-text; version=1.0.0",
},
{
name: "OMText enabled without CT on counter",
counterSampleText: `# TYPE expected_counter counter
expected_counter 17.0 1520879607.789
# EOF`,
enableCTZeroIngestion: true,
exp: exp{
value: 17.0,
ts: 1520879607789,
},
typ: "application/openmetrics-text; version=1.0.0",
},
} {
t.Run(tc.name, func(t *testing.T) {
@ -764,6 +811,14 @@ func TestManagerCTZeroIngestion(t *testing.T) {
)
require.NoError(t, err)
var scrapeProtoConfig []config.ScrapeProtocol
switch tc.typ {
case "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited":
scrapeProtoConfig = []config.ScrapeProtocol{config.PrometheusProto}
case "application/openmetrics-text; version=1.0.0":
scrapeProtoConfig = []config.ScrapeProtocol{config.OpenMetricsText1_0_0}
}
require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
GlobalConfig: config.GlobalConfig{
// Disable regular scrapes.
@ -771,26 +826,36 @@ func TestManagerCTZeroIngestion(t *testing.T) {
ScrapeTimeout: model.Duration(5 * time.Second),
// Ensure the proto is chosen. We need proto as it's the only protocol
// with the CT parsing support.
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto},
ScrapeProtocols: scrapeProtoConfig,
},
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
}))
once := sync.Once{}
// Prepare the sample to be ingested.
var toWrite []byte
switch tc.typ {
case "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited":
ctrType := dto.MetricType_COUNTER
toWrite = protoMarshalDelimited(t, &dto.MetricFamily{
Name: proto.String(mName),
Type: &ctrType,
Metric: []*dto.Metric{{Counter: tc.counterSampleProto}},
})
case "application/openmetrics-text; version=1.0.0":
toWrite = []byte(tc.counterSampleText)
}
// Start fake HTTP target to that allow one scrape only.
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fail := true
once.Do(func() {
fail = false
w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`)
w.Header().Set("Content-Type", tc.typ)
ctrType := dto.MetricType_COUNTER
w.Write(protoMarshalDelimited(t, &dto.MetricFamily{
Name: proto.String(mName),
Type: &ctrType,
Metric: []*dto.Metric{{Counter: tc.counterSample}},
}))
w.Write(toWrite)
})
if fail {
@ -830,6 +895,7 @@ func TestManagerCTZeroIngestion(t *testing.T) {
got = append(got, f.f)
}
}
if len(app.resultFloats) > 0 {
return nil
}
@ -837,149 +903,24 @@ func TestManagerCTZeroIngestion(t *testing.T) {
}), "after 1 minute")
scrapeManager.Stop()
// Check for zero samples, assuming we only injected always one sample.
// Did it contain CT to inject? If yes, was CT zero enabled?
if tc.counterSample.CreatedTimestamp.IsValid() && tc.enableCTZeroIngestion {
require.Len(t, got, 2)
require.Equal(t, 0.0, got[0])
require.Equal(t, tc.counterSample.GetValue(), got[1])
return
switch tc.typ {
case "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited":
// Check for zero samples, assuming we only injected always one sample.
// Did it contain CT to inject? If yes, was CT zero enabled?
if tc.counterSampleProto.CreatedTimestamp.IsValid() && tc.enableCTZeroIngestion {
require.Len(t, got, 2)
require.Equal(t, 0.0, got[0])
require.Equal(t, tc.counterSampleProto.GetValue(), got[1])
return
}
// Expect only one, valid sample.
require.Len(t, got, 1)
require.Equal(t, tc.counterSampleProto.GetValue(), got[0])
case "application/openmetrics-text; version=1.0.0":
require.Len(t, got, 1)
}
// Expect only one, valid sample.
require.Len(t, got, 1)
require.Equal(t, tc.counterSample.GetValue(), got[0])
})
}
}
func TestManagerCTZeroIngestionOM(t *testing.T) {
const mName = "foo"
for _, tc := range []struct {
name string
counterSample string
expCT int64
enableCTZeroIngestion bool
}{
{
name: "disabled with CT on counter",
counterSample: `# TYPE foo counter
foo_total 17.0 1520879607.789 # {id="counter-test"} 5
foo_created 1000
# EOF`,
enableCTZeroIngestion: false,
},
{
name: "enabled with CT on counter",
counterSample: `# TYPE foo counter
foo_total 17.0 1520879607.789 # {id="counter-test"} 5
foo_created 1000
# EOF`,
enableCTZeroIngestion: true,
},
{
name: "enabled without CT on counter",
counterSample: `# TYPE foo counter
foo_total 17.0 1520879607.789 # {id="counter-test"} 5
# EOF`,
enableCTZeroIngestion: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
app := &collectResultAppender{}
scrapeManager, err := NewManager(
&Options{
EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion,
skipOffsetting: true,
},
log.NewLogfmtLogger(os.Stderr),
&collectResultAppendable{app},
prometheus.NewRegistry(),
)
require.NoError(t, err)
require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
GlobalConfig: config.GlobalConfig{
// Disable regular scrapes.
ScrapeInterval: model.Duration(9999 * time.Minute),
ScrapeTimeout: model.Duration(5 * time.Second),
// Ensure the proto is chosen. We need proto as it's the only protocol
// with the CT parsing support.
ScrapeProtocols: []config.ScrapeProtocol{config.OpenMetricsText1_0_0},
},
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
}))
once := sync.Once{}
// Start fake HTTP target to that allow one scrape only.
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fail := true
once.Do(func() {
fail = false
w.Header().Set("Content-Type", `application/openmetrics-text; version=1.0.0`)
w.Write([]byte(tc.counterSample))
})
if fail {
w.WriteHeader(http.StatusInternalServerError)
}
}),
)
defer server.Close()
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)
// Add fake target directly into tsets + reload. Normally users would use
// Manager.Run and wait for minimum 5s refresh interval.
scrapeManager.updateTsets(map[string][]*targetgroup.Group{
"test": {{
Targets: []model.LabelSet{{
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
model.AddressLabel: model.LabelValue(serverURL.Host),
}},
}},
})
scrapeManager.reload()
var got []int64
// Wait for one scrape.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
app.mtx.Lock()
defer app.mtx.Unlock()
// Check if scrape happened and grab the relevant samples, they have to be there - or it's a bug
// and it's not worth waiting.
for _, f := range app.resultFloats {
mName := f.metric.Get(model.MetricNameLabel)
if strings.HasSuffix(mName, "_created") {
got = append(got, int64(f.f))
}
}
if len(app.resultFloats) > 0 {
return nil
}
return fmt.Errorf("expected some samples, got none")
}), "after 1 minute")
scrapeManager.Stop()
// Check for zero samples, assuming we only injected always one sample.
// Did it contain CT to inject? If yes, was CT zero enabled?
// if tc.counterSample.CreatedTimestamp.IsValid() && tc.enableCTZeroIngestion {
// require.Len(t, got, 2)
// require.Equal(t, 0.0, got[0])
// require.Equal(t, tc.counterSample.GetValue(), got[1])
// return
// }
// Expect only one, valid sample.
require.Len(t, got, 1)
require.Equal(t, tc.expCT, got[0])
})
}
}