From 84aadfc45b10d3dd3db20fbbe931ea34ee18c434 Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Tue, 31 Oct 2023 16:58:42 -0400 Subject: [PATCH] scrape: Added trackTimestampsStaleness configuration option Add the ability to track staleness when an explicit timestamp is set. Useful for cAdvisor. Signed-off-by: Julien Pivotto --- config/config.go | 2 + docs/configuration/configuration.md | 8 ++ scrape/scrape.go | 167 +++++++++++++++------------- scrape/scrape_test.go | 99 ++++++++++++++++- 4 files changed, 196 insertions(+), 80 deletions(-) diff --git a/config/config.go b/config/config.go index 0e1c0b782..4c73f6c49 100644 --- a/config/config.go +++ b/config/config.go @@ -563,6 +563,8 @@ type ScrapeConfig struct { HonorLabels bool `yaml:"honor_labels,omitempty"` // Indicator whether the scraped timestamps should be respected. HonorTimestamps bool `yaml:"honor_timestamps"` + // Indicator whether to track the staleness of the scraped timestamps. + TrackTimestampsStaleness bool `yaml:"track_timestamps_staleness"` // A set of query parameters with which the target is scraped. Params url.Values `yaml:"params,omitempty"` // How frequently to scrape the targets of this scrape config. diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 4138271bc..dc4ea12e7 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -222,6 +222,14 @@ job_name: # by the target will be ignored. [ honor_timestamps: | default = true ] +# track_timestamps_staleness controls whether Prometheus tracks staleness of +# the metrics that have an explicit timestamps present in scraped data. +# +# If track_timestamps_staleness is set to "true", a staleness marker will be +# inserted in the TSDB when a metric is no longer present or the target +# is down. +[ track_timestamps_staleness: | default = false ] + # Configures the protocol scheme used for requests. [ scheme: | default = http ] diff --git a/scrape/scrape.go b/scrape/scrape.go index d67297ca7..10214e549 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -95,18 +95,19 @@ type labelLimits struct { } type scrapeLoopOptions struct { - target *Target - scraper scraper - sampleLimit int - bucketLimit int - labelLimits *labelLimits - honorLabels bool - honorTimestamps bool - interval time.Duration - timeout time.Duration - scrapeClassicHistograms bool - mrc []*relabel.Config - cache *scrapeCache + target *Target + scraper scraper + sampleLimit int + bucketLimit int + labelLimits *labelLimits + honorLabels bool + honorTimestamps bool + trackTimestampsStaleness bool + interval time.Duration + timeout time.Duration + scrapeClassicHistograms bool + mrc []*relabel.Config + cache *scrapeCache } const maxAheadTime = 10 * time.Minute @@ -160,6 +161,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed cache, offsetSeed, opts.honorTimestamps, + opts.trackTimestampsStaleness, opts.sampleLimit, opts.bucketLimit, opts.labelLimits, @@ -270,9 +272,10 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error { labelNameLengthLimit: int(sp.config.LabelNameLengthLimit), labelValueLengthLimit: int(sp.config.LabelValueLengthLimit), } - honorLabels = sp.config.HonorLabels - honorTimestamps = sp.config.HonorTimestamps - mrc = sp.config.MetricRelabelConfigs + honorLabels = sp.config.HonorLabels + honorTimestamps = sp.config.HonorTimestamps + trackTimestampsStaleness = sp.config.TrackTimestampsStaleness + mrc = sp.config.MetricRelabelConfigs ) sp.targetMtx.Lock() @@ -298,17 +301,18 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error { acceptHeader: acceptHeader(cfg.ScrapeProtocols), } newLoop = sp.newLoop(scrapeLoopOptions{ - target: t, - scraper: s, - sampleLimit: sampleLimit, - bucketLimit: bucketLimit, - labelLimits: labelLimits, - honorLabels: honorLabels, - honorTimestamps: honorTimestamps, - mrc: mrc, - cache: cache, - interval: interval, - timeout: timeout, + target: t, + scraper: s, + sampleLimit: sampleLimit, + bucketLimit: bucketLimit, + labelLimits: labelLimits, + honorLabels: honorLabels, + honorTimestamps: honorTimestamps, + trackTimestampsStaleness: trackTimestampsStaleness, + mrc: mrc, + cache: cache, + interval: interval, + timeout: timeout, }) ) if err != nil { @@ -396,10 +400,11 @@ func (sp *scrapePool) sync(targets []*Target) { labelNameLengthLimit: int(sp.config.LabelNameLengthLimit), labelValueLengthLimit: int(sp.config.LabelValueLengthLimit), } - honorLabels = sp.config.HonorLabels - honorTimestamps = sp.config.HonorTimestamps - mrc = sp.config.MetricRelabelConfigs - scrapeClassicHistograms = sp.config.ScrapeClassicHistograms + honorLabels = sp.config.HonorLabels + honorTimestamps = sp.config.HonorTimestamps + trackTimestampsStaleness = sp.config.TrackTimestampsStaleness + mrc = sp.config.MetricRelabelConfigs + scrapeClassicHistograms = sp.config.ScrapeClassicHistograms ) sp.targetMtx.Lock() @@ -421,17 +426,18 @@ func (sp *scrapePool) sync(targets []*Target) { metrics: sp.metrics, } l := sp.newLoop(scrapeLoopOptions{ - target: t, - scraper: s, - sampleLimit: sampleLimit, - bucketLimit: bucketLimit, - labelLimits: labelLimits, - honorLabels: honorLabels, - honorTimestamps: honorTimestamps, - mrc: mrc, - interval: interval, - timeout: timeout, - scrapeClassicHistograms: scrapeClassicHistograms, + target: t, + scraper: s, + sampleLimit: sampleLimit, + bucketLimit: bucketLimit, + labelLimits: labelLimits, + honorLabels: honorLabels, + honorTimestamps: honorTimestamps, + trackTimestampsStaleness: trackTimestampsStaleness, + mrc: mrc, + interval: interval, + timeout: timeout, + scrapeClassicHistograms: scrapeClassicHistograms, }) if err != nil { l.setForcedError(err) @@ -750,21 +756,22 @@ type cacheEntry struct { } type scrapeLoop struct { - scraper scraper - l log.Logger - cache *scrapeCache - lastScrapeSize int - buffers *pool.Pool - offsetSeed uint64 - honorTimestamps bool - forcedErr error - forcedErrMtx sync.Mutex - sampleLimit int - bucketLimit int - labelLimits *labelLimits - interval time.Duration - timeout time.Duration - scrapeClassicHistograms bool + scraper scraper + l log.Logger + cache *scrapeCache + lastScrapeSize int + buffers *pool.Pool + offsetSeed uint64 + honorTimestamps bool + trackTimestampsStaleness bool + forcedErr error + forcedErrMtx sync.Mutex + sampleLimit int + bucketLimit int + labelLimits *labelLimits + interval time.Duration + timeout time.Duration + scrapeClassicHistograms bool appender func(ctx context.Context) storage.Appender sampleMutator labelsMutator @@ -1046,6 +1053,7 @@ func newScrapeLoop(ctx context.Context, cache *scrapeCache, offsetSeed uint64, honorTimestamps bool, + trackTimestampsStaleness bool, sampleLimit int, bucketLimit int, labelLimits *labelLimits, @@ -1080,27 +1088,28 @@ func newScrapeLoop(ctx context.Context, } sl := &scrapeLoop{ - scraper: sc, - buffers: buffers, - cache: cache, - appender: appender, - sampleMutator: sampleMutator, - reportSampleMutator: reportSampleMutator, - stopped: make(chan struct{}), - offsetSeed: offsetSeed, - l: l, - parentCtx: ctx, - appenderCtx: appenderCtx, - honorTimestamps: honorTimestamps, - sampleLimit: sampleLimit, - bucketLimit: bucketLimit, - labelLimits: labelLimits, - interval: interval, - timeout: timeout, - scrapeClassicHistograms: scrapeClassicHistograms, - reportExtraMetrics: reportExtraMetrics, - appendMetadataToWAL: appendMetadataToWAL, - metrics: metrics, + scraper: sc, + buffers: buffers, + cache: cache, + appender: appender, + sampleMutator: sampleMutator, + reportSampleMutator: reportSampleMutator, + stopped: make(chan struct{}), + offsetSeed: offsetSeed, + l: l, + parentCtx: ctx, + appenderCtx: appenderCtx, + honorTimestamps: honorTimestamps, + trackTimestampsStaleness: trackTimestampsStaleness, + sampleLimit: sampleLimit, + bucketLimit: bucketLimit, + labelLimits: labelLimits, + interval: interval, + timeout: timeout, + scrapeClassicHistograms: scrapeClassicHistograms, + reportExtraMetrics: reportExtraMetrics, + appendMetadataToWAL: appendMetadataToWAL, + metrics: metrics, } sl.ctx, sl.cancel = context.WithCancel(ctx) @@ -1547,7 +1556,7 @@ loop: } if !ok { - if parsedTimestamp == nil { + if parsedTimestamp == nil || sl.trackTimestampsStaleness { // Bypass staleness logic if there is an explicit timestamp. sl.cache.trackStaleness(hash, lset) } @@ -1628,7 +1637,7 @@ loop: func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) { switch errors.Cause(err) { case nil: - if tp == nil && ce != nil { + if (tp == nil || sl.trackTimestampsStaleness) && ce != nil { sl.cache.trackStaleness(ce.hash, ce.lset) } return true, nil diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 672f46614..7be3f3461 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -650,6 +650,7 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) { nopMutator, nil, nil, 0, true, + false, 0, 0, nil, 1, @@ -724,6 +725,7 @@ func TestScrapeLoopStop(t *testing.T) { nil, 0, true, + false, 0, 0, nil, 10*time.Millisecond, @@ -802,6 +804,7 @@ func TestScrapeLoopRun(t *testing.T) { nil, 0, true, + false, 0, 0, nil, time.Second, @@ -859,6 +862,7 @@ func TestScrapeLoopRun(t *testing.T) { nil, 0, true, + false, 0, 0, nil, time.Second, @@ -920,6 +924,7 @@ func TestScrapeLoopForcedErr(t *testing.T) { nil, 0, true, + false, 0, 0, nil, time.Second, @@ -980,6 +985,7 @@ func TestScrapeLoopMetadata(t *testing.T) { cache, 0, true, + false, 0, 0, nil, 0, @@ -1039,6 +1045,7 @@ func simpleTestScrapeLoop(t testing.TB) (context.Context, *scrapeLoop) { nil, 0, true, + false, 0, 0, nil, 0, @@ -1101,6 +1108,7 @@ func TestScrapeLoopFailWithInvalidLabelsAfterRelabel(t *testing.T) { nil, 0, true, + false, 0, 0, nil, 0, @@ -1181,6 +1189,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { nil, 0, true, + false, 0, 0, nil, 10*time.Millisecond, @@ -1246,6 +1255,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { nil, 0, true, + false, 0, 0, nil, 10*time.Millisecond, @@ -1314,6 +1324,7 @@ func TestScrapeLoopCache(t *testing.T) { nil, 0, true, + false, 0, 0, nil, 10*time.Millisecond, @@ -1399,6 +1410,7 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) { nil, 0, true, + false, 0, 0, nil, 10*time.Millisecond, @@ -1515,6 +1527,7 @@ func TestScrapeLoopAppend(t *testing.T) { nil, 0, true, + false, 0, 0, nil, 0, @@ -1613,7 +1626,7 @@ func TestScrapeLoopAppendForConflictingPrefixedLabels(t *testing.T) { }, nil, func(ctx context.Context) storage.Appender { return app }, - nil, 0, true, 0, 0, nil, 0, 0, false, false, false, nil, false, newTestScrapeMetrics(t), + nil, 0, true, false, 0, 0, nil, 0, 0, false, false, false, nil, false, newTestScrapeMetrics(t), ) slApp := sl.appender(context.Background()) _, _, _, err := sl.append(slApp, []byte(tc.exposedLabels), "", time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC)) @@ -1644,6 +1657,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) { nil, 0, true, + false, 0, 0, nil, 0, @@ -1704,6 +1718,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) { nil, 0, true, + false, app.limit, 0, nil, 0, @@ -1783,6 +1798,7 @@ func TestScrapeLoop_HistogramBucketLimit(t *testing.T) { nil, 0, true, + false, app.limit, 0, nil, 0, @@ -1883,6 +1899,7 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) { nil, 0, true, + false, 0, 0, nil, 0, @@ -1933,6 +1950,7 @@ func TestScrapeLoopAppendStaleness(t *testing.T) { nil, 0, true, + false, 0, 0, nil, 0, @@ -1986,6 +2004,7 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { nil, 0, true, + false, 0, 0, nil, 0, @@ -2313,6 +2332,7 @@ metric: < nil, 0, true, + false, 0, 0, nil, 0, @@ -2402,6 +2422,7 @@ func TestScrapeLoopAppendExemplarSeries(t *testing.T) { nil, 0, true, + false, 0, 0, nil, 0, @@ -2456,6 +2477,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) { nil, 0, true, + false, 0, 0, nil, 10*time.Millisecond, @@ -2494,6 +2516,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) { nil, 0, true, + false, 0, 0, nil, 10*time.Millisecond, @@ -2545,6 +2568,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T nil, 0, true, + false, 0, 0, nil, 0, @@ -2592,6 +2616,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { nil, 0, true, + false, 0, 0, nil, 0, @@ -2883,6 +2908,7 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) { func(ctx context.Context) storage.Appender { return capp }, nil, 0, true, + false, 0, 0, nil, 0, @@ -2926,6 +2952,7 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) { func(ctx context.Context) storage.Appender { return capp }, nil, 0, false, + false, 0, 0, nil, 0, @@ -2968,6 +2995,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { nil, 0, true, + false, 0, 0, nil, 0, @@ -3028,6 +3056,7 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) { nil, 0, true, + false, 0, 0, nil, 0, @@ -3293,6 +3322,7 @@ func TestScrapeAddFast(t *testing.T) { nil, 0, true, + false, 0, 0, nil, 0, @@ -3381,6 +3411,7 @@ func TestScrapeReportSingleAppender(t *testing.T) { nil, 0, true, + false, 0, 0, nil, 10*time.Millisecond, @@ -3585,6 +3616,7 @@ func TestScrapeLoopLabelLimit(t *testing.T) { nil, 0, true, + false, 0, 0, &test.labelLimits, 0, @@ -3646,3 +3678,68 @@ func TestTargetScrapeIntervalAndTimeoutRelabel(t *testing.T) { require.Equal(t, "3s", sp.ActiveTargets()[0].labels.Get(model.ScrapeIntervalLabel)) require.Equal(t, "750ms", sp.ActiveTargets()[0].labels.Get(model.ScrapeTimeoutLabel)) } + +func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrapeForTimestampedMetrics(t *testing.T) { + appender := &collectResultAppender{} + var ( + signal = make(chan struct{}, 1) + scraper = &testScraper{} + app = func(ctx context.Context) storage.Appender { return appender } + ) + + ctx, cancel := context.WithCancel(context.Background()) + sl := newScrapeLoop(ctx, + scraper, + nil, nil, + nopMutator, + nopMutator, + app, + nil, + 0, + true, + true, + 0, 0, + nil, + 10*time.Millisecond, + time.Hour, + false, + false, + false, + nil, + false, + newTestScrapeMetrics(t), + ) + // Succeed once, several failures, then stop. + numScrapes := 0 + + scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { + numScrapes++ + + switch numScrapes { + case 1: + w.Write([]byte(fmt.Sprintf("metric_a 42 %d\n", time.Now().UnixNano()/int64(time.Millisecond)))) + return nil + case 5: + cancel() + } + return errors.New("scrape failed") + } + + go func() { + sl.run(nil) + signal <- struct{}{} + }() + + select { + case <-signal: + case <-time.After(5 * time.Second): + t.Fatalf("Scrape wasn't stopped.") + } + + // 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for + // each scrape successful or not. + require.Equal(t, 27, len(appender.resultFloats), "Appended samples not as expected:\n%s", appender) + require.Equal(t, 42.0, appender.resultFloats[0].f, "Appended first sample not as expected") + require.True(t, value.IsStaleNaN(appender.resultFloats[6].f), + "Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.resultFloats[6].f)) +}