From b87d3ca9eaa489d8c8dc80323c1e8ee84469a03b Mon Sep 17 00:00:00 2001
From: Brian Brazil
Date: Wed, 10 May 2017 16:59:02 +0100
Subject: [PATCH] Create stale markers when a target is stopped.

When a target is no longer returned from SD, stop() is called. However,
the target may be recreated before the next scrape interval happens, so
we wait to set stale markers until the scrape of the new target would
have happened and been ingested, which takes two scrape intervals.

If we're shutting down, the context will be cancelled, so return
immediately rather than holding shutdown up for potentially minutes
while waiting to safely set stale markers. If the server starts
straight back up again, all is well. If not, we're missing some stale
markers.
---
 retrieval/scrape.go      | 81 +++++++++++++++++++++++++++++++++++-----
 retrieval/scrape_test.go | 47 ++++++++++++++++++++++-
 2 files changed, 117 insertions(+), 11 deletions(-)

diff --git a/retrieval/scrape.go b/retrieval/scrape.go
index 3678627ce..6d097d55c 100644
--- a/retrieval/scrape.go
+++ b/retrieval/scrape.go
@@ -155,7 +155,7 @@ func (sp *scrapePool) stop() {
 
 // reload the scrape pool with the given scrape configuration. The target state is preserved
 // but all scrape loops are restarted with the new scrape configuration.
-// This method returns after all scrape loops that were stopped have fully terminated.
+// This method returns after all scrape loops that were stopped have stopped scraping.
 func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
 	start := time.Now()
 
@@ -428,9 +428,10 @@ type scrapeLoop struct {
 	lsetCache               map[uint64]lsetCacheEntry // Ref to labelset and string
 	samplesInPreviousScrape map[string]labels.Labels
 
-	done   chan struct{}
-	ctx    context.Context
-	cancel func()
+	ctx       context.Context
+	scrapeCtx context.Context
+	cancel    func()
+	stopped   chan struct{}
 }
 
 func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storage.Appender) loop {
@@ -440,20 +441,20 @@ func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storag
 		reportAppender: reportApp,
 		refCache:       map[string]uint64{},
 		lsetCache:      map[uint64]lsetCacheEntry{},
-		done:           make(chan struct{}),
+		stopped:        make(chan struct{}),
+		ctx:            ctx,
 	}
-	sl.ctx, sl.cancel = context.WithCancel(ctx)
+	sl.scrapeCtx, sl.cancel = context.WithCancel(ctx)
 
 	return sl
 }
 
 func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
-	defer close(sl.done)
-
 	select {
 	case <-time.After(sl.scraper.offset(interval)):
 		// Continue after a scraping offset.
-	case <-sl.ctx.Done():
+	case <-sl.scrapeCtx.Done():
+		close(sl.stopped)
 		return
 	}
 
@@ -464,11 +465,15 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
 
 	buf := bytes.NewBuffer(make([]byte, 0, 16000))
 
+mainLoop:
 	for {
 		buf.Reset()
 		select {
 		case <-sl.ctx.Done():
+			close(sl.stopped)
 			return
+		case <-sl.scrapeCtx.Done():
+			break mainLoop
 		default:
 		}
 
@@ -509,15 +514,65 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
 
 		select {
 		case <-sl.ctx.Done():
+			close(sl.stopped)
 			return
+		case <-sl.scrapeCtx.Done():
+			break mainLoop
 		case <-ticker.C:
 		}
 	}
+
+	close(sl.stopped)
+
+	// Scraping has stopped. We want to write stale markers, but
+	// the target may be recreated, so we wait just over two scrape intervals
+	// before creating them.
+	// If the context is cancelled, we presume the server is shutting down
+	// and will restart where it was. We do not attempt to write stale markers
+	// in this case.
+
+	if last.IsZero() {
+		// There never was a scrape, so there will be no stale markers.
+		return
+	}
+
+	// Wait for when the next scrape would have been, and record its timestamp.
+	var staleTime time.Time
+	select {
+	case <-sl.ctx.Done():
+		return
+	case <-ticker.C:
+		staleTime = time.Now()
+	}
+
+	// Wait for when the following scrape would have been; if the target was
+	// recreated, its samples should have been ingested by now.
+	select {
+	case <-sl.ctx.Done():
+		return
+	case <-ticker.C:
+	}
+
+	// Wait for an extra 10% of the interval, just to be safe.
+	select {
+	case <-sl.ctx.Done():
+		return
+	case <-time.After(interval / 10):
+	}
+
+	// Call sl.append again with an empty scrape to trigger stale markers.
+	// If the target has since been recreated and scraped, the
+	// stale markers will be out of order and ignored.
+	if _, _, err := sl.append([]byte{}, staleTime); err != nil {
+		log.With("err", err).Error("stale append failed")
+	}
 }
 
+// Stop the scraping. The loop may still write data and stale markers after
+// stop has returned. Cancel the context to stop all writes.
 func (sl *scrapeLoop) stop() {
 	sl.cancel()
-	<-sl.done
+	<-sl.stopped
 }
 
 type sample struct {
@@ -624,6 +679,12 @@ loop:
 		switch err {
 		case nil:
 		case errSeriesDropped:
+			err = nil
 			continue
+		case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
+			// Do not log here, as this is expected if a target goes away and comes back
+			// again with a new scrape loop.
+			err = nil
+			continue
 		default:
 			break
diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go
index 266e8ae7a..2477eaf25 100644
--- a/retrieval/scrape_test.go
+++ b/retrieval/scrape_test.go
@@ -308,7 +308,7 @@ func TestScrapePoolSampleAppender(t *testing.T) {
 	}
 }
 
-func TestScrapeLoopStop(t *testing.T) {
+func TestScrapeLoopStopBeforeRun(t *testing.T) {
 	scraper := &testScraper{}
 	sl := newScrapeLoop(context.Background(), scraper, nil, nil)
 
@@ -355,6 +355,51 @@ func TestScrapeLoopStop(t *testing.T) {
 	}
 }
 
+func TestScrapeLoopStop(t *testing.T) {
+	appender := &collectResultAppender{}
+	var (
+		signal = make(chan struct{})
+
+		scraper    = &testScraper{}
+		app        = func() storage.Appender { return appender }
+		reportApp  = func() storage.Appender { return &nopAppender{} }
+		numScrapes = 0
+	)
+	defer close(signal)
+
+	sl := newScrapeLoop(context.Background(), scraper, app, reportApp)
+
+	// Succeed on every scrape, and stop the loop after the second one.
+	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
+		numScrapes++
+		if numScrapes == 2 {
+			go func() {
+				sl.stop()
+			}()
+		}
+		w.Write([]byte("metric_a 42\n"))
+		return nil
+	}
+
+	go func() {
+		sl.run(10*time.Millisecond, time.Hour, nil)
+		signal <- struct{}{}
+	}()
+
+	select {
+	case <-signal:
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Scrape wasn't stopped.")
+	}
+
+	if len(appender.result) < 2 {
+		t.Fatalf("Appended samples not as expected. Wanted: at least %d samples, got: %d", 2, len(appender.result))
+	}
+	if !value.IsStaleNaN(appender.result[len(appender.result)-1].v) {
+		t.Fatalf("Appended last sample not as expected. Wanted: stale NaN, got: %x", math.Float64bits(appender.result[len(appender.result)-1].v))
+	}
+}
+
 func TestScrapeLoopRun(t *testing.T) {
 	var (
 		signal = make(chan struct{})
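
A note on the timing scheme: the tail added to run() above waits two full scrape
intervals plus 10% slack before writing stale markers, and aborts immediately if
the surrounding context is cancelled. Below is a minimal, self-contained sketch
of that pattern for reviewers who want to experiment with it in isolation. The
names waitThenMarkStale and markStale are hypothetical stand-ins, not Prometheus
APIs; the real code reuses the scrape loop's ticker and calls sl.append with an
empty scrape instead of a callback.

package main

import (
	"context"
	"fmt"
	"time"
)

// waitThenMarkStale sketches the shutdown tail of scrapeLoop.run: once scraping
// has stopped, wait one interval and record when the next scrape would have
// happened, wait a second interval so a recreated target's samples would have
// been ingested, add 10% slack, then write stale markers timestamped at the
// first missed scrape. A cancelled ctx returns immediately, as on server
// shutdown.
func waitThenMarkStale(ctx context.Context, interval time.Duration, markStale func(t time.Time)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// When the next scrape would have been; this becomes the marker timestamp.
	var staleTime time.Time
	select {
	case <-ctx.Done():
		return
	case <-ticker.C:
		staleTime = time.Now()
	}

	// When the following scrape would have been; a recreated target would have
	// been scraped and ingested by now.
	select {
	case <-ctx.Done():
		return
	case <-ticker.C:
	}

	// An extra 10% of the interval, just to be safe.
	select {
	case <-ctx.Done():
		return
	case <-time.After(interval / 10):
	}

	markStale(staleTime)
}

func main() {
	ctx := context.Background()
	waitThenMarkStale(ctx, 100*time.Millisecond, func(t time.Time) {
		fmt.Println("would write stale markers at", t)
	})
}

Timestamping the markers at the first missed scrape, rather than at write time,
keeps them older than any sample a recreated target would have appended, which
is why the out-of-order and duplicate errors are silently dropped in append():
if the target came back, its fresher samples win and the markers are discarded.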