Add scrape_series_added per-scrape metric. (#5546)

This is an estimate of churn: every series added to the cache is
counted as churn. It will have both false positives (e.g. series
appearing and disappearing) and false negatives (e.g. series that hit
sample_limit but were still created in the head block), but it should be
generally useful as-is.

Relevant docs live in another repo.

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
Brian Brazil 2019-05-08 22:24:00 +01:00 committed by GitHub
parent c8939a67ba
commit b98e818876
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 89 additions and 46 deletions

View file

@ -932,12 +932,12 @@ mainLoop:
// A failed scrape is the same as an empty scrape,
// we still call sl.append to trigger stale markers.
total, added, appErr := sl.append(b, contentType, start)
total, added, seriesAdded, appErr := sl.append(b, contentType, start)
if appErr != nil {
level.Warn(sl.l).Log("msg", "append failed", "err", appErr)
// The append failed, probably due to a parse error or sample limit.
// Call sl.append again with an empty scrape to trigger stale markers.
if _, _, err := sl.append([]byte{}, "", start); err != nil {
if _, _, _, err := sl.append([]byte{}, "", start); err != nil {
level.Warn(sl.l).Log("msg", "append failed", "err", err)
}
}
@ -948,7 +948,7 @@ mainLoop:
scrapeErr = appErr
}
if err := sl.report(start, time.Since(start), total, added, scrapeErr); err != nil {
if err := sl.report(start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
level.Warn(sl.l).Log("msg", "appending scrape report failed", "err", err)
}
last = start
@ -1008,7 +1008,7 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
// Call sl.append again with an empty scrape to trigger stale markers.
// If the target has since been recreated and scraped, the
// stale markers will be out of order and ignored.
if _, _, err := sl.append([]byte{}, "", staleTime); err != nil {
if _, _, _, err := sl.append([]byte{}, "", staleTime); err != nil {
level.Error(sl.l).Log("msg", "stale append failed", "err", err)
}
if err := sl.reportStale(staleTime); err != nil {
@ -1045,7 +1045,7 @@ func (s samples) Less(i, j int) bool {
return s[i].t < s[j].t
}
func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, added int, err error) {
func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
var (
app = sl.appender()
p = textparse.New(b, contentType)
@ -1178,6 +1178,7 @@ loop:
sl.cache.trackStaleness(hash, lset)
}
sl.cache.addRef(mets, ref, lset, hash)
seriesAdded++
}
added++
}
@ -1212,17 +1213,17 @@ loop:
}
if err != nil {
app.Rollback()
return total, added, err
return total, added, seriesAdded, err
}
if err := app.Commit(); err != nil {
return total, added, err
return total, added, seriesAdded, err
}
// Only perform cache cleaning if the scrape was not empty.
// An empty scrape (usually) is used to indicate a failed scrape.
sl.cache.iterDone(len(b) > 0)
return total, added, nil
return total, added, seriesAdded, nil
}
func yoloString(b []byte) string {
@ -1236,9 +1237,10 @@ const (
scrapeDurationMetricName = "scrape_duration_seconds" + "\xff"
scrapeSamplesMetricName = "scrape_samples_scraped" + "\xff"
samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling" + "\xff"
scrapeSeriesAddedMetricName = "scrape_series_added" + "\xff"
)
func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended int, err error) error {
func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended, seriesAdded int, err error) error {
sl.scraper.report(start, duration, err)
ts := timestamp.FromTime(start)
@ -1265,6 +1267,10 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, a
app.Rollback()
return err
}
if err := sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, float64(seriesAdded)); err != nil {
app.Rollback()
return err
}
return app.Commit()
}
@ -1290,6 +1296,10 @@ func (sl *scrapeLoop) reportStale(start time.Time) error {
app.Rollback()
return err
}
if err := sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, stale); err != nil {
app.Rollback()
return err
}
return app.Commit()
}

View file

