diff --git a/retrieval/scrape.go b/retrieval/scrape.go index f23cea97a..26e25dae5 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" + "github.com/prometheus/common/model" "github.com/prometheus/common/version" "golang.org/x/net/context" @@ -35,6 +36,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/pool" + "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/pkg/value" @@ -121,8 +123,8 @@ func init() { // scrapePool manages scrapes for sets of targets. type scrapePool struct { appendable Appendable - - ctx context.Context + logger log.Logger + ctx context.Context mtx sync.RWMutex config *config.ScrapeConfig @@ -133,12 +135,13 @@ type scrapePool struct { loops map[uint64]loop // Constructor for new scrape loops. This is settable for testing convenience. - newLoop func(context.Context, scraper, func() storage.Appender, func() storage.Appender, log.Logger) loop - - logger log.Logger - maxAheadTime time.Duration + newLoop func(*Target, scraper) loop } +const maxAheadTime = 10 * time.Minute + +type labelsMutator func(labels.Labels) labels.Labels + func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool { client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig) if err != nil { @@ -148,26 +151,26 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable buffers := pool.NewBytesPool(163, 100e6, 3) - newLoop := func( - ctx context.Context, - s scraper, - app, reportApp func() storage.Appender, - l log.Logger, - ) loop { - return newScrapeLoop(ctx, s, app, reportApp, buffers, l) + sp := &scrapePool{ + appendable: app, + config: cfg, + ctx: ctx, + client: client, + targets: map[uint64]*Target{}, + loops: map[uint64]loop{}, + logger: logger, + } + sp.newLoop = func(t *Target, s scraper) loop { + return newScrapeLoop(sp.ctx, s, + logger.With("target", t), + buffers, + func(l labels.Labels) labels.Labels { return sp.mutateSampleLabels(l, t) }, + func(l labels.Labels) labels.Labels { return sp.mutateReportSampleLabels(l, t) }, + sp.appender, + ) } - return &scrapePool{ - appendable: app, - config: cfg, - ctx: ctx, - client: client, - targets: map[uint64]*Target{}, - loops: map[uint64]loop{}, - newLoop: newLoop, - logger: logger, - maxAheadTime: 10 * time.Minute, - } + return sp } // stop terminates all scrape loops and returns after they all terminated. @@ -219,15 +222,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { var ( t = sp.targets[fp] s = &targetScraper{Target: t, client: sp.client, timeout: timeout} - newLoop = sp.newLoop(sp.ctx, s, - func() storage.Appender { - return sp.sampleAppender(t) - }, - func() storage.Appender { - return sp.reportAppender(t) - }, - sp.logger.With("target", t.labels.String()), - ) + newLoop = sp.newLoop(t, s) ) wg.Add(1) @@ -289,15 +284,7 @@ func (sp *scrapePool) sync(targets []*Target) { if _, ok := sp.targets[hash]; !ok { s := &targetScraper{Target: t, client: sp.client, timeout: timeout} - l := sp.newLoop(sp.ctx, s, - func() storage.Appender { - return sp.sampleAppender(t) - }, - func() storage.Appender { - return sp.reportAppender(t) - }, - sp.logger.With("target", t.labels.String()), - ) + l := sp.newLoop(t, s) sp.targets[hash] = t sp.loops[hash] = l @@ -329,18 +316,58 @@ func (sp *scrapePool) sync(targets []*Target) { wg.Wait() } -// sampleAppender returns an appender for ingested samples from the target. -func (sp *scrapePool) sampleAppender(target *Target) storage.Appender { +func (sp *scrapePool) mutateSampleLabels(lset labels.Labels, target *Target) labels.Labels { + lb := labels.NewBuilder(lset) + + if sp.config.HonorLabels { + for _, l := range target.Labels() { + if lv := lset.Get(l.Name); lv == "" { + lb.Set(l.Name, l.Value) + } + } + } else { + for _, l := range target.Labels() { + lv := lset.Get(l.Name) + if lv != "" { + lb.Set(model.ExportedLabelPrefix+l.Name, lv) + } + lb.Set(l.Name, l.Value) + } + } + + res := lb.Labels() + + if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 { + res = relabel.Process(res, mrc...) + } + + return res +} + +func (sp *scrapePool) mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels { + lb := labels.NewBuilder(lset) + + for _, l := range target.Labels() { + lv := lset.Get(l.Name) + if lv != "" { + lb.Set(model.ExportedLabelPrefix+l.Name, lv) + } + lb.Set(l.Name, l.Value) + } + + return lb.Labels() +} + +// appender returns an appender for ingested samples from the target. +func (sp *scrapePool) appender() storage.Appender { app, err := sp.appendable.Appender() if err != nil { panic(err) } - if sp.maxAheadTime > 0 { - app = &timeLimitAppender{ - Appender: app, - maxTime: timestamp.FromTime(time.Now().Add(sp.maxAheadTime)), - } + app = &timeLimitAppender{ + Appender: app, + maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)), } // The limit is applied after metrics are potentially dropped via relabeling. @@ -350,42 +377,9 @@ func (sp *scrapePool) sampleAppender(target *Target) storage.Appender { limit: int(sp.config.SampleLimit), } } - - // The relabelAppender has to be inside the label-modifying appenders - // so the relabeling rules are applied to the correct label set. - if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 { - app = relabelAppender{ - Appender: app, - relabelings: mrc, - } - } - - if sp.config.HonorLabels { - app = honorLabelsAppender{ - Appender: app, - labels: target.Labels(), - } - } else { - app = ruleLabelsAppender{ - Appender: app, - labels: target.Labels(), - } - } return app } -// reportAppender returns an appender for reporting samples for the target. -func (sp *scrapePool) reportAppender(target *Target) storage.Appender { - app, err := sp.appendable.Appender() - if err != nil { - panic(err) - } - return ruleLabelsAppender{ - Appender: app, - labels: target.Labels(), - } -} - // A scraper retrieves samples and accepts a status report at the end. type scraper interface { scrape(ctx context.Context, w io.Writer) error @@ -479,8 +473,9 @@ type scrapeLoop struct { lastScrapeSize int buffers *pool.BytesPool - appender func() storage.Appender - reportAppender func() storage.Appender + appender func() storage.Appender + sampleMutator labelsMutator + reportSampleMutator labelsMutator ctx context.Context scrapeCtx context.Context @@ -497,6 +492,11 @@ type scrapeCache struct { refs map[string]*refEntry // Parsed string to ref. lsets map[uint64]*lsetCacheEntry // Ref to labelset and string. + // Cache of dropped metric strings and their iteration. The iteration must + // be a pointer so we can update it without setting a new entry with an unsafe + // string in addDropped(). + dropped map[string]*uint64 + // seriesCur and seriesPrev store the labels of series that were seen // in the current and previous scrape. // We hold two maps and swap them out to save allocations. @@ -508,6 +508,7 @@ func newScrapeCache() *scrapeCache { return &scrapeCache{ refs: map[string]*refEntry{}, lsets: map[uint64]*lsetCacheEntry{}, + dropped: map[string]*uint64{}, seriesCur: map[uint64]labels.Labels{}, seriesPrev: map[uint64]labels.Labels{}, } @@ -523,6 +524,11 @@ func (c *scrapeCache) iterDone() { delete(c.lsets, e.ref) } } + for s, iter := range c.dropped { + if *iter < c.iter { + delete(c.dropped, s) + } + } // Swap current and previous series. c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev @@ -560,6 +566,19 @@ func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash ui c.lsets[ref] = &lsetCacheEntry{metric: met, lset: lset, hash: hash} } +func (c *scrapeCache) addDropped(met string) { + iter := c.iter + c.dropped[met] = &iter +} + +func (c *scrapeCache) getDropped(met string) bool { + iterp, ok := c.dropped[met] + if ok { + *iterp = c.iter + } + return ok +} + func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) { c.seriesCur[hash] = lset } @@ -577,26 +596,28 @@ func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) { func newScrapeLoop( ctx context.Context, sc scraper, - app, reportApp func() storage.Appender, - buffers *pool.BytesPool, l log.Logger, + buffers *pool.BytesPool, + sampleMutator labelsMutator, + reportSampleMutator labelsMutator, + appender func() storage.Appender, ) *scrapeLoop { if l == nil { l = log.Base() } if buffers == nil { - buffers = pool.NewBytesPool(10e3, 100e6, 3) + buffers = pool.NewBytesPool(1e3, 1e6, 3) } sl := &scrapeLoop{ - scraper: sc, - appender: app, - cache: newScrapeCache(), - reportAppender: reportApp, - buffers: buffers, - lastScrapeSize: 16000, - stopped: make(chan struct{}), - ctx: ctx, - l: l, + scraper: sc, + buffers: buffers, + cache: newScrapeCache(), + appender: appender, + sampleMutator: sampleMutator, + reportSampleMutator: reportSampleMutator, + stopped: make(chan struct{}), + ctx: ctx, + l: l, } sl.scrapeCtx, sl.cancel = context.WithCancel(ctx) @@ -796,6 +817,9 @@ loop: t = *tp } + if sl.cache.getDropped(yoloString(met)) { + continue + } ref, ok := sl.cache.getRef(yoloString(met)) if ok { lset := sl.cache.lsets[ref].lset @@ -807,9 +831,6 @@ loop: } case storage.ErrNotFound: ok = false - case errSeriesDropped: - err = nil - continue case storage.ErrOutOfOrderSample: numOutOfOrder++ sl.l.With("timeseries", string(met)).Debug("Out of order sample") @@ -848,6 +869,16 @@ loop: } else { mets = p.Metric(&lset) hash = lset.Hash() + + // Hash label set as it is seen local to the target. Then add target labels + // and relabeling and store the final label set. + lset = sl.sampleMutator(lset) + + // The label set may be set to nil to indicate dropping. + if lset == nil { + sl.cache.addDropped(mets) + continue + } } var ref uint64 @@ -855,9 +886,6 @@ loop: // TODO(fabxc): also add a dropped-cache? switch err { case nil: - case errSeriesDropped: - err = nil - continue case storage.ErrOutOfOrderSample: err = nil numOutOfOrder++ @@ -912,8 +940,6 @@ loop: // Series no longer exposed, mark it stale. _, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN)) switch err { - case errSeriesDropped: - err = nil case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: // Do not count these in logging, as this is expected if a target // goes away and comes back again with a new scrape loop. @@ -948,8 +974,7 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, a if err == nil { health = 1 } - - app := sl.reportAppender() + app := sl.appender() if err := sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil { app.Rollback() @@ -972,7 +997,8 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, a func (sl *scrapeLoop) reportStale(start time.Time) error { ts := timestamp.FromTime(start) - app := sl.reportAppender() + app := sl.appender() + stale := math.Float64frombits(value.StaleNaN) if err := sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil { @@ -1019,10 +1045,14 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v lset := labels.Labels{ labels.Label{Name: labels.MetricName, Value: s}, } + + hash := lset.Hash() + lset = sl.reportSampleMutator(lset) + ref, err := app.Add(lset, t, v) switch err { case nil: - sl.cache.addRef(s2, ref, lset, lset.Hash()) + sl.cache.addRef(s2, ref, lset, hash) return nil case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: return nil diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index 376c23cdd..b9c11ffff 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -145,7 +145,7 @@ func TestScrapePoolReload(t *testing.T) { } // On starting to run, new loops created on reload check whether their preceding // equivalents have been stopped. - newLoop := func(ctx context.Context, s scraper, app, reportApp func() storage.Appender, _ log.Logger) loop { + newLoop := func(_ *Target, s scraper) loop { l := &testLoop{} l.startFunc = func(interval, timeout time.Duration, errc chan<- error) { if interval != 3*time.Second { @@ -228,92 +228,48 @@ func TestScrapePoolReload(t *testing.T) { } } -func TestScrapePoolReportAppender(t *testing.T) { - cfg := &config.ScrapeConfig{ - MetricRelabelConfigs: []*config.RelabelConfig{ - {}, {}, {}, - }, - } - target := newTestTarget("example.com:80", 10*time.Millisecond, nil) +func TestScrapePoolAppender(t *testing.T) { + cfg := &config.ScrapeConfig{} app := &nopAppendable{} - sp := newScrapePool(context.Background(), cfg, app, log.Base()) - cfg.HonorLabels = false - wrapped := sp.reportAppender(target) + wrapped := sp.appender() - rl, ok := wrapped.(ruleLabelsAppender) + tl, ok := wrapped.(*timeLimitAppender) if !ok { - t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) + t.Fatalf("Expected timeLimitAppender but got %T", wrapped) } - if _, ok := rl.Appender.(nopAppender); !ok { - t.Fatalf("Expected base appender but got %T", rl.Appender) + if _, ok := tl.Appender.(nopAppender); !ok { + t.Fatalf("Expected base appender but got %T", tl.Appender) } - cfg.HonorLabels = true - wrapped = sp.reportAppender(target) - - hl, ok := wrapped.(ruleLabelsAppender) - if !ok { - t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) - } - if _, ok := rl.Appender.(nopAppender); !ok { - t.Fatalf("Expected base appender but got %T", hl.Appender) - } -} - -func TestScrapePoolSampleAppender(t *testing.T) { - cfg := &config.ScrapeConfig{ - MetricRelabelConfigs: []*config.RelabelConfig{ - {}, {}, {}, - }, - } - - target := newTestTarget("example.com:80", 10*time.Millisecond, nil) - app := &nopAppendable{} - - sp := newScrapePool(context.Background(), cfg, app, log.Base()) - sp.maxAheadTime = 0 - - cfg.HonorLabels = false - wrapped := sp.sampleAppender(target) - - rl, ok := wrapped.(ruleLabelsAppender) - if !ok { - t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) - } - re, ok := rl.Appender.(relabelAppender) - if !ok { - t.Fatalf("Expected relabelAppender but got %T", rl.Appender) - } - if _, ok := re.Appender.(nopAppender); !ok { - t.Fatalf("Expected base appender but got %T", re.Appender) - } - - cfg.HonorLabels = true cfg.SampleLimit = 100 - wrapped = sp.sampleAppender(target) - hl, ok := wrapped.(honorLabelsAppender) + wrapped = sp.appender() + + sl, ok := wrapped.(*limitAppender) if !ok { - t.Fatalf("Expected honorLabelsAppender but got %T", wrapped) + t.Fatalf("Expected limitAppender but got %T", wrapped) } - re, ok = hl.Appender.(relabelAppender) + tl, ok = sl.Appender.(*timeLimitAppender) if !ok { - t.Fatalf("Expected relabelAppender but got %T", hl.Appender) + t.Fatalf("Expected limitAppender but got %T", sl.Appender) } - lm, ok := re.Appender.(*limitAppender) - if !ok { - t.Fatalf("Expected limitAppender but got %T", lm.Appender) - } - if _, ok := lm.Appender.(nopAppender); !ok { - t.Fatalf("Expected base appender but got %T", re.Appender) + if _, ok := tl.Appender.(nopAppender); !ok { + t.Fatalf("Expected base appender but got %T", tl.Appender) } } func TestScrapeLoopStopBeforeRun(t *testing.T) { scraper := &testScraper{} - sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil, nil) + + sl := newScrapeLoop(context.Background(), + scraper, + nil, nil, + nopMutator, + nopMutator, + nil, + ) // The scrape pool synchronizes on stopping scrape loops. However, new scrape // loops are started asynchronously. Thus it's possible, that a loop is stopped @@ -358,22 +314,28 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) { } } -func TestScrapeLoopStop(t *testing.T) { - appender := &collectResultAppender{} - reportAppender := &collectResultAppender{} - var ( - signal = make(chan struct{}) +func nopMutator(l labels.Labels) labels.Labels { return l } - scraper = &testScraper{} - app = func() storage.Appender { return appender } - reportApp = func() storage.Appender { return reportAppender } - numScrapes = 0 +func TestScrapeLoopStop(t *testing.T) { + var ( + signal = make(chan struct{}) + appender = &collectResultAppender{} + scraper = &testScraper{} + app = func() storage.Appender { return appender } ) defer close(signal) - sl := newScrapeLoop(context.Background(), scraper, app, reportApp, nil, nil) + sl := newScrapeLoop(context.Background(), + scraper, + nil, nil, + nopMutator, + nopMutator, + app, + ) + + // Terminate loop after 2 scrapes. + numScrapes := 0 - // Succeed once, several failures, then stop. scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { numScrapes++ if numScrapes == 2 { @@ -394,25 +356,25 @@ func TestScrapeLoopStop(t *testing.T) { t.Fatalf("Scrape wasn't stopped.") } - if len(appender.result) < 2 { - t.Fatalf("Appended samples not as expected. Wanted: at least %d samples Got: %d", 2, len(appender.result)) + // We expected 1 actual sample for each scrape plus 4 for report samples. + // At least 2 scrapes were made, plus the final stale markers. + if len(appender.result) < 5*3 || len(appender.result)%5 != 0 { + t.Fatalf("Expected at least 3 scrapes with 4 samples each, got %d samples", len(appender.result)) } - if !value.IsStaleNaN(appender.result[len(appender.result)-1].v) { - t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[len(appender.result)-1].v)) + // All samples in a scrape must have the same timestmap. + var ts int64 + for i, s := range appender.result { + if i%5 == 0 { + ts = s.t + } else if s.t != ts { + t.Fatalf("Unexpected multiple timestamps within single scrape") + } } - - if len(reportAppender.result) < 8 { - t.Fatalf("Appended samples not as expected. Wanted: at least %d samples Got: %d", 8, len(reportAppender.result)) - } - if len(reportAppender.result)%4 != 0 { - t.Fatalf("Appended samples not as expected. Wanted: samples mod 4 == 0 Got: %d samples", len(reportAppender.result)) - } - if !value.IsStaleNaN(reportAppender.result[len(reportAppender.result)-1].v) { - t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(reportAppender.result[len(reportAppender.result)].v)) - } - - if reportAppender.result[len(reportAppender.result)-1].t != appender.result[len(appender.result)-1].t { - t.Fatalf("Expected last append and report sample to have same timestamp. Append: stale NaN Report: %x", appender.result[len(appender.result)-1].t, reportAppender.result[len(reportAppender.result)-1].t) + // All samples from the last scrape must be stale markers. + for _, s := range appender.result[len(appender.result)-5:] { + if !value.IsStaleNaN(s.v) { + t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(s.v)) + } } } @@ -421,14 +383,19 @@ func TestScrapeLoopRun(t *testing.T) { signal = make(chan struct{}) errc = make(chan error) - scraper = &testScraper{} - app = func() storage.Appender { return &nopAppender{} } - reportApp = func() storage.Appender { return &nopAppender{} } + scraper = &testScraper{} + app = func() storage.Appender { return &nopAppender{} } ) defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, app, reportApp, nil, nil) + sl := newScrapeLoop(ctx, + scraper, + nil, nil, + nopMutator, + nopMutator, + app, + ) // The loop must terminate during the initial offset if the context // is canceled. @@ -466,7 +433,13 @@ func TestScrapeLoopRun(t *testing.T) { } ctx, cancel = context.WithCancel(context.Background()) - sl = newScrapeLoop(ctx, scraper, app, reportApp, nil, nil) + sl = newScrapeLoop(ctx, + scraper, + nil, nil, + nopMutator, + nopMutator, + app, + ) go func() { sl.run(time.Second, 100*time.Millisecond, errc) @@ -501,19 +474,23 @@ func TestScrapeLoopRun(t *testing.T) { func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { appender := &collectResultAppender{} var ( - signal = make(chan struct{}) - - scraper = &testScraper{} - app = func() storage.Appender { return appender } - reportApp = func() storage.Appender { return &nopAppender{} } - numScrapes = 0 + signal = make(chan struct{}) + scraper = &testScraper{} + app = func() storage.Appender { return appender } ) defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, app, reportApp, nil, nil) - + sl := newScrapeLoop(ctx, + scraper, + nil, nil, + nopMutator, + nopMutator, + app, + ) // Succeed once, several failures, then stop. + numScrapes := 0 + scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { numScrapes++ @@ -537,31 +514,37 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { t.Fatalf("Scrape wasn't stopped.") } - if len(appender.result) != 2 { - t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 2, len(appender.result)) + // 1 successfully scraped sample, 1 stale marker after first fail, 4 report samples for + // each scrape successful or not. + if len(appender.result) != 22 { + t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 22, len(appender.result)) } if appender.result[0].v != 42.0 { t.Fatalf("Appended first sample not as expected. Wanted: %f Got: %f", appender.result[0], 42) } - if !value.IsStaleNaN(appender.result[1].v) { - t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[1].v)) + if !value.IsStaleNaN(appender.result[5].v) { + t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[5].v)) } } func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { appender := &collectResultAppender{} var ( - signal = make(chan struct{}) - + signal = make(chan struct{}) scraper = &testScraper{} app = func() storage.Appender { return appender } - reportApp = func() storage.Appender { return &nopAppender{} } numScrapes = 0 ) defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, app, reportApp, nil, nil) + sl := newScrapeLoop(ctx, + scraper, + nil, nil, + nopMutator, + nopMutator, + app, + ) // Succeed once, several failures, then stop. scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { @@ -590,26 +573,29 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { t.Fatalf("Scrape wasn't stopped.") } - if len(appender.result) != 2 { - t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 2, len(appender.result)) + // 1 successfully scraped sample, 1 stale marker after first fail, 4 report samples for + // each scrape successful or not. + if len(appender.result) != 14 { + t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 22, len(appender.result)) } if appender.result[0].v != 42.0 { t.Fatalf("Appended first sample not as expected. Wanted: %f Got: %f", appender.result[0], 42) } - if !value.IsStaleNaN(appender.result[1].v) { - t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[1].v)) + if !value.IsStaleNaN(appender.result[5].v) { + t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[5].v)) } } func TestScrapeLoopAppend(t *testing.T) { app := &collectResultAppender{} - sl := newScrapeLoop(context.Background(), nil, + sl := newScrapeLoop(context.Background(), + nil, nil, nil, + nopMutator, + nopMutator, func() storage.Appender { return app }, - func() storage.Appender { return nopAppender{} }, - nil, - nil, ) + now := time.Now() _, _, err := sl.append([]byte("metric_a 1\nmetric_b NaN\n"), now) if err != nil { @@ -642,11 +628,12 @@ func TestScrapeLoopAppend(t *testing.T) { func TestScrapeLoopAppendStaleness(t *testing.T) { app := &collectResultAppender{} - sl := newScrapeLoop(context.Background(), nil, + + sl := newScrapeLoop(context.Background(), + nil, nil, nil, + nopMutator, + nopMutator, func() storage.Appender { return app }, - func() storage.Appender { return nopAppender{} }, - nil, - nil, ) now := time.Now() @@ -686,11 +673,11 @@ func TestScrapeLoopAppendStaleness(t *testing.T) { func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { app := &collectResultAppender{} - sl := newScrapeLoop(context.Background(), nil, + sl := newScrapeLoop(context.Background(), + nil, nil, nil, + nopMutator, + nopMutator, func() storage.Appender { return app }, - func() storage.Appender { return nopAppender{} }, - nil, - nil, ) now := time.Now() @@ -713,128 +700,23 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { if !reflect.DeepEqual(want, app.result) { t.Fatalf("Appended samples not as expected. Wanted: %+v Got: %+v", want, app.result) } - -} - -func TestScrapeLoopRunAppliesScrapeLimit(t *testing.T) { - - cases := []struct { - appender func() storage.Appender - up float64 - scrapeSamplesScraped float64 - scrapeSamplesScrapedPostMetricRelabelling float64 - }{ - { - appender: func() storage.Appender { return nopAppender{} }, - up: 1, - scrapeSamplesScraped: 3, - scrapeSamplesScrapedPostMetricRelabelling: 3, - }, - { - appender: func() storage.Appender { - return &limitAppender{Appender: nopAppender{}, limit: 3} - }, - up: 1, - scrapeSamplesScraped: 3, - scrapeSamplesScrapedPostMetricRelabelling: 3, - }, - { - appender: func() storage.Appender { - return &limitAppender{Appender: nopAppender{}, limit: 2} - }, - up: 0, - scrapeSamplesScraped: 3, - scrapeSamplesScrapedPostMetricRelabelling: 3, - }, - { - appender: func() storage.Appender { - return &relabelAppender{ - Appender: &limitAppender{Appender: nopAppender{}, limit: 2}, - relabelings: []*config.RelabelConfig{ - &config.RelabelConfig{ - SourceLabels: model.LabelNames{"__name__"}, - Regex: config.MustNewRegexp("a"), - Action: config.RelabelDrop, - }, - }, - } - }, - up: 1, - scrapeSamplesScraped: 3, - scrapeSamplesScrapedPostMetricRelabelling: 2, - }, - } - - for i, c := range cases { - reportAppender := &collectResultAppender{} - var ( - signal = make(chan struct{}) - scraper = &testScraper{} - numScrapes = 0 - reportApp = func() storage.Appender { - // Get result of the 2nd scrape. - if numScrapes == 2 { - return reportAppender - } else { - return nopAppender{} - } - } - ) - defer close(signal) - - ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, c.appender, reportApp, nil, nil) - - // Setup a series to be stale, then 3 samples, then stop. - scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { - numScrapes += 1 - if numScrapes == 1 { - w.Write([]byte("stale 0\n")) - return nil - } else if numScrapes == 2 { - w.Write([]byte("a 0\nb 0\nc 0 \n")) - return nil - } else if numScrapes == 3 { - cancel() - } - return fmt.Errorf("Scrape failed.") - } - - go func() { - sl.run(10*time.Millisecond, time.Hour, nil) - signal <- struct{}{} - }() - - select { - case <-signal: - case <-time.After(5 * time.Second): - t.Fatalf("Scrape wasn't stopped.") - } - - if len(reportAppender.result) != 4 { - t.Fatalf("Case %d appended report samples not as expected. Wanted: %d samples Got: %d", i, 4, len(reportAppender.result)) - } - if reportAppender.result[0].v != c.up { - t.Fatalf("Case %d appended up sample not as expected. Wanted: %f Got: %+v", i, c.up, reportAppender.result[0]) - } - if reportAppender.result[2].v != c.scrapeSamplesScraped { - t.Fatalf("Case %d appended scrape_samples_scraped sample not as expected. Wanted: %f Got: %+v", i, c.scrapeSamplesScraped, reportAppender.result[2]) - } - if reportAppender.result[3].v != c.scrapeSamplesScrapedPostMetricRelabelling { - t.Fatalf("Case %d appended scrape_samples_scraped_post_metric_relabeling sample not as expected. Wanted: %f Got: %+v", i, c.scrapeSamplesScrapedPostMetricRelabelling, reportAppender.result[3]) - } - } } func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) { var ( - scraper = &testScraper{} - reportAppender = &collectResultAppender{} - reportApp = func() storage.Appender { return reportAppender } + scraper = &testScraper{} + appender = &collectResultAppender{} + app = func() storage.Appender { return appender } ) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil, nil) + sl := newScrapeLoop(ctx, + scraper, + nil, nil, + nopMutator, + nopMutator, + app, + ) scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { cancel() @@ -843,31 +725,37 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) { sl.run(10*time.Millisecond, time.Hour, nil) - if reportAppender.result[0].v != 0 { - t.Fatalf("bad 'up' value; want 0, got %v", reportAppender.result[0].v) + if appender.result[0].v != 0 { + t.Fatalf("bad 'up' value; want 0, got %v", appender.result[0].v) } } func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) { var ( - scraper = &testScraper{} - reportAppender = &collectResultAppender{} - reportApp = func() storage.Appender { return reportAppender } + scraper = &testScraper{} + appender = &collectResultAppender{} + app = func() storage.Appender { return appender } ) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil, nil) + sl := newScrapeLoop(ctx, + scraper, + nil, nil, + nopMutator, + nopMutator, + app, + ) scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { cancel() - w.Write([]byte("a{l=\"\xff\"} 0\n")) + w.Write([]byte("a{l=\"\xff\"} 1\n")) return nil } sl.run(10*time.Millisecond, time.Hour, nil) - if reportAppender.result[0].v != 0 { - t.Fatalf("bad 'up' value; want 0, got %v", reportAppender.result[0].v) + if appender.result[0].v != 0 { + t.Fatalf("bad 'up' value; want 0, got %v", appender.result[0].v) } } @@ -894,11 +782,13 @@ func (app *errorAppender) AddFast(lset labels.Labels, ref uint64, t int64, v flo func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) { app := &errorAppender{} - sl := newScrapeLoop(context.Background(), nil, + + sl := newScrapeLoop(context.Background(), + nil, + nil, nil, + nopMutator, + nopMutator, func() storage.Appender { return app }, - func() storage.Appender { return nopAppender{} }, - nil, - nil, ) now := time.Unix(1, 0) @@ -920,16 +810,17 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { app := &collectResultAppender{} - sl := newScrapeLoop(context.Background(), nil, + sl := newScrapeLoop(context.Background(), + nil, + nil, nil, + nopMutator, + nopMutator, func() storage.Appender { return &timeLimitAppender{ Appender: app, maxTime: timestamp.FromTime(time.Now().Add(10 * time.Minute)), } }, - func() storage.Appender { return nopAppender{} }, - nil, - nil, ) now := time.Now().Add(20 * time.Minute) diff --git a/retrieval/target.go b/retrieval/target.go index b305b5abc..862911eba 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -253,63 +253,6 @@ func (app *timeLimitAppender) AddFast(lset labels.Labels, ref uint64, t int64, v return nil } -// Merges the ingested sample's metric with the label set. On a collision the -// value of the ingested label is stored in a label prefixed with 'exported_'. -type ruleLabelsAppender struct { - storage.Appender - labels labels.Labels -} - -func (app ruleLabelsAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { - lb := labels.NewBuilder(lset) - - for _, l := range app.labels { - lv := lset.Get(l.Name) - if lv != "" { - lb.Set(model.ExportedLabelPrefix+l.Name, lv) - } - lb.Set(l.Name, l.Value) - } - - return app.Appender.Add(lb.Labels(), t, v) -} - -type honorLabelsAppender struct { - storage.Appender - labels labels.Labels -} - -// Merges the sample's metric with the given labels if the label is not -// already present in the metric. -// This also considers labels explicitly set to the empty string. -func (app honorLabelsAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { - lb := labels.NewBuilder(lset) - - for _, l := range app.labels { - if lv := lset.Get(l.Name); lv == "" { - lb.Set(l.Name, l.Value) - } - } - return app.Appender.Add(lb.Labels(), t, v) -} - -// Applies a set of relabel configurations to the sample's metric -// before actually appending it. -type relabelAppender struct { - storage.Appender - relabelings []*config.RelabelConfig -} - -var errSeriesDropped = errors.New("series dropped") - -func (app relabelAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { - lset = relabel.Process(lset, app.relabelings...) - if lset == nil { - return 0, errSeriesDropped - } - return app.Appender.Add(lset, t, v) -} - // populateLabels builds a label set from the given label set and scrape configuration. // It returns a label set before relabeling was applied as the second return value. // Returns a nil label set if the target is dropped during relabeling.