mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 14:27:27 -08:00
Merge pull request #2288 from prometheus/limit-scrape
Add ability to limit scrape samples, and related metrics
This commit is contained in:
commit
ad40d0abbc
|
@ -497,6 +497,8 @@ type ScrapeConfig struct {
|
|||
MetricsPath string `yaml:"metrics_path,omitempty"`
|
||||
// The URL scheme with which to fetch metrics from targets.
|
||||
Scheme string `yaml:"scheme,omitempty"`
|
||||
// More than this many samples post metric-relabelling will cause the scrape to fail.
|
||||
SampleLimit uint `yaml:"sample_limit,omitempty"`
|
||||
|
||||
// We cannot do proper Go type embedding below as the parser will then parse
|
||||
// values arbitrarily into the overflow maps of further-down types.
|
||||
|
|
|
@ -133,6 +133,7 @@ var expectedConf = &Config{
|
|||
|
||||
ScrapeInterval: model.Duration(50 * time.Second),
|
||||
ScrapeTimeout: model.Duration(5 * time.Second),
|
||||
SampleLimit: 1000,
|
||||
|
||||
HTTPClientConfig: HTTPClientConfig{
|
||||
BasicAuth: &BasicAuth{
|
||||
|
|
2
config/testdata/conf.good.yml
vendored
2
config/testdata/conf.good.yml
vendored
|
@ -70,6 +70,8 @@ scrape_configs:
|
|||
scrape_interval: 50s
|
||||
scrape_timeout: 5s
|
||||
|
||||
sample_limit: 1000
|
||||
|
||||
metrics_path: /my_path
|
||||
scheme: https
|
||||
|
||||
|
|
|
@ -33,9 +33,10 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
scrapeHealthMetricName = "up"
|
||||
scrapeDurationMetricName = "scrape_duration_seconds"
|
||||
scrapeSamplesMetricName = "scrape_samples_scraped"
|
||||
scrapeHealthMetricName = "up"
|
||||
scrapeDurationMetricName = "scrape_duration_seconds"
|
||||
scrapeSamplesMetricName = "scrape_samples_scraped"
|
||||
samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -76,6 +77,12 @@ var (
|
|||
},
|
||||
[]string{"scrape_job"},
|
||||
)
|
||||
targetScrapeSampleLimit = prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "prometheus_target_scrapes_exceeded_sample_limit_total",
|
||||
Help: "Total number of scrapes that hit the sample limit and were rejected.",
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -84,6 +91,7 @@ func init() {
|
|||
prometheus.MustRegister(targetReloadIntervalLength)
|
||||
prometheus.MustRegister(targetSyncIntervalLength)
|
||||
prometheus.MustRegister(targetScrapePoolSyncsCounter)
|
||||
prometheus.MustRegister(targetScrapeSampleLimit)
|
||||
}
|
||||
|
||||
// scrapePool manages scrapes for sets of targets.
|
||||
|
@ -101,7 +109,7 @@ type scrapePool struct {
|
|||
loops map[uint64]loop
|
||||
|
||||
// Constructor for new scrape loops. This is settable for testing convenience.
|
||||
newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop
|
||||
newLoop func(context.Context, scraper, storage.SampleAppender, model.LabelSet, *config.ScrapeConfig) loop
|
||||
}
|
||||
|
||||
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
|
||||
|
@ -170,7 +178,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
|
|||
var (
|
||||
t = sp.targets[fp]
|
||||
s = &targetScraper{Target: t, client: sp.client}
|
||||
newLoop = sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t))
|
||||
newLoop = sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config)
|
||||
)
|
||||
wg.Add(1)
|
||||
|
||||
|
@ -231,7 +239,7 @@ func (sp *scrapePool) sync(targets []*Target) {
|
|||
|
||||
if _, ok := sp.targets[hash]; !ok {
|
||||
s := &targetScraper{Target: t, client: sp.client}
|
||||
l := sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t))
|
||||
l := sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config)
|
||||
|
||||
sp.targets[hash] = t
|
||||
sp.loops[hash] = l
|
||||
|
@ -263,40 +271,6 @@ func (sp *scrapePool) sync(targets []*Target) {
|
|||
wg.Wait()
|
||||
}
|
||||
|
||||
// sampleAppender returns an appender for ingested samples from the target.
|
||||
func (sp *scrapePool) sampleAppender(target *Target) storage.SampleAppender {
|
||||
app := sp.appender
|
||||
// The relabelAppender has to be inside the label-modifying appenders
|
||||
// so the relabeling rules are applied to the correct label set.
|
||||
if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 {
|
||||
app = relabelAppender{
|
||||
SampleAppender: app,
|
||||
relabelings: mrc,
|
||||
}
|
||||
}
|
||||
|
||||
if sp.config.HonorLabels {
|
||||
app = honorLabelsAppender{
|
||||
SampleAppender: app,
|
||||
labels: target.Labels(),
|
||||
}
|
||||
} else {
|
||||
app = ruleLabelsAppender{
|
||||
SampleAppender: app,
|
||||
labels: target.Labels(),
|
||||
}
|
||||
}
|
||||
return app
|
||||
}
|
||||
|
||||
// reportAppender returns an appender for reporting samples for the target.
|
||||
func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender {
|
||||
return ruleLabelsAppender{
|
||||
SampleAppender: sp.appender,
|
||||
labels: target.Labels(),
|
||||
}
|
||||
}
|
||||
|
||||
// A scraper retrieves samples and accepts a status report at the end.
|
||||
type scraper interface {
|
||||
scrape(ctx context.Context, ts time.Time) (model.Samples, error)
|
||||
|
@ -364,20 +338,34 @@ type loop interface {
|
|||
type scrapeLoop struct {
|
||||
scraper scraper
|
||||
|
||||
appender storage.SampleAppender
|
||||
reportAppender storage.SampleAppender
|
||||
// Where samples are ultimately sent.
|
||||
appender storage.SampleAppender
|
||||
|
||||
targetLabels model.LabelSet
|
||||
metricRelabelConfigs []*config.RelabelConfig
|
||||
honorLabels bool
|
||||
sampleLimit uint
|
||||
|
||||
done chan struct{}
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
}
|
||||
|
||||
func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.SampleAppender) loop {
|
||||
func newScrapeLoop(
|
||||
ctx context.Context,
|
||||
sc scraper,
|
||||
appender storage.SampleAppender,
|
||||
targetLabels model.LabelSet,
|
||||
config *config.ScrapeConfig,
|
||||
) loop {
|
||||
sl := &scrapeLoop{
|
||||
scraper: sc,
|
||||
appender: app,
|
||||
reportAppender: reportApp,
|
||||
done: make(chan struct{}),
|
||||
scraper: sc,
|
||||
appender: appender,
|
||||
targetLabels: targetLabels,
|
||||
metricRelabelConfigs: config.MetricRelabelConfigs,
|
||||
honorLabels: config.HonorLabels,
|
||||
sampleLimit: config.SampleLimit,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
sl.ctx, sl.cancel = context.WithCancel(ctx)
|
||||
|
||||
|
@ -408,8 +396,9 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
|
|||
|
||||
if !sl.appender.NeedsThrottling() {
|
||||
var (
|
||||
start = time.Now()
|
||||
scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout)
|
||||
start = time.Now()
|
||||
scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout)
|
||||
numPostRelabelSamples = 0
|
||||
)
|
||||
|
||||
// Only record after the first scrape.
|
||||
|
@ -421,12 +410,12 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
|
|||
|
||||
samples, err := sl.scraper.scrape(scrapeCtx, start)
|
||||
if err == nil {
|
||||
sl.append(samples)
|
||||
} else if errc != nil {
|
||||
numPostRelabelSamples, err = sl.append(samples)
|
||||
}
|
||||
if err != nil && errc != nil {
|
||||
errc <- err
|
||||
}
|
||||
|
||||
sl.report(start, time.Since(start), len(samples), err)
|
||||
sl.report(start, time.Since(start), len(samples), numPostRelabelSamples, err)
|
||||
last = start
|
||||
} else {
|
||||
targetSkippedScrapes.Inc()
|
||||
|
@ -445,14 +434,73 @@ func (sl *scrapeLoop) stop() {
|
|||
<-sl.done
|
||||
}
|
||||
|
||||
func (sl *scrapeLoop) append(samples model.Samples) {
|
||||
// wrapAppender wraps a SampleAppender for relabeling. It returns the wrappend
|
||||
// appender and an innermost countingAppender that counts the samples actually
|
||||
// appended in the end.
|
||||
func (sl *scrapeLoop) wrapAppender(app storage.SampleAppender) (storage.SampleAppender, *countingAppender) {
|
||||
// Innermost appender is a countingAppender to count how many samples
|
||||
// are left in the end.
|
||||
countingAppender := &countingAppender{
|
||||
SampleAppender: app,
|
||||
}
|
||||
app = countingAppender
|
||||
|
||||
// The relabelAppender has to be inside the label-modifying appenders so
|
||||
// the relabeling rules are applied to the correct label set.
|
||||
if len(sl.metricRelabelConfigs) > 0 {
|
||||
app = relabelAppender{
|
||||
SampleAppender: app,
|
||||
relabelings: sl.metricRelabelConfigs,
|
||||
}
|
||||
}
|
||||
|
||||
if sl.honorLabels {
|
||||
app = honorLabelsAppender{
|
||||
SampleAppender: app,
|
||||
labels: sl.targetLabels,
|
||||
}
|
||||
} else {
|
||||
app = ruleLabelsAppender{
|
||||
SampleAppender: app,
|
||||
labels: sl.targetLabels,
|
||||
}
|
||||
}
|
||||
return app, countingAppender
|
||||
}
|
||||
|
||||
func (sl *scrapeLoop) append(samples model.Samples) (int, error) {
|
||||
var (
|
||||
numOutOfOrder = 0
|
||||
numDuplicates = 0
|
||||
app = sl.appender
|
||||
countingApp *countingAppender
|
||||
)
|
||||
|
||||
if sl.sampleLimit > 0 {
|
||||
// We need to check for the sample limit, so append everything
|
||||
// to a wrapped bufferAppender first. Then point samples to the
|
||||
// result.
|
||||
bufApp := &bufferAppender{buffer: make(model.Samples, 0, len(samples))}
|
||||
var wrappedBufApp storage.SampleAppender
|
||||
wrappedBufApp, countingApp = sl.wrapAppender(bufApp)
|
||||
for _, s := range samples {
|
||||
// Ignore errors as bufferedAppender always succeds.
|
||||
wrappedBufApp.Append(s)
|
||||
}
|
||||
samples = bufApp.buffer
|
||||
if uint(countingApp.count) > sl.sampleLimit {
|
||||
targetScrapeSampleLimit.Inc()
|
||||
return countingApp.count, fmt.Errorf(
|
||||
"%d samples exceeded limit of %d", countingApp.count, sl.sampleLimit,
|
||||
)
|
||||
}
|
||||
} else {
|
||||
// No need to check for sample limit. Wrap sl.appender directly.
|
||||
app, countingApp = sl.wrapAppender(sl.appender)
|
||||
}
|
||||
|
||||
for _, s := range samples {
|
||||
if err := sl.appender.Append(s); err != nil {
|
||||
if err := app.Append(s); err != nil {
|
||||
switch err {
|
||||
case local.ErrOutOfOrderSample:
|
||||
numOutOfOrder++
|
||||
|
@ -471,9 +519,10 @@ func (sl *scrapeLoop) append(samples model.Samples) {
|
|||
if numDuplicates > 0 {
|
||||
log.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
|
||||
}
|
||||
return countingApp.count, nil
|
||||
}
|
||||
|
||||
func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSamples int, err error) {
|
||||
func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSamples, postRelabelSamples int, err error) {
|
||||
sl.scraper.report(start, duration, err)
|
||||
|
||||
ts := model.TimeFromUnixNano(start.UnixNano())
|
||||
|
@ -504,14 +553,29 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSam
|
|||
Timestamp: ts,
|
||||
Value: model.SampleValue(scrapedSamples),
|
||||
}
|
||||
postRelabelSample := &model.Sample{
|
||||
Metric: model.Metric{
|
||||
model.MetricNameLabel: samplesPostRelabelMetricName,
|
||||
},
|
||||
Timestamp: ts,
|
||||
Value: model.SampleValue(postRelabelSamples),
|
||||
}
|
||||
|
||||
if err := sl.reportAppender.Append(healthSample); err != nil {
|
||||
reportAppender := ruleLabelsAppender{
|
||||
SampleAppender: sl.appender,
|
||||
labels: sl.targetLabels,
|
||||
}
|
||||
|
||||
if err := reportAppender.Append(healthSample); err != nil {
|
||||
log.With("sample", healthSample).With("error", err).Warn("Scrape health sample discarded")
|
||||
}
|
||||
if err := sl.reportAppender.Append(durationSample); err != nil {
|
||||
if err := reportAppender.Append(durationSample); err != nil {
|
||||
log.With("sample", durationSample).With("error", err).Warn("Scrape duration sample discarded")
|
||||
}
|
||||
if err := sl.reportAppender.Append(countSample); err != nil {
|
||||
if err := reportAppender.Append(countSample); err != nil {
|
||||
log.With("sample", durationSample).With("error", err).Warn("Scrape sample count sample discarded")
|
||||
}
|
||||
if err := reportAppender.Append(postRelabelSample); err != nil {
|
||||
log.With("sample", durationSample).With("error", err).Warn("Scrape sample count post-relabeling sample discarded")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -139,7 +139,7 @@ func TestScrapePoolReload(t *testing.T) {
|
|||
}
|
||||
// On starting to run, new loops created on reload check whether their preceding
|
||||
// equivalents have been stopped.
|
||||
newLoop := func(ctx context.Context, s scraper, app, reportApp storage.SampleAppender) loop {
|
||||
newLoop := func(ctx context.Context, s scraper, app storage.SampleAppender, tl model.LabelSet, cfg *config.ScrapeConfig) loop {
|
||||
l := &testLoop{}
|
||||
l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
|
||||
if interval != 3*time.Second {
|
||||
|
@ -222,44 +222,19 @@ func TestScrapePoolReload(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestScrapePoolReportAppender(t *testing.T) {
|
||||
func TestScrapeLoopWrapSampleAppender(t *testing.T) {
|
||||
cfg := &config.ScrapeConfig{
|
||||
MetricRelabelConfigs: []*config.RelabelConfig{
|
||||
{}, {}, {},
|
||||
},
|
||||
}
|
||||
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
|
||||
app := &nopAppender{}
|
||||
|
||||
sp := newScrapePool(context.Background(), cfg, app)
|
||||
|
||||
cfg.HonorLabels = false
|
||||
wrapped := sp.reportAppender(target)
|
||||
|
||||
rl, ok := wrapped.(ruleLabelsAppender)
|
||||
if !ok {
|
||||
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
|
||||
}
|
||||
if rl.SampleAppender != app {
|
||||
t.Fatalf("Expected base appender but got %T", rl.SampleAppender)
|
||||
}
|
||||
|
||||
cfg.HonorLabels = true
|
||||
wrapped = sp.reportAppender(target)
|
||||
|
||||
hl, ok := wrapped.(ruleLabelsAppender)
|
||||
if !ok {
|
||||
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
|
||||
}
|
||||
if hl.SampleAppender != app {
|
||||
t.Fatalf("Expected base appender but got %T", hl.SampleAppender)
|
||||
}
|
||||
}
|
||||
|
||||
func TestScrapePoolSampleAppender(t *testing.T) {
|
||||
cfg := &config.ScrapeConfig{
|
||||
MetricRelabelConfigs: []*config.RelabelConfig{
|
||||
{}, {}, {},
|
||||
{
|
||||
Action: config.RelabelDrop,
|
||||
SourceLabels: model.LabelNames{"__name__"},
|
||||
Regex: config.MustNewRegexp("does_not_match_.*"),
|
||||
},
|
||||
{
|
||||
Action: config.RelabelDrop,
|
||||
SourceLabels: model.LabelNames{"__name__"},
|
||||
Regex: config.MustNewRegexp("does_not_match_either_*"),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -269,7 +244,15 @@ func TestScrapePoolSampleAppender(t *testing.T) {
|
|||
sp := newScrapePool(context.Background(), cfg, app)
|
||||
|
||||
cfg.HonorLabels = false
|
||||
wrapped := sp.sampleAppender(target)
|
||||
|
||||
sl := sp.newLoop(
|
||||
sp.ctx,
|
||||
&targetScraper{Target: target, client: sp.client},
|
||||
sp.appender,
|
||||
target.Labels(),
|
||||
sp.config,
|
||||
).(*scrapeLoop)
|
||||
wrapped, _ := sl.wrapAppender(sl.appender)
|
||||
|
||||
rl, ok := wrapped.(ruleLabelsAppender)
|
||||
if !ok {
|
||||
|
@ -279,12 +262,23 @@ func TestScrapePoolSampleAppender(t *testing.T) {
|
|||
if !ok {
|
||||
t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender)
|
||||
}
|
||||
if re.SampleAppender != app {
|
||||
t.Fatalf("Expected base appender but got %T", re.SampleAppender)
|
||||
co, ok := re.SampleAppender.(*countingAppender)
|
||||
if !ok {
|
||||
t.Fatalf("Expected *countingAppender but got %T", re.SampleAppender)
|
||||
}
|
||||
if co.SampleAppender != app {
|
||||
t.Fatalf("Expected base appender but got %T", co.SampleAppender)
|
||||
}
|
||||
|
||||
cfg.HonorLabels = true
|
||||
wrapped = sp.sampleAppender(target)
|
||||
sl = sp.newLoop(
|
||||
sp.ctx,
|
||||
&targetScraper{Target: target, client: sp.client},
|
||||
sp.appender,
|
||||
target.Labels(),
|
||||
sp.config,
|
||||
).(*scrapeLoop)
|
||||
wrapped, _ = sl.wrapAppender(sl.appender)
|
||||
|
||||
hl, ok := wrapped.(honorLabelsAppender)
|
||||
if !ok {
|
||||
|
@ -294,17 +288,176 @@ func TestScrapePoolSampleAppender(t *testing.T) {
|
|||
if !ok {
|
||||
t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender)
|
||||
}
|
||||
if re.SampleAppender != app {
|
||||
t.Fatalf("Expected base appender but got %T", re.SampleAppender)
|
||||
co, ok = re.SampleAppender.(*countingAppender)
|
||||
if !ok {
|
||||
t.Fatalf("Expected *countingAppender but got %T", re.SampleAppender)
|
||||
}
|
||||
if co.SampleAppender != app {
|
||||
t.Fatalf("Expected base appender but got %T", co.SampleAppender)
|
||||
}
|
||||
}
|
||||
|
||||
func TestScrapeLoopSampleProcessing(t *testing.T) {
|
||||
readSamples := model.Samples{
|
||||
{
|
||||
Metric: model.Metric{"__name__": "a_metric"},
|
||||
},
|
||||
{
|
||||
Metric: model.Metric{"__name__": "b_metric"},
|
||||
},
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
scrapedSamples model.Samples
|
||||
scrapeConfig *config.ScrapeConfig
|
||||
expectedReportedSamples model.Samples
|
||||
expectedPostRelabelSamplesCount int
|
||||
}{
|
||||
{ // 0
|
||||
scrapedSamples: readSamples,
|
||||
scrapeConfig: &config.ScrapeConfig{},
|
||||
expectedReportedSamples: model.Samples{
|
||||
{
|
||||
Metric: model.Metric{"__name__": "up"},
|
||||
Value: 1,
|
||||
},
|
||||
{
|
||||
Metric: model.Metric{"__name__": "scrape_duration_seconds"},
|
||||
Value: 42,
|
||||
},
|
||||
{
|
||||
Metric: model.Metric{"__name__": "scrape_samples_scraped"},
|
||||
Value: 2,
|
||||
},
|
||||
{
|
||||
Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
|
||||
Value: 2,
|
||||
},
|
||||
},
|
||||
expectedPostRelabelSamplesCount: 2,
|
||||
},
|
||||
{ // 1
|
||||
scrapedSamples: readSamples,
|
||||
scrapeConfig: &config.ScrapeConfig{
|
||||
MetricRelabelConfigs: []*config.RelabelConfig{
|
||||
{
|
||||
Action: config.RelabelDrop,
|
||||
SourceLabels: model.LabelNames{"__name__"},
|
||||
Regex: config.MustNewRegexp("a.*"),
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedReportedSamples: model.Samples{
|
||||
{
|
||||
Metric: model.Metric{"__name__": "up"},
|
||||
Value: 1,
|
||||
},
|
||||
{
|
||||
Metric: model.Metric{"__name__": "scrape_duration_seconds"},
|
||||
Value: 42,
|
||||
},
|
||||
{
|
||||
Metric: model.Metric{"__name__": "scrape_samples_scraped"},
|
||||
Value: 2,
|
||||
},
|
||||
{
|
||||
Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
|
||||
Value: 1,
|
||||
},
|
||||
},
|
||||
expectedPostRelabelSamplesCount: 1,
|
||||
},
|
||||
{ // 2
|
||||
scrapedSamples: readSamples,
|
||||
scrapeConfig: &config.ScrapeConfig{
|
||||
SampleLimit: 1,
|
||||
MetricRelabelConfigs: []*config.RelabelConfig{
|
||||
{
|
||||
Action: config.RelabelDrop,
|
||||
SourceLabels: model.LabelNames{"__name__"},
|
||||
Regex: config.MustNewRegexp("a.*"),
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedReportedSamples: model.Samples{
|
||||
{
|
||||
Metric: model.Metric{"__name__": "up"},
|
||||
Value: 1,
|
||||
},
|
||||
{
|
||||
Metric: model.Metric{"__name__": "scrape_duration_seconds"},
|
||||
Value: 42,
|
||||
},
|
||||
{
|
||||
Metric: model.Metric{"__name__": "scrape_samples_scraped"},
|
||||
Value: 2,
|
||||
},
|
||||
{
|
||||
Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
|
||||
Value: 1,
|
||||
},
|
||||
},
|
||||
expectedPostRelabelSamplesCount: 1,
|
||||
},
|
||||
{ // 3
|
||||
scrapedSamples: readSamples,
|
||||
scrapeConfig: &config.ScrapeConfig{
|
||||
SampleLimit: 1,
|
||||
},
|
||||
expectedReportedSamples: model.Samples{
|
||||
{
|
||||
Metric: model.Metric{"__name__": "up"},
|
||||
Value: 0,
|
||||
},
|
||||
{
|
||||
Metric: model.Metric{"__name__": "scrape_duration_seconds"},
|
||||
Value: 42,
|
||||
},
|
||||
{
|
||||
Metric: model.Metric{"__name__": "scrape_samples_scraped"},
|
||||
Value: 2,
|
||||
},
|
||||
{
|
||||
Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
|
||||
Value: 2,
|
||||
},
|
||||
},
|
||||
expectedPostRelabelSamplesCount: 2,
|
||||
},
|
||||
}
|
||||
|
||||
for i, test := range testCases {
|
||||
ingestedSamples := &bufferAppender{buffer: model.Samples{}}
|
||||
|
||||
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
|
||||
|
||||
scraper := &testScraper{}
|
||||
sl := newScrapeLoop(context.Background(), scraper, ingestedSamples, target.Labels(), test.scrapeConfig).(*scrapeLoop)
|
||||
num, err := sl.append(test.scrapedSamples)
|
||||
sl.report(time.Unix(0, 0), 42*time.Second, len(test.scrapedSamples), num, err)
|
||||
reportedSamples := ingestedSamples.buffer
|
||||
if err == nil {
|
||||
reportedSamples = reportedSamples[num:]
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(reportedSamples, test.expectedReportedSamples) {
|
||||
t.Errorf("Reported samples did not match expected metrics for case %d", i)
|
||||
t.Errorf("Expected: %v", test.expectedReportedSamples)
|
||||
t.Fatalf("Got: %v", reportedSamples)
|
||||
}
|
||||
if test.expectedPostRelabelSamplesCount != num {
|
||||
t.Fatalf("Case %d: Ingested samples %d did not match expected value %d", i, num, test.expectedPostRelabelSamplesCount)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestScrapeLoopStop(t *testing.T) {
|
||||
scraper := &testScraper{}
|
||||
sl := newScrapeLoop(context.Background(), scraper, nil, nil)
|
||||
sl := newScrapeLoop(context.Background(), scraper, nil, nil, &config.ScrapeConfig{})
|
||||
|
||||
// The scrape pool synchronizes on stopping scrape loops. However, new scrape
|
||||
// loops are syarted asynchronously. Thus it's possible, that a loop is stopped
|
||||
// loops are started asynchronously. Thus it's possible, that a loop is stopped
|
||||
// again before having started properly.
|
||||
// Stopping not-yet-started loops must block until the run method was called and exited.
|
||||
// The run method must exit immediately.
|
||||
|
@ -351,14 +504,13 @@ func TestScrapeLoopRun(t *testing.T) {
|
|||
signal = make(chan struct{})
|
||||
errc = make(chan error)
|
||||
|
||||
scraper = &testScraper{}
|
||||
app = &nopAppender{}
|
||||
reportApp = &nopAppender{}
|
||||
scraper = &testScraper{}
|
||||
app = &nopAppender{}
|
||||
)
|
||||
defer close(signal)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
sl := newScrapeLoop(ctx, scraper, app, reportApp)
|
||||
sl := newScrapeLoop(ctx, scraper, app, nil, &config.ScrapeConfig{})
|
||||
|
||||
// The loop must terminate during the initial offset if the context
|
||||
// is canceled.
|
||||
|
@ -396,7 +548,7 @@ func TestScrapeLoopRun(t *testing.T) {
|
|||
}
|
||||
|
||||
ctx, cancel = context.WithCancel(context.Background())
|
||||
sl = newScrapeLoop(ctx, scraper, app, reportApp)
|
||||
sl = newScrapeLoop(ctx, scraper, app, nil, &config.ScrapeConfig{})
|
||||
|
||||
go func() {
|
||||
sl.run(time.Second, 100*time.Millisecond, errc)
|
||||
|
|
|
@ -278,6 +278,29 @@ func (app relabelAppender) Append(s *model.Sample) error {
|
|||
return app.SampleAppender.Append(s)
|
||||
}
|
||||
|
||||
// bufferAppender appends samples to the given buffer.
|
||||
type bufferAppender struct {
|
||||
buffer model.Samples
|
||||
}
|
||||
|
||||
func (app *bufferAppender) Append(s *model.Sample) error {
|
||||
app.buffer = append(app.buffer, s)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (app *bufferAppender) NeedsThrottling() bool { return false }
|
||||
|
||||
// countingAppender counts the samples appended to the underlying appender.
|
||||
type countingAppender struct {
|
||||
storage.SampleAppender
|
||||
count int
|
||||
}
|
||||
|
||||
func (app *countingAppender) Append(s *model.Sample) error {
|
||||
app.count++
|
||||
return app.SampleAppender.Append(s)
|
||||
}
|
||||
|
||||
// populateLabels builds a label set from the given label set and scrape configuration.
|
||||
// It returns a label set before relabeling was applied as the second return value.
|
||||
// Returns a nil label set if the target is dropped during relabeling.
|
||||
|
|
Loading…
Reference in a new issue