Merge pull request #13060 from roidelapluie/trackTimestampsStaleness

scrape: Added trackTimestampsStaleness configuration option
Commit 79bbfe601f by Julien Pivotto, 2023-10-31 19:34:31 -04:00, committed by GitHub.
4 changed files with 196 additions and 80 deletions

config/config.go

@@ -563,6 +563,8 @@ type ScrapeConfig struct {
     HonorLabels bool `yaml:"honor_labels,omitempty"`
     // Indicator whether the scraped timestamps should be respected.
     HonorTimestamps bool `yaml:"honor_timestamps"`
+    // Indicator whether to track the staleness of the scraped timestamps.
+    TrackTimestampsStaleness bool `yaml:"track_timestamps_staleness"`
     // A set of query parameters with which the target is scraped.
     Params url.Values `yaml:"params,omitempty"`
     // How frequently to scrape the targets of this scrape config.

docs/configuration.md

@@ -222,6 +222,14 @@ job_name: <job_name>
 # by the target will be ignored.
 [ honor_timestamps: <boolean> | default = true ]
 
+# track_timestamps_staleness controls whether Prometheus tracks staleness of
+# the metrics that have an explicit timestamp present in scraped data.
+#
+# If track_timestamps_staleness is set to "true", a staleness marker will be
+# inserted in the TSDB when a metric is no longer present or the target
+# is down.
+[ track_timestamps_staleness: <boolean> | default = false ]
+
 # Configures the protocol scheme used for requests.
 [ scheme: <scheme> | default = http ]
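
For reference, a minimal sketch of how the new option might be enabled in a scrape configuration; the job name and target below are illustrative placeholders, not part of this change:

    scrape_configs:
      - job_name: "timestamped-exporter"          # hypothetical job exposing explicitly timestamped metrics
        track_timestamps_staleness: true          # write staleness markers when these series disappear or the target goes down
        static_configs:
          - targets: ["exporter.example.com:9090"]   # placeholder target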

scrape/scrape.go

@@ -95,18 +95,19 @@ type labelLimits struct {
 }
 
 type scrapeLoopOptions struct {
     target                   *Target
     scraper                  scraper
     sampleLimit              int
     bucketLimit              int
     labelLimits              *labelLimits
     honorLabels              bool
     honorTimestamps          bool
+    trackTimestampsStaleness bool
     interval                 time.Duration
     timeout                  time.Duration
     scrapeClassicHistograms  bool
     mrc                      []*relabel.Config
     cache                    *scrapeCache
 }
 
 const maxAheadTime = 10 * time.Minute
@@ -160,6 +161,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
             cache,
             offsetSeed,
             opts.honorTimestamps,
+            opts.trackTimestampsStaleness,
             opts.sampleLimit,
             opts.bucketLimit,
             opts.labelLimits,
@@ -270,9 +272,10 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
             labelNameLengthLimit:  int(sp.config.LabelNameLengthLimit),
             labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
         }
        honorLabels              = sp.config.HonorLabels
        honorTimestamps          = sp.config.HonorTimestamps
+       trackTimestampsStaleness = sp.config.TrackTimestampsStaleness
        mrc                      = sp.config.MetricRelabelConfigs
     )
 
     sp.targetMtx.Lock()
@@ -298,17 +301,18 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
                 acceptHeader: acceptHeader(cfg.ScrapeProtocols),
             }
             newLoop = sp.newLoop(scrapeLoopOptions{
                 target:                   t,
                 scraper:                  s,
                 sampleLimit:              sampleLimit,
                 bucketLimit:              bucketLimit,
                 labelLimits:              labelLimits,
                 honorLabels:              honorLabels,
                 honorTimestamps:          honorTimestamps,
+                trackTimestampsStaleness: trackTimestampsStaleness,
                 mrc:                      mrc,
                 cache:                    cache,
                 interval:                 interval,
                 timeout:                  timeout,
             })
         )
         if err != nil {
@@ -396,10 +400,11 @@ func (sp *scrapePool) sync(targets []*Target) {
             labelNameLengthLimit:  int(sp.config.LabelNameLengthLimit),
             labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
         }
        honorLabels              = sp.config.HonorLabels
        honorTimestamps          = sp.config.HonorTimestamps
+       trackTimestampsStaleness = sp.config.TrackTimestampsStaleness
        mrc                      = sp.config.MetricRelabelConfigs
        scrapeClassicHistograms  = sp.config.ScrapeClassicHistograms
     )
 
     sp.targetMtx.Lock()
@@ -421,17 +426,18 @@ func (sp *scrapePool) sync(targets []*Target) {
                 metrics:       sp.metrics,
             }
             l := sp.newLoop(scrapeLoopOptions{
                 target:                   t,
                 scraper:                  s,
                 sampleLimit:              sampleLimit,
                 bucketLimit:              bucketLimit,
                 labelLimits:              labelLimits,
                 honorLabels:              honorLabels,
                 honorTimestamps:          honorTimestamps,
+                trackTimestampsStaleness: trackTimestampsStaleness,
                 mrc:                      mrc,
                 interval:                 interval,
                 timeout:                  timeout,
                 scrapeClassicHistograms:  scrapeClassicHistograms,
             })
             if err != nil {
                 l.setForcedError(err)
@@ -750,21 +756,22 @@ type cacheEntry struct {
 }
 
 type scrapeLoop struct {
     scraper                  scraper
     l                        log.Logger
     cache                    *scrapeCache
     lastScrapeSize           int
     buffers                  *pool.Pool
     offsetSeed               uint64
     honorTimestamps          bool
+    trackTimestampsStaleness bool
     forcedErr                error
     forcedErrMtx             sync.Mutex
     sampleLimit              int
     bucketLimit              int
     labelLimits              *labelLimits
     interval                 time.Duration
     timeout                  time.Duration
     scrapeClassicHistograms  bool
 
     appender            func(ctx context.Context) storage.Appender
     sampleMutator       labelsMutator
@@ -1046,6 +1053,7 @@ func newScrapeLoop(ctx context.Context,
     cache *scrapeCache,
     offsetSeed uint64,
     honorTimestamps bool,
+    trackTimestampsStaleness bool,
     sampleLimit int,
     bucketLimit int,
     labelLimits *labelLimits,
@@ -1080,27 +1088,28 @@ func newScrapeLoop(ctx context.Context,
     }
 
     sl := &scrapeLoop{
         scraper:                  sc,
         buffers:                  buffers,
         cache:                    cache,
         appender:                 appender,
         sampleMutator:            sampleMutator,
         reportSampleMutator:      reportSampleMutator,
         stopped:                  make(chan struct{}),
         offsetSeed:               offsetSeed,
         l:                        l,
         parentCtx:                ctx,
         appenderCtx:              appenderCtx,
         honorTimestamps:          honorTimestamps,
+        trackTimestampsStaleness: trackTimestampsStaleness,
         sampleLimit:              sampleLimit,
         bucketLimit:              bucketLimit,
         labelLimits:              labelLimits,
         interval:                 interval,
         timeout:                  timeout,
         scrapeClassicHistograms:  scrapeClassicHistograms,
         reportExtraMetrics:       reportExtraMetrics,
         appendMetadataToWAL:      appendMetadataToWAL,
         metrics:                  metrics,
     }
     sl.ctx, sl.cancel = context.WithCancel(ctx)
@@ -1547,7 +1556,7 @@ loop:
         }
 
         if !ok {
-            if parsedTimestamp == nil {
+            if parsedTimestamp == nil || sl.trackTimestampsStaleness {
                 // Bypass staleness logic if there is an explicit timestamp.
                 sl.cache.trackStaleness(hash, lset)
             }
@@ -1628,7 +1637,7 @@ loop:
 func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {
     switch errors.Cause(err) {
     case nil:
-        if tp == nil && ce != nil {
+        if (tp == nil || sl.trackTimestampsStaleness) && ce != nil {
             sl.cache.trackStaleness(ce.hash, ce.lset)
         }
         return true, nil

scrape/scrape_test.go

@@ -650,6 +650,7 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) {
         nopMutator,
         nil, nil, 0,
         true,
+        false,
         0, 0,
         nil,
         1,

@@ -724,6 +725,7 @@ func TestScrapeLoopStop(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         10*time.Millisecond,

@@ -802,6 +804,7 @@ func TestScrapeLoopRun(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         time.Second,

@@ -859,6 +862,7 @@ func TestScrapeLoopRun(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         time.Second,

@@ -920,6 +924,7 @@ func TestScrapeLoopForcedErr(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         time.Second,

@@ -980,6 +985,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
         cache,
         0,
         true,
+        false,
         0, 0,
         nil,
         0,
@@ -1039,6 +1045,7 @@ func simpleTestScrapeLoop(t testing.TB) (context.Context, *scrapeLoop) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         0,

@@ -1101,6 +1108,7 @@ func TestScrapeLoopFailWithInvalidLabelsAfterRelabel(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         0,

@@ -1181,6 +1189,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         10*time.Millisecond,

@@ -1246,6 +1255,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         10*time.Millisecond,

@@ -1314,6 +1324,7 @@ func TestScrapeLoopCache(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         10*time.Millisecond,

@@ -1399,6 +1410,7 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         10*time.Millisecond,
@@ -1515,6 +1527,7 @@ func TestScrapeLoopAppend(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         0,

@@ -1613,7 +1626,7 @@ func TestScrapeLoopAppendForConflictingPrefixedLabels(t *testing.T) {
                 },
                 nil,
                 func(ctx context.Context) storage.Appender { return app },
-                nil, 0, true, 0, 0, nil, 0, 0, false, false, false, nil, false, newTestScrapeMetrics(t),
+                nil, 0, true, false, 0, 0, nil, 0, 0, false, false, false, nil, false, newTestScrapeMetrics(t),
             )
             slApp := sl.appender(context.Background())
             _, _, _, err := sl.append(slApp, []byte(tc.exposedLabels), "", time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC))

@@ -1644,6 +1657,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         0,

@@ -1704,6 +1718,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
         nil,
         0,
         true,
+        false,
         app.limit, 0,
         nil,
         0,

@@ -1783,6 +1798,7 @@ func TestScrapeLoop_HistogramBucketLimit(t *testing.T) {
         nil,
         0,
         true,
+        false,
         app.limit, 0,
         nil,
         0,

@@ -1883,6 +1899,7 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         0,
@@ -1933,6 +1950,7 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         0,

@@ -1986,6 +2004,7 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         0,

@@ -2313,6 +2332,7 @@ metric: <
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         0,

@@ -2402,6 +2422,7 @@ func TestScrapeLoopAppendExemplarSeries(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         0,

@@ -2456,6 +2477,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         10*time.Millisecond,

@@ -2494,6 +2516,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         10*time.Millisecond,
@@ -2545,6 +2568,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         0,

@@ -2592,6 +2616,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         0,

@@ -2883,6 +2908,7 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
         func(ctx context.Context) storage.Appender { return capp },
         nil, 0,
         true,
+        false,
         0, 0,
         nil,
         0,

@@ -2926,6 +2952,7 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
         func(ctx context.Context) storage.Appender { return capp },
         nil, 0,
         false,
+        false,
         0, 0,
         nil,
         0,

@@ -2968,6 +2995,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         0,

@@ -3028,6 +3056,7 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         0,
@@ -3293,6 +3322,7 @@ func TestScrapeAddFast(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         0,

@@ -3381,6 +3411,7 @@ func TestScrapeReportSingleAppender(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         nil,
         10*time.Millisecond,

@@ -3585,6 +3616,7 @@ func TestScrapeLoopLabelLimit(t *testing.T) {
         nil,
         0,
         true,
+        false,
         0, 0,
         &test.labelLimits,
         0,

@@ -3646,3 +3678,68 @@ func TestTargetScrapeIntervalAndTimeoutRelabel(t *testing.T) {
     require.Equal(t, "3s", sp.ActiveTargets()[0].labels.Get(model.ScrapeIntervalLabel))
     require.Equal(t, "750ms", sp.ActiveTargets()[0].labels.Get(model.ScrapeTimeoutLabel))
 }
+
+func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrapeForTimestampedMetrics(t *testing.T) {
+    appender := &collectResultAppender{}
+    var (
+        signal  = make(chan struct{}, 1)
+        scraper = &testScraper{}
+        app     = func(ctx context.Context) storage.Appender { return appender }
+    )
+
+    ctx, cancel := context.WithCancel(context.Background())
+    sl := newScrapeLoop(ctx,
+        scraper,
+        nil, nil,
+        nopMutator,
+        nopMutator,
+        app,
+        nil,
+        0,
+        true,
+        true,
+        0, 0,
+        nil,
+        10*time.Millisecond,
+        time.Hour,
+        false,
+        false,
+        false,
+        nil,
+        false,
+        newTestScrapeMetrics(t),
+    )
+    // Succeed once, several failures, then stop.
+    numScrapes := 0
+
+    scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
+        numScrapes++
+
+        switch numScrapes {
+        case 1:
+            w.Write([]byte(fmt.Sprintf("metric_a 42 %d\n", time.Now().UnixNano()/int64(time.Millisecond))))
+            return nil
+        case 5:
+            cancel()
+        }
+        return errors.New("scrape failed")
+    }
+
+    go func() {
+        sl.run(nil)
+        signal <- struct{}{}
+    }()
+
+    select {
+    case <-signal:
+    case <-time.After(5 * time.Second):
+        t.Fatalf("Scrape wasn't stopped.")
+    }
+
+    // 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
+    // each scrape successful or not.
+    require.Equal(t, 27, len(appender.resultFloats), "Appended samples not as expected:\n%s", appender)
+    require.Equal(t, 42.0, appender.resultFloats[0].f, "Appended first sample not as expected")
+    require.True(t, value.IsStaleNaN(appender.resultFloats[6].f),
+        "Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.resultFloats[6].f))
+}