diff --git a/scrape/scrape.go b/scrape/scrape.go index e6e55900b..866abdb5f 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -305,7 +305,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc) }, func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) }, - func(ctx context.Context) storage.Appender { return appender(app.Appender(ctx), opts.sampleLimit) }, + func(ctx context.Context) storage.Appender { return app.Appender(ctx) }, cache, jitterSeed, opts.honorTimestamps, @@ -1426,6 +1426,9 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, e exemplar.Exemplar // escapes to heap so hoisted out of loop ) + // Take an appender with limits. + app = appender(app, sl.sampleLimit) + defer func() { if err != nil { return diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 5ab573112..7eba2eb40 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -463,7 +463,7 @@ func TestScrapePoolAppender(t *testing.T) { appl, ok := loop.(*scrapeLoop) require.True(t, ok, "Expected scrapeLoop but got %T", loop) - wrapped := appl.appender(context.Background()) + wrapped := appender(appl.appender(context.Background()), 0) tl, ok := wrapped.(*timeLimitAppender) require.True(t, ok, "Expected timeLimitAppender but got %T", wrapped) @@ -471,20 +471,21 @@ func TestScrapePoolAppender(t *testing.T) { _, ok = tl.Appender.(nopAppender) require.True(t, ok, "Expected base appender but got %T", tl.Appender) + sampleLimit := 100 loop = sp.newLoop(scrapeLoopOptions{ target: &Target{}, - sampleLimit: 100, + sampleLimit: sampleLimit, }) appl, ok = loop.(*scrapeLoop) require.True(t, ok, "Expected scrapeLoop but got %T", loop) - wrapped = appl.appender(context.Background()) + wrapped = appender(appl.appender(context.Background()), sampleLimit) sl, ok := wrapped.(*limitAppender) require.True(t, ok, "Expected limitAppender but got %T", wrapped) tl, ok = sl.Appender.(*timeLimitAppender) - require.True(t, ok, "Expected limitAppender but got %T", sl.Appender) + require.True(t, ok, "Expected timeLimitAppender but got %T", sl.Appender) _, ok = tl.Appender.(nopAppender) require.True(t, ok, "Expected base appender but got %T", tl.Appender) @@ -2772,6 +2773,72 @@ func TestScrapeReportSingleAppender(t *testing.T) { } } +func TestScrapeReportLimit(t *testing.T) { + s := teststorage.New(t) + defer s.Close() + + cfg := &config.ScrapeConfig{ + JobName: "test", + SampleLimit: 5, + Scheme: "http", + ScrapeInterval: model.Duration(100 * time.Millisecond), + ScrapeTimeout: model.Duration(100 * time.Millisecond), + } + + var ( + scrapes int + scrapedTwice = make(chan bool) + ) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, "metric_a 44\nmetric_b 44\nmetric_c 44\nmetric_d 44\n") + scrapes++ + if scrapes == 2 { + close(scrapedTwice) + } + })) + defer ts.Close() + + sp, err := newScrapePool(cfg, s, 0, nil, false) + require.NoError(t, err) + defer sp.stop() + + testURL, err := url.Parse(ts.URL) + require.NoError(t, err) + sp.Sync([]*targetgroup.Group{ + { + Targets: []model.LabelSet{{model.AddressLabel: model.LabelValue(testURL.Host)}}, + }, + }) + + select { + case <-time.After(5 * time.Second): + t.Fatalf("target was not scraped twice") + case <-scrapedTwice: + // If the target has been scraped twice, report samples from the first + // scrape have been inserted in the database. + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q, err := s.Querier(ctx, time.Time{}.UnixNano(), time.Now().UnixNano()) + require.NoError(t, err) + defer q.Close() + series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "up")) + + var found bool + for series.Next() { + i := series.At().Iterator() + for i.Next() { + _, v := i.At() + require.Equal(t, 1.0, v) + found = true + } + } + + require.True(t, found) +} + func TestScrapeLoopLabelLimit(t *testing.T) { tests := []struct { title string