diff --git a/retrieval/scrape.go b/retrieval/scrape.go index da6fa5d8a..bbf1e113c 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -135,7 +135,7 @@ func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) { newTargets[fp] = tnew tnew.scrapeLoop = newScrapeLoop(sp.ctx, tnew, tnew.wrapAppender(sp.appender), tnew.wrapReportingAppender(sp.appender)) - go tnew.scrapeLoop.run(tnew.interval(), tnew.timeout()) + go tnew.scrapeLoop.run(tnew.interval(), tnew.timeout(), nil) } } for fp, told := range prevTargets { @@ -179,7 +179,7 @@ func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) { } type loop interface { - run(interval, timeout time.Duration) + run(interval, timeout time.Duration, errc chan<- error) stop() } @@ -207,7 +207,7 @@ func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.Sampl return sl } -func (sl *scrapeLoop) run(interval, timeout time.Duration) { +func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) { defer close(sl.done) select { @@ -229,33 +229,34 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration) { default: } - if sl.appender.NeedsThrottling() { + if !sl.appender.NeedsThrottling() { + var ( + start = time.Now() + scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout) + ) + + targetIntervalLength.WithLabelValues(interval.String()).Observe( + float64(time.Since(last)) / float64(time.Second), // Sub-second precision. + ) + + samples, err := sl.scraper.scrape(scrapeCtx) + if err == nil { + sl.append(samples) + } else if errc != nil { + errc <- err + } + + sl.report(start, time.Since(start), err) + last = start + } else { targetSkippedScrapes.WithLabelValues(interval.String()).Inc() - continue } - targetIntervalLength.WithLabelValues(interval.String()).Observe( - float64(time.Since(last)) / float64(time.Second), // Sub-second precision. - ) - - var ( - start = time.Now() - scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout) - ) - - samples, err := sl.scraper.scrape(scrapeCtx) - if err == nil { - sl.append(samples) - } - - sl.report(start, time.Since(start), err) select { case <-sl.ctx.Done(): return case <-ticker.C: } - - last = start } } diff --git a/retrieval/target_test.go b/retrieval/target_test.go index be8075c05..2627d97f0 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -28,6 +28,7 @@ import ( "time" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/config" ) @@ -167,299 +168,6 @@ func TestTargetWrapAppender(t *testing.T) { } } -func TestOverwriteLabels(t *testing.T) { - type test struct { - metric string - resultNormal model.Metric - resultHonor model.Metric - } - var tests []test - - server := httptest.NewServer( - http.HandlerFunc( - func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", `text/plain; version=0.0.4`) - for _, test := range tests { - w.Write([]byte(test.metric)) - w.Write([]byte(" 1\n")) - } - }, - ), - ) - defer server.Close() - addr := model.LabelValue(strings.Split(server.URL, "://")[1]) - - tests = []test{ - { - metric: `foo{}`, - resultNormal: model.Metric{ - model.MetricNameLabel: "foo", - model.InstanceLabel: addr, - }, - resultHonor: model.Metric{ - model.MetricNameLabel: "foo", - model.InstanceLabel: addr, - }, - }, - { - metric: `foo{instance=""}`, - resultNormal: model.Metric{ - model.MetricNameLabel: "foo", - model.InstanceLabel: addr, - }, - resultHonor: model.Metric{ - model.MetricNameLabel: "foo", - }, - }, - { - metric: `foo{instance="other_instance"}`, - resultNormal: model.Metric{ - model.MetricNameLabel: "foo", - model.InstanceLabel: addr, - model.ExportedLabelPrefix + model.InstanceLabel: "other_instance", - }, - resultHonor: model.Metric{ - model.MetricNameLabel: "foo", - model.InstanceLabel: "other_instance", - }, - }, - } - - target := newTestTarget(server.URL, time.Second, nil) - - target.scrapeConfig.HonorLabels = false - app := &collectResultAppender{} - if err := target.scrape(app); err != nil { - t.Fatal(err) - } - - for i, test := range tests { - if !reflect.DeepEqual(app.result[i].Metric, test.resultNormal) { - t.Errorf("Error comparing %q:\nExpected:\n%s\nGot:\n%s\n", test.metric, test.resultNormal, app.result[i].Metric) - } - } - - target.scrapeConfig.HonorLabels = true - app = &collectResultAppender{} - if err := target.scrape(app); err != nil { - t.Fatal(err) - } - - for i, test := range tests { - if !reflect.DeepEqual(app.result[i].Metric, test.resultHonor) { - t.Errorf("Error comparing %q:\nExpected:\n%s\nGot:\n%s\n", test.metric, test.resultHonor, app.result[i].Metric) - } - - } -} -func TestTargetScrapeUpdatesState(t *testing.T) { - testTarget := newTestTarget("bad schema", 0, nil) - - testTarget.scrape(nopAppender{}) - if testTarget.status.Health() != HealthBad { - t.Errorf("Expected target state %v, actual: %v", HealthBad, testTarget.status.Health()) - } -} - -func TestTargetScrapeWithThrottledStorage(t *testing.T) { - server := httptest.NewServer( - http.HandlerFunc( - func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", `text/plain; version=0.0.4`) - for i := 0; i < 10; i++ { - w.Write([]byte( - fmt.Sprintf("test_metric_%d{foo=\"bar\"} 123.456\n", i), - )) - } - }, - ), - ) - defer server.Close() - - testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{"dings": "bums"}) - - go testTarget.RunScraper(&collectResultAppender{throttled: true}) - - // Enough time for a scrape to happen. - time.Sleep(20 * time.Millisecond) - - testTarget.StopScraper() - // Wait for it to take effect. - time.Sleep(20 * time.Millisecond) - - if testTarget.status.Health() != HealthBad { - t.Errorf("Expected target state %v, actual: %v", HealthBad, testTarget.status.Health()) - } - if testTarget.status.LastError() != errSkippedScrape { - t.Errorf("Expected target error %q, actual: %q", errSkippedScrape, testTarget.status.LastError()) - } -} - -func TestTargetScrapeMetricRelabelConfigs(t *testing.T) { - server := httptest.NewServer( - http.HandlerFunc( - func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", `text/plain; version=0.0.4`) - w.Write([]byte("test_metric_drop 0\n")) - w.Write([]byte("test_metric_relabel 1\n")) - }, - ), - ) - defer server.Close() - testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{}) - testTarget.scrapeConfig.MetricRelabelConfigs = []*config.RelabelConfig{ - { - SourceLabels: model.LabelNames{"__name__"}, - Regex: config.MustNewRegexp(".*drop.*"), - Action: config.RelabelDrop, - }, - { - SourceLabels: model.LabelNames{"__name__"}, - Regex: config.MustNewRegexp(".*(relabel|up).*"), - TargetLabel: "foo", - Replacement: "bar", - Action: config.RelabelReplace, - }, - } - - appender := &collectResultAppender{} - if err := testTarget.scrape(appender); err != nil { - t.Fatal(err) - } - - // Remove variables part of result. - for _, sample := range appender.result { - sample.Timestamp = 0 - sample.Value = 0 - } - - expected := []*model.Sample{ - { - Metric: model.Metric{ - model.MetricNameLabel: "test_metric_relabel", - "foo": "bar", - model.InstanceLabel: model.LabelValue(testTarget.host()), - }, - Timestamp: 0, - Value: 0, - }, - // The metrics about the scrape are not affected. - { - Metric: model.Metric{ - model.MetricNameLabel: scrapeHealthMetricName, - model.InstanceLabel: model.LabelValue(testTarget.host()), - }, - Timestamp: 0, - Value: 0, - }, - { - Metric: model.Metric{ - model.MetricNameLabel: scrapeDurationMetricName, - model.InstanceLabel: model.LabelValue(testTarget.host()), - }, - Timestamp: 0, - Value: 0, - }, - } - - if !appender.result.Equal(expected) { - t.Fatalf("Expected and actual samples not equal. Expected: %s, actual: %s", expected, appender.result) - } - -} - -func TestTargetRecordScrapeHealth(t *testing.T) { - var ( - testTarget = newTestTarget("example.url:80", 0, model.LabelSet{model.JobLabel: "testjob"}) - now = model.Now() - appender = &collectResultAppender{} - ) - - testTarget.report(appender, now.Time(), 2*time.Second, nil) - - result := appender.result - - if len(result) != 2 { - t.Fatalf("Expected two samples, got %d", len(result)) - } - - actual := result[0] - expected := &model.Sample{ - Metric: model.Metric{ - model.MetricNameLabel: scrapeHealthMetricName, - model.InstanceLabel: "example.url:80", - model.JobLabel: "testjob", - }, - Timestamp: now, - Value: 1, - } - - if !actual.Equal(expected) { - t.Fatalf("Expected and actual samples not equal. Expected: %v, actual: %v", expected, actual) - } - - actual = result[1] - expected = &model.Sample{ - Metric: model.Metric{ - model.MetricNameLabel: scrapeDurationMetricName, - model.InstanceLabel: "example.url:80", - model.JobLabel: "testjob", - }, - Timestamp: now, - Value: 2.0, - } - - if !actual.Equal(expected) { - t.Fatalf("Expected and actual samples not equal. Expected: %v, actual: %v", expected, actual) - } -} - -func TestTargetScrapeTimeout(t *testing.T) { - signal := make(chan bool, 1) - server := httptest.NewServer( - http.HandlerFunc( - func(w http.ResponseWriter, r *http.Request) { - <-signal - w.Header().Set("Content-Type", `text/plain; version=0.0.4`) - w.Write([]byte{}) - }, - ), - ) - defer server.Close() - - testTarget := newTestTarget(server.URL, 50*time.Millisecond, model.LabelSet{}) - - appender := nopAppender{} - - // scrape once without timeout - signal <- true - if err := testTarget.scrape(appender); err != nil { - t.Fatal(err) - } - - // let the deadline lapse - time.Sleep(55 * time.Millisecond) - - // now scrape again - signal <- true - if err := testTarget.scrape(appender); err != nil { - t.Fatal(err) - } - - // now timeout - if err := testTarget.scrape(appender); err == nil { - t.Fatal("expected scrape to timeout") - } else { - signal <- true // let handler continue - } - - // now scrape again without timeout - signal <- true - if err := testTarget.scrape(appender); err != nil { - t.Fatal(err) - } -} - func TestTargetScrape404(t *testing.T) { server := httptest.NewServer( http.HandlerFunc( @@ -471,59 +179,14 @@ func TestTargetScrape404(t *testing.T) { defer server.Close() testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{}) - appender := nopAppender{} want := errors.New("server returned HTTP status 404 Not Found") - got := testTarget.scrape(appender) + _, got := testTarget.scrape(context.Background()) if got == nil || want.Error() != got.Error() { t.Fatalf("want err %q, got %q", want, got) } } -func TestTargetRunScraperScrapes(t *testing.T) { - testTarget := newTestTarget("bad schema", 0, nil) - - go testTarget.RunScraper(nopAppender{}) - - // Enough time for a scrape to happen. - time.Sleep(20 * time.Millisecond) - if testTarget.status.LastScrape().IsZero() { - t.Errorf("Scrape hasn't occured.") - } - - testTarget.StopScraper() - // Wait for it to take effect. - time.Sleep(20 * time.Millisecond) - last := testTarget.status.LastScrape() - // Enough time for a scrape to happen. - time.Sleep(20 * time.Millisecond) - if testTarget.status.LastScrape() != last { - t.Errorf("Scrape occured after it was stopped.") - } -} - -func BenchmarkScrape(b *testing.B) { - server := httptest.NewServer( - http.HandlerFunc( - func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", `text/plain; version=0.0.4`) - w.Write([]byte("test_metric{foo=\"bar\"} 123.456\n")) - }, - ), - ) - defer server.Close() - - testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{"dings": "bums"}) - appender := nopAppender{} - - b.ResetTimer() - for i := 0; i < b.N; i++ { - if err := testTarget.scrape(appender); err != nil { - b.Fatal(err) - } - } -} - func TestURLParams(t *testing.T) { server := httptest.NewServer( http.HandlerFunc( @@ -566,8 +229,7 @@ func TestURLParams(t *testing.T) { if err != nil { t.Fatal(err) } - app := &collectResultAppender{} - if err = target.scrape(app); err != nil { + if _, err = target.scrape(context.Background()); err != nil { t.Fatal(err) } } @@ -583,10 +245,8 @@ func newTestTarget(targetURL string, deadline time.Duration, labels model.LabelS ScrapeInterval: model.Duration(time.Millisecond), ScrapeTimeout: model.Duration(deadline), }, - labels: labels, - status: &TargetStatus{}, - scraperStopping: make(chan struct{}), - scraperStopped: make(chan struct{}), + labels: labels, + status: &TargetStatus{}, } }