@ -490,15 +490,15 @@ func TestScrapeLoopStop(t *testing.T) {
t.Fatalf("Scrape wasn't stopped.")
}
// We expected 1 actual sample for each scrape plus 4 for report samples.
// We expected 1 actual sample for each scrape plus 5 for report samples.
// At least 2 scrapes were made, plus the final stale markers.
if len(appender.result) < 5*3 || len(appender.result)%5 != 0 {
t.Fatalf("Expected at least 3 scrapes with 4 samples each, got %d samples", len(appender.result))
if len(appender.result) < 6*3 || len(appender.result)%6 != 0 {
t.Fatalf("Expected at least 3 scrapes with 6 samples each, got %d samples", len(appender.result))
}
// All samples in a scrape must have the same timestamp.
var ts int64
for i, s := range appender.result {
if i%5 == 0 {
if i%6 == 0 {
ts = s.t
} else if s.t != ts {
t.Fatalf("Unexpected multiple timestamps within single scrape")
@ -632,7 +632,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
)
defer cancel()
total, _, err := sl.append([]byte(`# TYPE test_metric counter
total, _, _, err := sl.append([]byte(`# TYPE test_metric counter
# HELP test_metric some help text
# UNIT test_metric metric
test_metric 1
@ -661,6 +661,41 @@ test_metric 1
testutil.Equals(t, "", md.Unit)
}
// TestScrapeLoopSeriesAdded checks the seriesAdded count returned by
// scrapeLoop.append: the first append of a series reports one added series,
// and appending the identical series again reports none.
func TestScrapeLoopSeriesAdded(t *testing.T) {
	// Need a full storage for correct Add/AddFast semantics.
	s := testutil.NewStorage(t)
	defer s.Close()

	app, err := s.Appender()
	if err != nil {
		// Every step below depends on the appender, so abort instead of
		// recording the failure and continuing with an unusable value.
		t.Fatal(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sl := newScrapeLoop(ctx,
		&testScraper{},
		nil, nil,
		nopMutator,
		nopMutator,
		func() storage.Appender { return app },
		nil,
		0,
		true,
	)

	// First append of the series: it must be counted as newly added.
	total, added, seriesAdded, err := sl.append([]byte("test_metric 1\n"), "", time.Time{})
	testutil.Ok(t, err)
	testutil.Equals(t, 1, total)
	testutil.Equals(t, 1, added)
	testutil.Equals(t, 1, seriesAdded)

	// Same series again: the sample is still appended, but no series is added.
	total, added, seriesAdded, err = sl.append([]byte("test_metric 1\n"), "", time.Time{})
	testutil.Ok(t, err)
	testutil.Equals(t, 1, total)
	testutil.Equals(t, 1, added)
	testutil.Equals(t, 0, seriesAdded)
}
func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
appender := &collectResultAppender{}
var (
@ -707,15 +742,15 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
t.Fatalf("Scrape wasn't stopped.")
}
// 1 successfully scraped sample, 1 stale marker after first fail, 4 report samples for
// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
// each scrape successful or not.
if len(appender.result) != 22 {
t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 22, len(appender.result))
if len(appender.result) != 27 {
t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 27, len(appender.result))
}
if appender.result[0].v != 42.0 {
t.Fatalf("Appended first sample not as expected. Wanted: %f Got: %f", appender.result[0].v, 42.0)
}
if !value.IsStaleNaN(appender.result[5].v) {
if !value.IsStaleNaN(appender.result[6].v) {
t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[5].v))
}
}
@ -769,16 +804,16 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
t.Fatalf("Scrape wasn't stopped.")
}
// 1 successfully scraped sample, 1 stale marker after first fail, 4 report samples for
// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
// each scrape successful or not.
if len(appender.result) != 14 {
t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 14, len(appender.result))
if len(appender.result) != 17 {
t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 17, len(appender.result))
}
if appender.result[0].v != 42.0 {
t.Fatalf("Appended first sample not as expected. Wanted: %f Got: %f", appender.result[0].v, 42.0)
}
if !value.IsStaleNaN(appender.result[5].v) {
t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[5].v))
if !value.IsStaleNaN(appender.result[6].v) {
t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[6].v))
}
}
@ -854,10 +889,10 @@ func TestScrapeLoopCache(t *testing.T) {
t.Fatalf("Scrape wasn't stopped.")
}
// 1 successfully scraped sample, 1 stale marker after first fail, 4 report samples for
// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
// each scrape successful or not.
if len(appender.result) != 22 {
t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 22, len(appender.result))
if len(appender.result) != 26 {
t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 26, len(appender.result))
}
}
@ -989,7 +1024,7 @@ func TestScrapeLoopAppend(t *testing.T) {
now := time.Now()
_, _, err := sl.append([]byte(test.scrapeLabels), "", now)
_, _, _, err := sl.append([]byte(test.scrapeLabels), "", now)
if err != nil {
t.Fatalf("Unexpected append error: %s", err)
}
@ -1037,7 +1072,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
beforeMetricValue := beforeMetric.GetCounter().GetValue()
now := time.Now()
_, _, err = sl.append([]byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now)
_, _, _, err = sl.append([]byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now)
if err != errSampleLimit {
t.Fatalf("Did not see expected sample limit error: %s", err)
}
@ -1091,11 +1126,11 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
)
now := time.Now()
_, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1`), "", now)
_, _, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1`), "", now)
if err != nil {
t.Fatalf("Unexpected append error: %s", err)
}
_, _, err = sl.append([]byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute))
_, _, _, err = sl.append([]byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute))
if err != nil {
t.Fatalf("Unexpected append error: %s", err)
}
@ -1132,11 +1167,11 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
)
now := time.Now()
_, _, err := sl.append([]byte("metric_a 1\n"), "", now)
_, _, _, err := sl.append([]byte("metric_a 1\n"), "", now)
if err != nil {
t.Fatalf("Unexpected append error: %s", err)
}
_, _, err = sl.append([]byte(""), "", now.Add(time.Second))
_, _, _, err = sl.append([]byte(""), "", now.Add(time.Second))
if err != nil {
t.Fatalf("Unexpected append error: %s", err)
}
@ -1179,11 +1214,11 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
)
now := time.Now()
_, _, err := sl.append([]byte("metric_a 1 1000\n"), "", now)
_, _, _, err := sl.append([]byte("metric_a 1 1000\n"), "", now)
if err != nil {
t.Fatalf("Unexpected append error: %s", err)
}
_, _, err = sl.append([]byte(""), "", now.Add(time.Second))
_, _, _, err = sl.append([]byte(""), "", now.Add(time.Second))
if err != nil {
t.Fatalf("Unexpected append error: %s", err)
}
@ -1299,7 +1334,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
)
now := time.Unix(1, 0)
_, _, err := sl.append([]byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now)
total, added, seriesAdded, err := sl.append([]byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now)
if err != nil {
t.Fatalf("Unexpected append error: %s", err)
}
@ -1313,6 +1348,9 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
if !reflect.DeepEqual(want, app.result) {
t.Fatalf("Appended samples not as expected. Wanted: %+v Got: %+v", want, app.result)
}
testutil.Equals(t, 4, total)
testutil.Equals(t, 1, added)
testutil.Equals(t, 1, seriesAdded)
}
func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
@ -1334,15 +1372,10 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
)
now := time.Now().Add(20 * time.Minute)
total, added, err := sl.append([]byte("normal 1\n"), "", now)
if total != 1 {
t.Error("expected 1 metric")
return
}
if added != 0 {
t.Error("no metric should be added")
}
total, added, seriesAdded, err := sl.append([]byte("normal 1\n"), "", now)
testutil.Equals(t, 1, total)
testutil.Equals(t, 0, added)
testutil.Equals(t, 0, seriesAdded)
if err != nil {
t.Errorf("expect no error, got %s", err.Error())
@ -1532,7 +1565,7 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
)
now := time.Now()
_, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1 0`), "", now)
_, _, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1 0`), "", now)
if err != nil {
t.Fatalf("Unexpected append error: %s", err)
}
@ -1569,7 +1602,7 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
)
now := time.Now()
_, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1 0`), "", now)
_, _, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1 0`), "", now)
if err != nil {
t.Fatalf("Unexpected append error: %s", err)
}