diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 3a7dbee00..c67d1b981 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -110,7 +110,7 @@ type scrapePool struct { loops map[uint64]loop // Constructor for new scrape loops. This is settable for testing convenience. - newLoop func(context.Context, scraper, storage.SampleAppender, func(storage.SampleAppender) storage.SampleAppender, storage.SampleAppender, uint) loop + newLoop func(context.Context, scraper, storage.SampleAppender, model.LabelSet, *config.ScrapeConfig) loop } func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { @@ -179,7 +179,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { var ( t = sp.targets[fp] s = &targetScraper{Target: t, client: sp.client} - newLoop = sp.newLoop(sp.ctx, s, sp.appender, sp.sampleMutator(t), sp.reportAppender(t), sp.config.SampleLimit) + newLoop = sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config) ) wg.Add(1) @@ -240,7 +240,7 @@ func (sp *scrapePool) sync(targets []*Target) { if _, ok := sp.targets[hash]; !ok { s := &targetScraper{Target: t, client: sp.client} - l := sp.newLoop(sp.ctx, s, sp.appender, sp.sampleMutator(t), sp.reportAppender(t), sp.config.SampleLimit) + l := sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config) sp.targets[hash] = t sp.loops[hash] = l @@ -272,41 +272,6 @@ func (sp *scrapePool) sync(targets []*Target) { wg.Wait() } -// sampleMutator returns a function that'll take an appender and return an appender for mutated samples. -func (sp *scrapePool) sampleMutator(target *Target) func(storage.SampleAppender) storage.SampleAppender { - return func(app storage.SampleAppender) storage.SampleAppender { - // The relabelAppender has to be inside the label-modifying appenders - // so the relabeling rules are applied to the correct label set. 
- if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 { - app = relabelAppender{ - SampleAppender: app, - relabelings: mrc, - } - } - - if sp.config.HonorLabels { - app = honorLabelsAppender{ - SampleAppender: app, - labels: target.Labels(), - } - } else { - app = ruleLabelsAppender{ - SampleAppender: app, - labels: target.Labels(), - } - } - return app - } -} - -// reportAppender returns an appender for reporting samples for the target. -func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender { - return ruleLabelsAppender{ - SampleAppender: sp.appender, - labels: target.Labels(), - } -} - // A scraper retrieves samples and accepts a status report at the end. type scraper interface { scrape(ctx context.Context, ts time.Time) (model.Samples, error) @@ -376,26 +341,32 @@ type scrapeLoop struct { // Where samples are ultimately sent. appender storage.SampleAppender - // Applies relabel rules and label handling. - mutator func(storage.SampleAppender) storage.SampleAppender - // For sending up and scrape_*. - reportAppender storage.SampleAppender - // Limit on number of samples that will be accepted. 
- sampleLimit uint + + targetLabels model.LabelSet + metricRelabelConfigs []*config.RelabelConfig + honorLabels bool + sampleLimit uint done chan struct{} ctx context.Context cancel func() } -func newScrapeLoop(ctx context.Context, sc scraper, app storage.SampleAppender, mut func(storage.SampleAppender) storage.SampleAppender, reportApp storage.SampleAppender, sampleLimit uint) loop { +func newScrapeLoop( + ctx context.Context, + sc scraper, + appender storage.SampleAppender, + targetLabels model.LabelSet, + config *config.ScrapeConfig, +) loop { sl := &scrapeLoop{ - scraper: sc, - appender: app, - mutator: mut, - reportAppender: reportApp, - sampleLimit: sampleLimit, - done: make(chan struct{}), + scraper: sc, + appender: appender, + targetLabels: targetLabels, + metricRelabelConfigs: config.MetricRelabelConfigs, + honorLabels: config.HonorLabels, + sampleLimit: config.SampleLimit, + done: make(chan struct{}), } sl.ctx, sl.cancel = context.WithCancel(ctx) @@ -426,8 +397,9 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) { if !sl.appender.NeedsThrottling() { var ( - start = time.Now() - scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout) + start = time.Now() + scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout) + numPostRelabelSamples = 0 ) // Only record after the first scrape. 
@@ -438,11 +410,13 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) { } samples, err := sl.scraper.scrape(scrapeCtx, start) - err = sl.processScrapeResult(samples, err, start) + if err == nil { + numPostRelabelSamples, err = sl.append(samples) + } if err != nil && errc != nil { errc <- err } - + sl.report(start, time.Since(start), len(samples), numPostRelabelSamples, err) last = start } else { targetSkippedScrapes.WithLabelValues(interval.String()).Inc() @@ -461,36 +435,73 @@ func (sl *scrapeLoop) stop() { <-sl.done } -func (sl *scrapeLoop) processScrapeResult(samples model.Samples, scrapeErr error, start time.Time) error { - // Collect samples post-relabelling and label handling in a buffer. - buf := &bufferAppender{buffer: make(model.Samples, 0, len(samples))} - if scrapeErr == nil { - app := sl.mutator(buf) - for _, sample := range samples { - app.Append(sample) - } +// wrapAppender wraps a SampleAppender for relabeling. It returns the wrapped +// appender and an innermost countingAppender that counts the samples actually +// appended in the end. +func (sl *scrapeLoop) wrapAppender(app storage.SampleAppender) (storage.SampleAppender, *countingAppender) { + // Innermost appender is a countingAppender to count how many samples + // are left in the end. + countingAppender := &countingAppender{ + SampleAppender: app, + } + app = countingAppender - if sl.sampleLimit > 0 && uint(len(buf.buffer)) > sl.sampleLimit { - scrapeErr = fmt.Errorf("%d samples exceeded limit of %d", len(buf.buffer), sl.sampleLimit) - targetScrapeSampleLimit.Inc() - } else { - // Send samples to storage. - sl.append(buf.buffer) + // The relabelAppender has to be inside the label-modifying appenders so + // the relabeling rules are applied to the correct label set. 
+ if len(sl.metricRelabelConfigs) > 0 { + app = relabelAppender{ + SampleAppender: app, + relabelings: sl.metricRelabelConfigs, } } - sl.report(start, time.Since(start), len(samples), len(buf.buffer), scrapeErr) - return scrapeErr + if sl.honorLabels { + app = honorLabelsAppender{ + SampleAppender: app, + labels: sl.targetLabels, + } + } else { + app = ruleLabelsAppender{ + SampleAppender: app, + labels: sl.targetLabels, + } + } + return app, countingAppender } -func (sl *scrapeLoop) append(samples model.Samples) { +func (sl *scrapeLoop) append(samples model.Samples) (int, error) { var ( numOutOfOrder = 0 numDuplicates = 0 + app = sl.appender + countingApp *countingAppender ) + if sl.sampleLimit > 0 { + // We need to check for the sample limit, so append everything + // to a wrapped bufferAppender first. Then point samples to the + // result. + bufApp := &bufferAppender{buffer: make(model.Samples, 0, len(samples))} + var wrappedBufApp storage.SampleAppender + wrappedBufApp, countingApp = sl.wrapAppender(bufApp) + for _, s := range samples { + // Ignore errors as bufferAppender always succeeds. + wrappedBufApp.Append(s) + } + samples = bufApp.buffer + if uint(countingApp.count) > sl.sampleLimit { + targetScrapeSampleLimit.Inc() + return countingApp.count, fmt.Errorf( + "%d samples exceeded limit of %d", countingApp.count, sl.sampleLimit, + ) + } + } else { + // No need to check for sample limit. Wrap sl.appender directly. 
+ app, countingApp = sl.wrapAppender(sl.appender) + } + for _, s := range samples { - if err := sl.appender.Append(s); err != nil { + if err := app.Append(s); err != nil { switch err { case local.ErrOutOfOrderSample: numOutOfOrder++ @@ -509,6 +520,7 @@ func (sl *scrapeLoop) append(samples model.Samples) { if numDuplicates > 0 { log.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp") } + return countingApp.count, nil } func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSamples, postRelabelSamples int, err error) { @@ -550,16 +562,21 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSam Value: model.SampleValue(postRelabelSamples), } - if err := sl.reportAppender.Append(healthSample); err != nil { + reportAppender := ruleLabelsAppender{ + SampleAppender: sl.appender, + labels: sl.targetLabels, + } + + if err := reportAppender.Append(healthSample); err != nil { log.With("sample", healthSample).With("error", err).Warn("Scrape health sample discarded") } - if err := sl.reportAppender.Append(durationSample); err != nil { + if err := reportAppender.Append(durationSample); err != nil { log.With("sample", durationSample).With("error", err).Warn("Scrape duration sample discarded") } - if err := sl.reportAppender.Append(countSample); err != nil { + if err := reportAppender.Append(countSample); err != nil { log.With("sample", durationSample).With("error", err).Warn("Scrape sample count sample discarded") } - if err := sl.reportAppender.Append(postRelabelSample); err != nil { - log.With("sample", durationSample).With("error", err).Warn("Scrape sample count post-relabelling sample discarded") + if err := reportAppender.Append(postRelabelSample); err != nil { + log.With("sample", durationSample).With("error", err).Warn("Scrape sample count post-relabeling sample discarded") } } diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index 55c95c402..b2feb5168 
100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -139,7 +139,7 @@ func TestScrapePoolReload(t *testing.T) { } // On starting to run, new loops created on reload check whether their preceding // equivalents have been stopped. - newLoop := func(ctx context.Context, s scraper, app storage.SampleAppender, mut func(storage.SampleAppender) storage.SampleAppender, reportApp storage.SampleAppender, sampleLimit uint) loop { + newLoop := func(ctx context.Context, s scraper, app storage.SampleAppender, tl model.LabelSet, cfg *config.ScrapeConfig) loop { l := &testLoop{} l.startFunc = func(interval, timeout time.Duration, errc chan<- error) { if interval != 3*time.Second { @@ -222,44 +222,19 @@ func TestScrapePoolReload(t *testing.T) { } } -func TestScrapePoolReportAppender(t *testing.T) { +func TestScrapeLoopWrapSampleAppender(t *testing.T) { cfg := &config.ScrapeConfig{ MetricRelabelConfigs: []*config.RelabelConfig{ - {}, {}, {}, - }, - } - target := newTestTarget("example.com:80", 10*time.Millisecond, nil) - app := &nopAppender{} - - sp := newScrapePool(context.Background(), cfg, app) - - cfg.HonorLabels = false - wrapped := sp.reportAppender(target) - - rl, ok := wrapped.(ruleLabelsAppender) - if !ok { - t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) - } - if rl.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", rl.SampleAppender) - } - - cfg.HonorLabels = true - wrapped = sp.reportAppender(target) - - hl, ok := wrapped.(ruleLabelsAppender) - if !ok { - t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) - } - if hl.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", hl.SampleAppender) - } -} - -func TestScrapePoolSampleAppender(t *testing.T) { - cfg := &config.ScrapeConfig{ - MetricRelabelConfigs: []*config.RelabelConfig{ - {}, {}, {}, + { + Action: config.RelabelDrop, + SourceLabels: model.LabelNames{"__name__"}, + Regex: config.MustNewRegexp("does_not_match_.*"), + }, + { + Action: 
config.RelabelDrop, + SourceLabels: model.LabelNames{"__name__"}, + Regex: config.MustNewRegexp("does_not_match_either_*"), + }, }, } @@ -269,7 +244,20 @@ func TestScrapePoolSampleAppender(t *testing.T) { sp := newScrapePool(context.Background(), cfg, app) cfg.HonorLabels = false - wrapped := sp.sampleMutator(target)(app) + + sl := sp.newLoop( + sp.ctx, + &targetScraper{Target: target, client: sp.client}, + sp.appender, + target.Labels(), + sp.config, + ).(*scrapeLoop) + wrapped, counting := sl.wrapAppender(sl.appender) + wrapped.Append(&model.Sample{}) + + if counting.count != 1 { + t.Errorf("Expected count of 1, got %d", counting.count) + } rl, ok := wrapped.(ruleLabelsAppender) if !ok { @@ -279,12 +267,28 @@ func TestScrapePoolSampleAppender(t *testing.T) { if !ok { t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender) } - if re.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", re.SampleAppender) + co, ok := re.SampleAppender.(*countingAppender) + if !ok { + t.Fatalf("Expected *countingAppender but got %T", re.SampleAppender) + } + if co.SampleAppender != app { + t.Fatalf("Expected base appender but got %T", co.SampleAppender) } cfg.HonorLabels = true - wrapped = sp.sampleMutator(target)(app) + sl = sp.newLoop( + sp.ctx, + &targetScraper{Target: target, client: sp.client}, + sp.appender, + target.Labels(), + sp.config, + ).(*scrapeLoop) + wrapped, counting = sl.wrapAppender(sl.appender) + wrapped.Append(&model.Sample{}) + + if counting.count != 1 { + t.Errorf("Expected count of 1, got %d", counting.count) + } hl, ok := wrapped.(honorLabelsAppender) if !ok { @@ -294,8 +298,12 @@ func TestScrapePoolSampleAppender(t *testing.T) { if !ok { t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender) } - if re.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", re.SampleAppender) + co, ok = re.SampleAppender.(*countingAppender) + if !ok { + t.Fatalf("Expected *countingAppender but got %T", 
re.SampleAppender) + } + if co.SampleAppender != app { + t.Fatalf("Expected base appender but got %T", co.SampleAppender) } } @@ -310,15 +318,14 @@ func TestScrapeLoopSampleProcessing(t *testing.T) { } testCases := []struct { - scrapedSamples model.Samples - scrapeError error - scrapeConfig config.ScrapeConfig - expectedReportedSamples model.Samples - expectedIngestedSamplesCount int + scrapedSamples model.Samples + scrapeConfig *config.ScrapeConfig + expectedReportedSamples model.Samples + expectedPostRelabelSamplesCount int }{ - { + { // 0 scrapedSamples: readSamples, - scrapeError: nil, + scrapeConfig: &config.ScrapeConfig{}, expectedReportedSamples: model.Samples{ { Metric: model.Metric{"__name__": "up"}, @@ -326,6 +333,7 @@ func TestScrapeLoopSampleProcessing(t *testing.T) { }, { Metric: model.Metric{"__name__": "scrape_duration_seconds"}, + Value: 42, }, { Metric: model.Metric{"__name__": "scrape_samples_scraped"}, @@ -336,12 +344,11 @@ func TestScrapeLoopSampleProcessing(t *testing.T) { Value: 2, }, }, - expectedIngestedSamplesCount: 2, + expectedPostRelabelSamplesCount: 2, }, - { + { // 1 scrapedSamples: readSamples, - scrapeError: nil, - scrapeConfig: config.ScrapeConfig{ + scrapeConfig: &config.ScrapeConfig{ MetricRelabelConfigs: []*config.RelabelConfig{ { Action: config.RelabelDrop, @@ -357,6 +364,7 @@ func TestScrapeLoopSampleProcessing(t *testing.T) { }, { Metric: model.Metric{"__name__": "scrape_duration_seconds"}, + Value: 42, }, { Metric: model.Metric{"__name__": "scrape_samples_scraped"}, @@ -367,12 +375,11 @@ func TestScrapeLoopSampleProcessing(t *testing.T) { Value: 1, }, }, - expectedIngestedSamplesCount: 1, + expectedPostRelabelSamplesCount: 1, }, - { + { // 2 scrapedSamples: readSamples, - scrapeError: nil, - scrapeConfig: config.ScrapeConfig{ + scrapeConfig: &config.ScrapeConfig{ SampleLimit: 1, MetricRelabelConfigs: []*config.RelabelConfig{ { @@ -389,6 +396,7 @@ func TestScrapeLoopSampleProcessing(t *testing.T) { }, { Metric: 
model.Metric{"__name__": "scrape_duration_seconds"}, + Value: 42, }, { Metric: model.Metric{"__name__": "scrape_samples_scraped"}, @@ -399,12 +407,11 @@ func TestScrapeLoopSampleProcessing(t *testing.T) { Value: 1, }, }, - expectedIngestedSamplesCount: 1, + expectedPostRelabelSamplesCount: 1, }, - { + { // 3 scrapedSamples: readSamples, - scrapeError: nil, - scrapeConfig: config.ScrapeConfig{ + scrapeConfig: &config.ScrapeConfig{ SampleLimit: 1, }, expectedReportedSamples: model.Samples{ @@ -414,6 +421,7 @@ func TestScrapeLoopSampleProcessing(t *testing.T) { }, { Metric: model.Metric{"__name__": "scrape_duration_seconds"}, + Value: 42, }, { Metric: model.Metric{"__name__": "scrape_samples_scraped"}, @@ -424,53 +432,31 @@ func TestScrapeLoopSampleProcessing(t *testing.T) { Value: 2, }, }, - expectedIngestedSamplesCount: 0, - }, - { - scrapedSamples: model.Samples{}, - scrapeError: fmt.Errorf("error"), - expectedReportedSamples: model.Samples{ - { - Metric: model.Metric{"__name__": "up"}, - Value: 0, - }, - { - Metric: model.Metric{"__name__": "scrape_duration_seconds"}, - }, - { - Metric: model.Metric{"__name__": "scrape_samples_scraped"}, - Value: 0, - }, - { - Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"}, - Value: 0, - }, - }, - expectedIngestedSamplesCount: 0, + expectedPostRelabelSamplesCount: 2, }, } for i, test := range testCases { ingestedSamples := &bufferAppender{buffer: model.Samples{}} - reportedSamples := &bufferAppender{buffer: model.Samples{}} target := newTestTarget("example.com:80", 10*time.Millisecond, nil) - sp := newScrapePool(context.Background(), &test.scrapeConfig, ingestedSamples) scraper := &testScraper{} - sl := newScrapeLoop(context.Background(), scraper, ingestedSamples, sp.sampleMutator(target), reportedSamples, test.scrapeConfig.SampleLimit).(*scrapeLoop) - sl.processScrapeResult(test.scrapedSamples, test.scrapeError, time.Unix(0, 0)) + sl := newScrapeLoop(context.Background(), scraper, ingestedSamples, 
target.Labels(), test.scrapeConfig).(*scrapeLoop) + num, err := sl.append(test.scrapedSamples) + sl.report(time.Unix(0, 0), 42*time.Second, len(test.scrapedSamples), num, err) + reportedSamples := ingestedSamples.buffer + if err == nil { + reportedSamples = reportedSamples[num:] + } - // Ignore value of scrape_duration_seconds, as it's time dependant. - reportedSamples.buffer[1].Value = 0 - - if !reflect.DeepEqual(reportedSamples.buffer, test.expectedReportedSamples) { + if !reflect.DeepEqual(reportedSamples, test.expectedReportedSamples) { t.Errorf("Reported samples did not match expected metrics for case %d", i) t.Errorf("Expected: %v", test.expectedReportedSamples) - t.Fatalf("Got: %v", reportedSamples.buffer) + t.Fatalf("Got: %v", reportedSamples) } - if test.expectedIngestedSamplesCount != len(ingestedSamples.buffer) { - t.Fatalf("Ingested samples %d did not match expected value %d", len(ingestedSamples.buffer), test.expectedIngestedSamplesCount) + if test.expectedPostRelabelSamplesCount != num { + t.Fatalf("Case %d: Ingested samples %d did not match expected value %d", i, num, test.expectedPostRelabelSamplesCount) } } @@ -478,10 +464,10 @@ func TestScrapeLoopSampleProcessing(t *testing.T) { func TestScrapeLoopStop(t *testing.T) { scraper := &testScraper{} - sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil, 0) + sl := newScrapeLoop(context.Background(), scraper, nil, nil, &config.ScrapeConfig{}) // The scrape pool synchronizes on stopping scrape loops. However, new scrape - // loops are syarted asynchronously. Thus it's possible, that a loop is stopped + // loops are started asynchronously. Thus it's possible, that a loop is stopped // again before having started properly. // Stopping not-yet-started loops must block until the run method was called and exited. // The run method must exit immediately. 
@@ -528,15 +514,13 @@ func TestScrapeLoopRun(t *testing.T) { signal = make(chan struct{}) errc = make(chan error) - scraper = &testScraper{} - app = &nopAppender{} - mut = func(storage.SampleAppender) storage.SampleAppender { return &nopAppender{} } - reportApp = &nopAppender{} + scraper = &testScraper{} + app = &nopAppender{} ) defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, app, mut, reportApp, 0) + sl := newScrapeLoop(ctx, scraper, app, nil, &config.ScrapeConfig{}) // The loop must terminate during the initial offset if the context // is canceled. @@ -574,7 +558,7 @@ func TestScrapeLoopRun(t *testing.T) { } ctx, cancel = context.WithCancel(context.Background()) - sl = newScrapeLoop(ctx, scraper, app, mut, reportApp, 0) + sl = newScrapeLoop(ctx, scraper, app, nil, &config.ScrapeConfig{}) go func() { sl.run(time.Second, 100*time.Millisecond, errc) diff --git a/retrieval/target.go b/retrieval/target.go index 4a0b94bdf..b599a2a2f 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -278,9 +278,8 @@ func (app relabelAppender) Append(s *model.Sample) error { return app.SampleAppender.Append(s) } -// Appends samples to the given buffer. +// bufferAppender appends samples to the given buffer. type bufferAppender struct { - storage.SampleAppender buffer model.Samples } @@ -289,6 +288,19 @@ func (app *bufferAppender) Append(s *model.Sample) error { return nil } +func (app *bufferAppender) NeedsThrottling() bool { return false } + +// countingAppender counts the samples appended to the underlying appender. +type countingAppender struct { + storage.SampleAppender + count int +} + +func (app *countingAppender) Append(s *model.Sample) error { + app.count++ + return app.SampleAppender.Append(s) +} + // populateLabels builds a label set from the given label set and scrape configuration. // It returns a label set before relabeling was applied as the second return value. 
// Returns a nil label set if the target is dropped during relabeling.