Mirror of https://github.com/prometheus/prometheus.git (synced 2025-03-05 20:59:13 -08:00)
Merge pull request #7976 from roidelapluie/tolerance

Introduce timestamp tolerance in scrapes

Commit da3ea43242
@@ -241,6 +241,9 @@ func main() {
 	a.Flag("rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager.").
 		Default("1m").SetValue(&cfg.resendDelay)
 
+	a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to 2ms to align them to the intended schedule. See https://github.com/prometheus/prometheus/issues/7846 for more context. Experimental. This flag will be removed in a future release.").
+		Hidden().Default("true").BoolVar(&scrape.AlignScrapeTimestamps)
+
 	a.Flag("alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications.").
 		Default("10000").IntVar(&cfg.notifier.QueueCapacity)
 
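The new flag is registered as a hidden kingpin boolean that defaults to true, so timestamp adjustment is on unless explicitly disabled. Below is a minimal, self-contained sketch of that behaviour, not part of the PR: the application name and local variable are placeholders, and it assumes gopkg.in/alecthomas/kingpin.v2, which Prometheus's CLI is built on.

```go
package main

import (
	"fmt"
	"os"

	"gopkg.in/alecthomas/kingpin.v2"
)

// Placeholder for scrape.AlignScrapeTimestamps in the real code base.
var alignScrapeTimestamps = true

func main() {
	a := kingpin.New("example", "Hidden boolean flag sketch")

	// Hidden(): the flag is omitted from --help; Default("true"): adjustment is opt-out.
	a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to 2ms.").
		Hidden().Default("true").BoolVar(&alignScrapeTimestamps)

	if _, err := a.Parse(os.Args[1:]); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(2)
	}

	// kingpin boolean flags accept a negated form, so passing
	// --no-scrape.adjust-timestamps flips this to false.
	fmt.Println("align scrape timestamps:", alignScrapeTimestamps)
}
```

Because the flag is boolean with a true default, starting the binary with the negated form (--no-scrape.adjust-timestamps) is what turns the alignment off.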
@@ -47,6 +47,14 @@ import (
 	"github.com/prometheus/prometheus/storage"
 )
 
+// Temporary tolerance for scrape appends timestamps alignment, to enable better
+// compression at the TSDB level.
+// See https://github.com/prometheus/prometheus/issues/7846
+const scrapeTimestampTolerance = 2 * time.Millisecond
+
+// AlignScrapeTimestamps enables the tolerance for scrape appends timestamps described above.
+var AlignScrapeTimestamps = true
+
 var errNameLabelMandatory = fmt.Errorf("missing metric name (%s label)", labels.MetricName)
 
 var (
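The comment points at TSDB compression: Prometheus's chunk encoding stores timestamps as delta-of-deltas (in the style of the Gorilla paper), so a perfectly regular scrape interval makes every second difference zero and very cheap to encode, while millisecond-level timer jitter forces wider encodings. A small illustrative sketch of that effect, not taken from the PR, with made-up timestamps:

```go
package main

import "fmt"

// deltaOfDeltas returns the second differences of a timestamp series,
// which is what the chunk encoding effectively stores per sample.
func deltaOfDeltas(ts []int64) []int64 {
	var dods []int64
	var prevDelta int64
	for i := 1; i < len(ts); i++ {
		delta := ts[i] - ts[i-1]
		if i > 1 {
			dods = append(dods, delta-prevDelta)
		}
		prevDelta = delta
	}
	return dods
}

func main() {
	// Timestamps in milliseconds for a 15s scrape interval.
	aligned := []int64{0, 15000, 30000, 45000, 60000}
	jittered := []int64{0, 15001, 29999, 45002, 60000} // a few ms of timer jitter

	fmt.Println("aligned delta-of-deltas: ", deltaOfDeltas(aligned))  // all zero: cheapest case
	fmt.Println("jittered delta-of-deltas:", deltaOfDeltas(jittered)) // non-zero: needs more bits
}
```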
@@ -985,6 +993,7 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
 
 	var last time.Time
 
+	alignedScrapeTime := time.Now()
 	ticker := time.NewTicker(interval)
 	defer ticker.Stop()
 
@@ -999,7 +1008,23 @@ mainLoop:
 		default:
 		}
 
-		last = sl.scrapeAndReport(interval, timeout, last, errc)
+		// Temporary workaround for a jitter in go timers that causes disk space
+		// increase in TSDB.
+		// See https://github.com/prometheus/prometheus/issues/7846
+		scrapeTime := time.Now()
+		if AlignScrapeTimestamps && interval > 100*scrapeTimestampTolerance {
+			// For some reason, a tick might have been skipped, in which case we
+			// would call alignedScrapeTime.Add(interval) multiple times.
+			for scrapeTime.Sub(alignedScrapeTime) >= interval {
+				alignedScrapeTime = alignedScrapeTime.Add(interval)
+			}
+			// Align the scrape time if we are in the tolerance boundaries.
+			if scrapeTime.Sub(alignedScrapeTime) <= scrapeTimestampTolerance {
+				scrapeTime = alignedScrapeTime
+			}
+		}
+
+		last = sl.scrapeAndReport(interval, timeout, last, scrapeTime, errc)
 
 		select {
 		case <-sl.parentCtx.Done():
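The alignment rule is compact: keep a running alignedScrapeTime, advance it by whole intervals (which also absorbs skipped ticks), and only snap the observed wall-clock time onto it when the drift is within the 2ms tolerance; otherwise the real time is kept. Here is a standalone sketch of the same logic that can be run to watch it behave. It is not the PR's code and the simulated tick offsets are invented for illustration.

```go
package main

import (
	"fmt"
	"time"
)

const scrapeTimestampTolerance = 2 * time.Millisecond

// align walks alignedScrapeTime forward by whole intervals, then snaps the
// observed scrape time onto the schedule when it is within the tolerance.
func align(scrapeTime, alignedScrapeTime time.Time, interval time.Duration) (time.Time, time.Time) {
	for scrapeTime.Sub(alignedScrapeTime) >= interval {
		alignedScrapeTime = alignedScrapeTime.Add(interval)
	}
	if scrapeTime.Sub(alignedScrapeTime) <= scrapeTimestampTolerance {
		scrapeTime = alignedScrapeTime
	}
	return scrapeTime, alignedScrapeTime
}

func main() {
	interval := 15 * time.Second
	start := time.Unix(0, 0)
	aligned := start

	// Simulated ticker firings: a little late, a skipped tick, and one well
	// outside the tolerance.
	offsets := []time.Duration{
		15*time.Second + 1*time.Millisecond,
		30*time.Second + 2*time.Millisecond,
		60*time.Second + 1*time.Millisecond, // the tick at 45s was skipped
		75*time.Second + 5*time.Millisecond, // more than 2ms off: kept as-is
	}
	for _, off := range offsets {
		var scrapeTime time.Time
		scrapeTime, aligned = align(start.Add(off), aligned, interval)
		fmt.Println(off, "->", scrapeTime.Sub(start))
	}
}
```

With these inputs the first three scrapes land exactly on the 15s, 30s, and 60s marks of the schedule, while the last one (5ms late) keeps its real timestamp.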
@@ -1023,7 +1048,7 @@ mainLoop:
 // In the happy scenario, a single appender is used.
 // This function uses sl.parentCtx instead of sl.ctx on purpose. A scrape should
 // only be cancelled on shutdown, not on reloads.
-func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time.Time, errc chan<- error) time.Time {
+func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last, appendTime time.Time, errc chan<- error) time.Time {
 	start := time.Now()
 
 	// Only record after the first scrape.
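The signature change separates two notions of "now": start still measures how long the scrape itself took, while the caller-supplied appendTime (the possibly aligned scrape time) is what the samples and the scrape report get stamped with, as the remaining hunks show. A reduced sketch of that split, not the actual Prometheus code; the types and helper are invented for illustration:

```go
package main

import (
	"fmt"
	"time"
)

type sample struct {
	ts    time.Time
	value float64
}

// scrapeAndReport-style helper: the duration comes from start,
// the sample timestamp from the caller-supplied appendTime.
func scrapeAndReport(appendTime time.Time, scrape func() float64) (sample, time.Duration) {
	start := time.Now()
	v := scrape()
	return sample{ts: appendTime, value: v}, time.Since(start)
}

func main() {
	appendTime := time.Unix(60, 0) // e.g. the aligned scrape time
	s, took := scrapeAndReport(appendTime, func() float64 {
		time.Sleep(5 * time.Millisecond) // stand-in for the HTTP scrape
		return 42
	})
	fmt.Printf("sample ts=%v value=%v scrape_duration=%v\n", s.ts.Unix(), s.value, took)
}
```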
@@ -1053,7 +1078,7 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
 	}()
 
 	defer func() {
-		if err = sl.report(app, start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
+		if err = sl.report(app, appendTime, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
 			level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err)
 		}
 	}()
@@ -1061,7 +1086,7 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
 	if forcedErr := sl.getForcedError(); forcedErr != nil {
 		scrapeErr = forcedErr
 		// Add stale markers.
-		if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil {
+		if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
 			app.Rollback()
 			app = sl.appender(sl.parentCtx)
 			level.Warn(sl.l).Log("msg", "Append failed", "err", err)
@@ -1095,14 +1120,14 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
 
 	// A failed scrape is the same as an empty scrape,
 	// we still call sl.append to trigger stale markers.
-	total, added, seriesAdded, appErr = sl.append(app, b, contentType, start)
+	total, added, seriesAdded, appErr = sl.append(app, b, contentType, appendTime)
 	if appErr != nil {
 		app.Rollback()
 		app = sl.appender(sl.parentCtx)
 		level.Debug(sl.l).Log("msg", "Append failed", "err", appErr)
 		// The append failed, probably due to a parse error or sample limit.
 		// Call sl.append again with an empty scrape to trigger stale markers.
-		if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil {
+		if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
 			app.Rollback()
 			app = sl.appender(sl.parentCtx)
 			level.Warn(sl.l).Log("msg", "Append failed", "err", err)