mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-27 06:29:42 -08:00
Merge pull request #13066 from prometheus/superq/pick_timestamp
scrape: Added trackTimestampsStaleness configuration option
This commit is contained in:
commit
287511d92f
|
@ -484,6 +484,8 @@ type ScrapeConfig struct {
|
|||
HonorLabels bool `yaml:"honor_labels,omitempty"`
|
||||
// Indicator whether the scraped timestamps should be respected.
|
||||
HonorTimestamps bool `yaml:"honor_timestamps"`
|
||||
// Indicator whether to track the staleness of the scraped timestamps.
|
||||
TrackTimestampsStaleness bool `yaml:"track_timestamps_staleness"`
|
||||
// A set of query parameters with which the target is scraped.
|
||||
Params url.Values `yaml:"params,omitempty"`
|
||||
// How frequently to scrape the targets of this scrape config.
|
||||
|
|
|
@ -210,6 +210,14 @@ job_name: <job_name>
|
|||
# by the target will be ignored.
|
||||
[ honor_timestamps: <boolean> | default = true ]
|
||||
|
||||
# track_timestamps_staleness controls whether Prometheus tracks staleness of
|
||||
# the metrics that have an explicit timestamps present in scraped data.
|
||||
#
|
||||
# If track_timestamps_staleness is set to "true", a staleness marker will be
|
||||
# inserted in the TSDB when a metric is no longer present or the target
|
||||
# is down.
|
||||
[ track_timestamps_staleness: <boolean> | default = false ]
|
||||
|
||||
# Configures the protocol scheme used for requests.
|
||||
[ scheme: <scheme> | default = http ]
|
||||
|
||||
|
|
|
@ -268,6 +268,7 @@ type scrapeLoopOptions struct {
|
|||
labelLimits *labelLimits
|
||||
honorLabels bool
|
||||
honorTimestamps bool
|
||||
trackTimestampsStaleness bool
|
||||
interval time.Duration
|
||||
timeout time.Duration
|
||||
scrapeClassicHistograms bool
|
||||
|
@ -328,6 +329,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
|
|||
cache,
|
||||
offsetSeed,
|
||||
opts.honorTimestamps,
|
||||
opts.trackTimestampsStaleness,
|
||||
opts.sampleLimit,
|
||||
opts.bucketLimit,
|
||||
opts.labelLimits,
|
||||
|
@ -439,6 +441,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
|
|||
}
|
||||
honorLabels = sp.config.HonorLabels
|
||||
honorTimestamps = sp.config.HonorTimestamps
|
||||
trackTimestampsStaleness = sp.config.TrackTimestampsStaleness
|
||||
mrc = sp.config.MetricRelabelConfigs
|
||||
)
|
||||
|
||||
|
@ -470,6 +473,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
|
|||
labelLimits: labelLimits,
|
||||
honorLabels: honorLabels,
|
||||
honorTimestamps: honorTimestamps,
|
||||
trackTimestampsStaleness: trackTimestampsStaleness,
|
||||
mrc: mrc,
|
||||
cache: cache,
|
||||
interval: interval,
|
||||
|
@ -563,6 +567,7 @@ func (sp *scrapePool) sync(targets []*Target) {
|
|||
}
|
||||
honorLabels = sp.config.HonorLabels
|
||||
honorTimestamps = sp.config.HonorTimestamps
|
||||
trackTimestampsStaleness = sp.config.TrackTimestampsStaleness
|
||||
mrc = sp.config.MetricRelabelConfigs
|
||||
scrapeClassicHistograms = sp.config.ScrapeClassicHistograms
|
||||
)
|
||||
|
@ -590,6 +595,7 @@ func (sp *scrapePool) sync(targets []*Target) {
|
|||
labelLimits: labelLimits,
|
||||
honorLabels: honorLabels,
|
||||
honorTimestamps: honorTimestamps,
|
||||
trackTimestampsStaleness: trackTimestampsStaleness,
|
||||
mrc: mrc,
|
||||
interval: interval,
|
||||
timeout: timeout,
|
||||
|
@ -907,6 +913,7 @@ type scrapeLoop struct {
|
|||
buffers *pool.Pool
|
||||
offsetSeed uint64
|
||||
honorTimestamps bool
|
||||
trackTimestampsStaleness bool
|
||||
forcedErr error
|
||||
forcedErrMtx sync.Mutex
|
||||
sampleLimit int
|
||||
|
@ -1191,6 +1198,7 @@ func newScrapeLoop(ctx context.Context,
|
|||
cache *scrapeCache,
|
||||
offsetSeed uint64,
|
||||
honorTimestamps bool,
|
||||
trackTimestampsStaleness bool,
|
||||
sampleLimit int,
|
||||
bucketLimit int,
|
||||
labelLimits *labelLimits,
|
||||
|
@ -1236,6 +1244,7 @@ func newScrapeLoop(ctx context.Context,
|
|||
parentCtx: ctx,
|
||||
appenderCtx: appenderCtx,
|
||||
honorTimestamps: honorTimestamps,
|
||||
trackTimestampsStaleness: trackTimestampsStaleness,
|
||||
sampleLimit: sampleLimit,
|
||||
bucketLimit: bucketLimit,
|
||||
labelLimits: labelLimits,
|
||||
|
@ -1690,7 +1699,7 @@ loop:
|
|||
}
|
||||
|
||||
if !ok {
|
||||
if parsedTimestamp == nil {
|
||||
if parsedTimestamp == nil || sl.trackTimestampsStaleness {
|
||||
// Bypass staleness logic if there is an explicit timestamp.
|
||||
sl.cache.trackStaleness(hash, lset)
|
||||
}
|
||||
|
@ -1771,7 +1780,7 @@ loop:
|
|||
func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {
|
||||
switch errors.Cause(err) {
|
||||
case nil:
|
||||
if tp == nil && ce != nil {
|
||||
if (tp == nil || sl.trackTimestampsStaleness) && ce != nil {
|
||||
sl.cache.trackStaleness(ce.hash, ce.lset)
|
||||
}
|
||||
return true, nil
|
||||
|
|
|
@ -634,6 +634,7 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) {
|
|||
nopMutator,
|
||||
nil, nil, 0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
1,
|
||||
|
@ -707,6 +708,7 @@ func TestScrapeLoopStop(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
10*time.Millisecond,
|
||||
|
@ -784,6 +786,7 @@ func TestScrapeLoopRun(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
time.Second,
|
||||
|
@ -840,6 +843,7 @@ func TestScrapeLoopRun(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
time.Second,
|
||||
|
@ -900,6 +904,7 @@ func TestScrapeLoopForcedErr(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
time.Second,
|
||||
|
@ -959,6 +964,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
|
|||
cache,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
0,
|
||||
|
@ -1017,6 +1023,7 @@ func simpleTestScrapeLoop(t testing.TB) (context.Context, *scrapeLoop) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
0,
|
||||
|
@ -1078,6 +1085,7 @@ func TestScrapeLoopFailWithInvalidLabelsAfterRelabel(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
0,
|
||||
|
@ -1157,6 +1165,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
10*time.Millisecond,
|
||||
|
@ -1221,6 +1230,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
10*time.Millisecond,
|
||||
|
@ -1288,6 +1298,7 @@ func TestScrapeLoopCache(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
10*time.Millisecond,
|
||||
|
@ -1372,6 +1383,7 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
10*time.Millisecond,
|
||||
|
@ -1487,6 +1499,7 @@ func TestScrapeLoopAppend(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
0,
|
||||
|
@ -1583,7 +1596,7 @@ func TestScrapeLoopAppendForConflictingPrefixedLabels(t *testing.T) {
|
|||
return mutateSampleLabels(l, &Target{labels: labels.FromStrings(tc.targetLabels...)}, false, nil)
|
||||
},
|
||||
nil,
|
||||
func(ctx context.Context) storage.Appender { return app }, nil, 0, true, 0, 0, nil, 0, 0, false, false, false, nil, false,
|
||||
func(ctx context.Context) storage.Appender { return app }, nil, 0, true, false, 0, 0, nil, 0, 0, false, false, false, nil, false,
|
||||
)
|
||||
slApp := sl.appender(context.Background())
|
||||
_, _, _, err := sl.append(slApp, []byte(tc.exposedLabels), "", time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC))
|
||||
|
@ -1614,6 +1627,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
0,
|
||||
|
@ -1673,6 +1687,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
app.limit, 0,
|
||||
nil,
|
||||
0,
|
||||
|
@ -1751,6 +1766,7 @@ func TestScrapeLoop_HistogramBucketLimit(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
app.limit, 0,
|
||||
nil,
|
||||
0,
|
||||
|
@ -1850,6 +1866,7 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
0,
|
||||
|
@ -1899,6 +1916,7 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
0,
|
||||
|
@ -1951,6 +1969,7 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
0,
|
||||
|
@ -2277,6 +2296,7 @@ metric: <
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
0,
|
||||
|
@ -2365,6 +2385,7 @@ func TestScrapeLoopAppendExemplarSeries(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
0,
|
||||
|
@ -2418,6 +2439,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
10*time.Millisecond,
|
||||
|
@ -2455,6 +2477,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
10*time.Millisecond,
|
||||
|
@ -2505,6 +2528,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
0,
|
||||
|
@ -2551,6 +2575,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
0,
|
||||
|
@ -2840,6 +2865,7 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
|
|||
func(ctx context.Context) storage.Appender { return capp },
|
||||
nil, 0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
0,
|
||||
|
@ -2882,6 +2908,7 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
|
|||
func(ctx context.Context) storage.Appender { return capp },
|
||||
nil, 0,
|
||||
false,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
0,
|
||||
|
@ -2923,6 +2950,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
0,
|
||||
|
@ -2982,6 +3010,7 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
0,
|
||||
|
@ -3246,6 +3275,7 @@ func TestScrapeAddFast(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
0,
|
||||
|
@ -3333,6 +3363,7 @@ func TestScrapeReportSingleAppender(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
nil,
|
||||
10*time.Millisecond,
|
||||
|
@ -3536,6 +3567,7 @@ func TestScrapeLoopLabelLimit(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
0, 0,
|
||||
&test.labelLimits,
|
||||
0,
|
||||
|
@ -3596,3 +3628,67 @@ func TestTargetScrapeIntervalAndTimeoutRelabel(t *testing.T) {
|
|||
require.Equal(t, "3s", sp.ActiveTargets()[0].labels.Get(model.ScrapeIntervalLabel))
|
||||
require.Equal(t, "750ms", sp.ActiveTargets()[0].labels.Get(model.ScrapeTimeoutLabel))
|
||||
}
|
||||
|
||||
func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrapeForTimestampedMetrics(t *testing.T) {
|
||||
appender := &collectResultAppender{}
|
||||
var (
|
||||
signal = make(chan struct{}, 1)
|
||||
scraper = &testScraper{}
|
||||
app = func(ctx context.Context) storage.Appender { return appender }
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
sl := newScrapeLoop(ctx,
|
||||
scraper,
|
||||
nil, nil,
|
||||
nopMutator,
|
||||
nopMutator,
|
||||
app,
|
||||
nil,
|
||||
0,
|
||||
true,
|
||||
true,
|
||||
0, 0,
|
||||
nil,
|
||||
10*time.Millisecond,
|
||||
time.Hour,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
false,
|
||||
)
|
||||
// Succeed once, several failures, then stop.
|
||||
numScrapes := 0
|
||||
|
||||
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
|
||||
numScrapes++
|
||||
|
||||
switch numScrapes {
|
||||
case 1:
|
||||
w.Write([]byte(fmt.Sprintf("metric_a 42 %d\n", time.Now().UnixNano()/int64(time.Millisecond))))
|
||||
return nil
|
||||
case 5:
|
||||
cancel()
|
||||
}
|
||||
return errors.New("scrape failed")
|
||||
}
|
||||
|
||||
go func() {
|
||||
sl.run(nil)
|
||||
signal <- struct{}{}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-signal:
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("Scrape wasn't stopped.")
|
||||
}
|
||||
|
||||
// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
|
||||
// each scrape successful or not.
|
||||
require.Equal(t, 27, len(appender.resultFloats), "Appended samples not as expected:\n%s", appender)
|
||||
require.Equal(t, 42.0, appender.resultFloats[0].f, "Appended first sample not as expected")
|
||||
require.True(t, value.IsStaleNaN(appender.resultFloats[6].f),
|
||||
"Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.resultFloats[6].f))
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue