Plumb through logger with target field to scrape loop.

This commit is contained in:
Brian Brazil 2017-05-16 14:04:37 +01:00
parent d657d722dc
commit bf38963118
2 changed files with 28 additions and 19 deletions

View file

@ -111,7 +111,7 @@ type scrapePool struct {
loops map[uint64]loop loops map[uint64]loop
// Constructor for new scrape loops. This is settable for testing convenience. // Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(context.Context, scraper, func() storage.Appender, func() storage.Appender) loop newLoop func(context.Context, scraper, func() storage.Appender, func() storage.Appender, log.Logger) loop
} }
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable) *scrapePool { func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable) *scrapePool {
@ -187,6 +187,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
func() storage.Appender { func() storage.Appender {
return sp.reportAppender(t) return sp.reportAppender(t)
}, },
log.With("target", t.labels.String()),
) )
) )
wg.Add(1) wg.Add(1)
@ -256,6 +257,7 @@ func (sp *scrapePool) sync(targets []*Target) {
func() storage.Appender { func() storage.Appender {
return sp.reportAppender(t) return sp.reportAppender(t)
}, },
log.With("target", t.labels.String()),
) )
sp.targets[hash] = t sp.targets[hash] = t
@ -419,6 +421,7 @@ type lsetCacheEntry struct {
type scrapeLoop struct { type scrapeLoop struct {
scraper scraper scraper scraper
l log.Logger
appender func() storage.Appender appender func() storage.Appender
reportAppender func() storage.Appender reportAppender func() storage.Appender
@ -434,7 +437,10 @@ type scrapeLoop struct {
stopped chan struct{} stopped chan struct{}
} }
func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storage.Appender) loop { func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storage.Appender, l log.Logger) loop {
if l == nil {
l = log.Base()
}
sl := &scrapeLoop{ sl := &scrapeLoop{
scraper: sc, scraper: sc,
appender: app, appender: app,
@ -443,6 +449,7 @@ func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storag
lsetCache: map[uint64]lsetCacheEntry{}, lsetCache: map[uint64]lsetCacheEntry{},
stopped: make(chan struct{}), stopped: make(chan struct{}),
ctx: ctx, ctx: ctx,
l: l,
} }
sl.scrapeCtx, sl.cancel = context.WithCancel(ctx) sl.scrapeCtx, sl.cancel = context.WithCancel(ctx)
@ -501,11 +508,11 @@ mainLoop:
// A failed scrape is the same as an empty scrape, // A failed scrape is the same as an empty scrape,
// we still call sl.append to trigger stale markers. // we still call sl.append to trigger stale markers.
if total, added, err = sl.append(b, start); err != nil { if total, added, err = sl.append(b, start); err != nil {
log.With("err", err).Error("append failed") sl.l.With("err", err).Error("append failed")
// The append failed, probably due to a parse error. // The append failed, probably due to a parse error.
// Call sl.append again with an empty scrape to trigger stale markers. // Call sl.append again with an empty scrape to trigger stale markers.
if _, _, err = sl.append([]byte{}, start); err != nil { if _, _, err = sl.append([]byte{}, start); err != nil {
log.With("err", err).Error("append failed") sl.l.With("err", err).Error("append failed")
} }
} }
@ -568,10 +575,10 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
// If the target has since been recreated and scraped, the // If the target has since been recreated and scraped, the
// stale markers will be out of order and ignored. // stale markers will be out of order and ignored.
if _, _, err := sl.append([]byte{}, staleTime); err != nil { if _, _, err := sl.append([]byte{}, staleTime); err != nil {
log.With("err", err).Error("stale append failed") sl.l.With("err", err).Error("stale append failed")
} }
if err := sl.reportStale(staleTime); err != nil { if err := sl.reportStale(staleTime); err != nil {
log.With("err", err).Error("stale report failed") sl.l.With("err", err).Error("stale report failed")
} }
} }
@ -634,12 +641,12 @@ loop:
case errSeriesDropped: case errSeriesDropped:
continue continue
case storage.ErrOutOfOrderSample: case storage.ErrOutOfOrderSample:
log.With("timeseries", string(met)).Debug("Out of order sample") sl.l.With("timeseries", string(met)).Debug("Out of order sample")
numOutOfOrder += 1 numOutOfOrder += 1
continue continue
case storage.ErrDuplicateSampleForTimestamp: case storage.ErrDuplicateSampleForTimestamp:
numDuplicates += 1 numDuplicates += 1
log.With("timeseries", string(met)).Debug("Duplicate sample for timestamp") sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp")
continue continue
default: default:
break loop break loop
@ -658,13 +665,13 @@ loop:
continue continue
case storage.ErrOutOfOrderSample: case storage.ErrOutOfOrderSample:
err = nil err = nil
log.With("timeseries", string(met)).Debug("Out of order sample") sl.l.With("timeseries", string(met)).Debug("Out of order sample")
numOutOfOrder += 1 numOutOfOrder += 1
continue continue
case storage.ErrDuplicateSampleForTimestamp: case storage.ErrDuplicateSampleForTimestamp:
err = nil err = nil
numDuplicates += 1 numDuplicates += 1
log.With("timeseries", string(met)).Debug("Duplicate sample for timestamp") sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp")
continue continue
default: default:
break loop break loop
@ -685,10 +692,10 @@ loop:
err = p.Err() err = p.Err()
} }
if numOutOfOrder > 0 { if numOutOfOrder > 0 {
log.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples") sl.l.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples")
} }
if numDuplicates > 0 { if numDuplicates > 0 {
log.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp") sl.l.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
} }
if err == nil { if err == nil {
for metric, lset := range sl.samplesInPreviousScrape { for metric, lset := range sl.samplesInPreviousScrape {

View file

@ -28,6 +28,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -144,7 +145,7 @@ func TestScrapePoolReload(t *testing.T) {
} }
// On starting to run, new loops created on reload check whether their preceding // On starting to run, new loops created on reload check whether their preceding
// equivalents have been stopped. // equivalents have been stopped.
newLoop := func(ctx context.Context, s scraper, app, reportApp func() storage.Appender) loop { newLoop := func(ctx context.Context, s scraper, app, reportApp func() storage.Appender, _ log.Logger) loop {
l := &testLoop{} l := &testLoop{}
l.startFunc = func(interval, timeout time.Duration, errc chan<- error) { l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
if interval != 3*time.Second { if interval != 3*time.Second {
@ -310,7 +311,7 @@ func TestScrapePoolSampleAppender(t *testing.T) {
func TestScrapeLoopStopBeforeRun(t *testing.T) { func TestScrapeLoopStopBeforeRun(t *testing.T) {
scraper := &testScraper{} scraper := &testScraper{}
sl := newScrapeLoop(context.Background(), scraper, nil, nil) sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil)
// The scrape pool synchronizes on stopping scrape loops. However, new scrape // The scrape pool synchronizes on stopping scrape loops. However, new scrape
// loops are started asynchronously. Thus it's possible, that a loop is stopped // loops are started asynchronously. Thus it's possible, that a loop is stopped
@ -368,7 +369,7 @@ func TestScrapeLoopStop(t *testing.T) {
) )
defer close(signal) defer close(signal)
sl := newScrapeLoop(context.Background(), scraper, app, reportApp) sl := newScrapeLoop(context.Background(), scraper, app, reportApp, nil)
// Succeed once, several failures, then stop. // Succeed once, several failures, then stop.
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@ -427,7 +428,7 @@ func TestScrapeLoopRun(t *testing.T) {
defer close(signal) defer close(signal)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, scraper, app, reportApp) sl := newScrapeLoop(ctx, scraper, app, reportApp, nil)
// The loop must terminate during the initial offset if the context // The loop must terminate during the initial offset if the context
// is canceled. // is canceled.
@ -465,7 +466,7 @@ func TestScrapeLoopRun(t *testing.T) {
} }
ctx, cancel = context.WithCancel(context.Background()) ctx, cancel = context.WithCancel(context.Background())
sl = newScrapeLoop(ctx, scraper, app, reportApp) sl = newScrapeLoop(ctx, scraper, app, reportApp, nil)
go func() { go func() {
sl.run(time.Second, 100*time.Millisecond, errc) sl.run(time.Second, 100*time.Millisecond, errc)
@ -510,7 +511,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
defer close(signal) defer close(signal)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, scraper, app, reportApp) sl := newScrapeLoop(ctx, scraper, app, reportApp, nil)
// Succeed once, several failures, then stop. // Succeed once, several failures, then stop.
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@ -559,7 +560,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
defer close(signal) defer close(signal)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, scraper, app, reportApp) sl := newScrapeLoop(ctx, scraper, app, reportApp, nil)
// Succeed once, several failures, then stop. // Succeed once, several failures, then stop.
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@ -737,6 +738,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) {
reportAppender: func() storage.Appender { return nopAppender{} }, reportAppender: func() storage.Appender { return nopAppender{} },
refCache: map[string]uint64{}, refCache: map[string]uint64{},
lsetCache: map[uint64]lsetCacheEntry{}, lsetCache: map[uint64]lsetCacheEntry{},
l: log.Base(),
} }
now := time.Unix(1, 0) now := time.Unix(1, 0)