diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 7029dfbc59..0d67276ffe 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -111,7 +111,7 @@ type scrapePool struct { loops map[uint64]loop // Constructor for new scrape loops. This is settable for testing convenience. - newLoop func(context.Context, scraper, func() storage.Appender, func() storage.Appender) loop + newLoop func(context.Context, scraper, func() storage.Appender, func() storage.Appender, log.Logger) loop } func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable) *scrapePool { @@ -187,6 +187,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { func() storage.Appender { return sp.reportAppender(t) }, + log.With("target", t.labels.String()), ) ) wg.Add(1) @@ -256,6 +257,7 @@ func (sp *scrapePool) sync(targets []*Target) { func() storage.Appender { return sp.reportAppender(t) }, + log.With("target", t.labels.String()), ) sp.targets[hash] = t @@ -419,6 +421,7 @@ type lsetCacheEntry struct { type scrapeLoop struct { scraper scraper + l log.Logger appender func() storage.Appender reportAppender func() storage.Appender @@ -434,7 +437,10 @@ type scrapeLoop struct { stopped chan struct{} } -func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storage.Appender) loop { +func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storage.Appender, l log.Logger) loop { + if l == nil { + l = log.Base() + } sl := &scrapeLoop{ scraper: sc, appender: app, @@ -443,6 +449,7 @@ func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storag lsetCache: map[uint64]lsetCacheEntry{}, stopped: make(chan struct{}), ctx: ctx, + l: l, } sl.scrapeCtx, sl.cancel = context.WithCancel(ctx) @@ -501,11 +508,11 @@ mainLoop: // A failed scrape is the same as an empty scrape, // we still call sl.append to trigger stale markers. if total, added, err = sl.append(b, start); err != nil { - log.With("err", err).Error("append failed") + sl.l.With("err", err).Error("append failed") // The append failed, probably due to a parse error. // Call sl.append again with an empty scrape to trigger stale markers. if _, _, err = sl.append([]byte{}, start); err != nil { - log.With("err", err).Error("append failed") + sl.l.With("err", err).Error("append failed") } } @@ -568,10 +575,10 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int // If the target has since been recreated and scraped, the // stale markers will be out of order and ignored. if _, _, err := sl.append([]byte{}, staleTime); err != nil { - log.With("err", err).Error("stale append failed") + sl.l.With("err", err).Error("stale append failed") } if err := sl.reportStale(staleTime); err != nil { - log.With("err", err).Error("stale report failed") + sl.l.With("err", err).Error("stale report failed") } } @@ -634,12 +641,12 @@ loop: case errSeriesDropped: continue case storage.ErrOutOfOrderSample: - log.With("timeseries", string(met)).Debug("Out of order sample") + sl.l.With("timeseries", string(met)).Debug("Out of order sample") numOutOfOrder += 1 continue case storage.ErrDuplicateSampleForTimestamp: numDuplicates += 1 - log.With("timeseries", string(met)).Debug("Duplicate sample for timestamp") + sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp") continue default: break loop @@ -658,13 +665,13 @@ loop: continue case storage.ErrOutOfOrderSample: err = nil - log.With("timeseries", string(met)).Debug("Out of order sample") + sl.l.With("timeseries", string(met)).Debug("Out of order sample") numOutOfOrder += 1 continue case storage.ErrDuplicateSampleForTimestamp: err = nil numDuplicates += 1 - log.With("timeseries", string(met)).Debug("Duplicate sample for timestamp") + sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp") continue default: break loop @@ -685,10 +692,10 @@ loop: err = p.Err() } if numOutOfOrder > 0 { - log.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples") + sl.l.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples") } if numDuplicates > 0 { - log.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp") + sl.l.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp") } if err == nil { for metric, lset := range sl.samplesInPreviousScrape { diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index 878995f57b..534f700af4 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -28,6 +28,7 @@ import ( "testing" "time" + "github.com/prometheus/common/log" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "golang.org/x/net/context" @@ -144,7 +145,7 @@ func TestScrapePoolReload(t *testing.T) { } // On starting to run, new loops created on reload check whether their preceding // equivalents have been stopped. - newLoop := func(ctx context.Context, s scraper, app, reportApp func() storage.Appender) loop { + newLoop := func(ctx context.Context, s scraper, app, reportApp func() storage.Appender, _ log.Logger) loop { l := &testLoop{} l.startFunc = func(interval, timeout time.Duration, errc chan<- error) { if interval != 3*time.Second { @@ -310,7 +311,7 @@ func TestScrapePoolSampleAppender(t *testing.T) { func TestScrapeLoopStopBeforeRun(t *testing.T) { scraper := &testScraper{} - sl := newScrapeLoop(context.Background(), scraper, nil, nil) + sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil) // The scrape pool synchronizes on stopping scrape loops. However, new scrape // loops are started asynchronously. Thus it's possible, that a loop is stopped @@ -368,7 +369,7 @@ func TestScrapeLoopStop(t *testing.T) { ) defer close(signal) - sl := newScrapeLoop(context.Background(), scraper, app, reportApp) + sl := newScrapeLoop(context.Background(), scraper, app, reportApp, nil) // Succeed once, several failures, then stop. scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { @@ -427,7 +428,7 @@ func TestScrapeLoopRun(t *testing.T) { defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, app, reportApp) + sl := newScrapeLoop(ctx, scraper, app, reportApp, nil) // The loop must terminate during the initial offset if the context // is canceled. @@ -465,7 +466,7 @@ func TestScrapeLoopRun(t *testing.T) { } ctx, cancel = context.WithCancel(context.Background()) - sl = newScrapeLoop(ctx, scraper, app, reportApp) + sl = newScrapeLoop(ctx, scraper, app, reportApp, nil) go func() { sl.run(time.Second, 100*time.Millisecond, errc) @@ -510,7 +511,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, app, reportApp) + sl := newScrapeLoop(ctx, scraper, app, reportApp, nil) // Succeed once, several failures, then stop. scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { @@ -559,7 +560,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, app, reportApp) + sl := newScrapeLoop(ctx, scraper, app, reportApp, nil) // Succeed once, several failures, then stop. scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { @@ -737,6 +738,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) { reportAppender: func() storage.Appender { return nopAppender{} }, refCache: map[string]uint64{}, lsetCache: map[uint64]lsetCacheEntry{}, + l: log.Base(), } now := time.Unix(1, 0)