diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index ce2cc4b5c2..fa6b6fa88d 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -241,6 +241,9 @@ func main() { a.Flag("rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager."). Default("1m").SetValue(&cfg.resendDelay) + a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to 2ms to align them to the intended schedule. See https://github.com/prometheus/prometheus/issues/7846 for more context. Experimental. This flag will be removed in a future release."). + Hidden().Default("true").BoolVar(&scrape.AlignScrapeTimestamps) + a.Flag("alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications."). Default("10000").IntVar(&cfg.notifier.QueueCapacity) diff --git a/scrape/scrape.go b/scrape/scrape.go index eb0294c85a..d52b0ac7f3 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -47,6 +47,14 @@ import ( "github.com/prometheus/prometheus/storage" ) +// Temporary tolerance for scrape appends timestamps alignment, to enable better +// compression at the TSDB level. +// See https://github.com/prometheus/prometheus/issues/7846 +const scrapeTimestampTolerance = 2 * time.Millisecond + +// AlignScrapeTimestamps enables the tolerance for scrape appends timestamps described above. +var AlignScrapeTimestamps = true + var errNameLabelMandatory = fmt.Errorf("missing metric name (%s label)", labels.MetricName) var ( @@ -985,6 +993,7 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) { var last time.Time + alignedScrapeTime := time.Now() ticker := time.NewTicker(interval) defer ticker.Stop() @@ -999,7 +1008,23 @@ mainLoop: default: } - last = sl.scrapeAndReport(interval, timeout, last, errc) + // Temporary workaround for a jitter in go timers that causes disk space + // increase in TSDB. + // See https://github.com/prometheus/prometheus/issues/7846 + scrapeTime := time.Now() + if AlignScrapeTimestamps && interval > 100*scrapeTimestampTolerance { + // For some reason, a tick might have been skipped, in which case we + // would call alignedScrapeTime.Add(interval) multiple times. + for scrapeTime.Sub(alignedScrapeTime) >= interval { + alignedScrapeTime = alignedScrapeTime.Add(interval) + } + // Align the scrape time if we are in the tolerance boundaries. + if scrapeTime.Sub(alignedScrapeTime) <= scrapeTimestampTolerance { + scrapeTime = alignedScrapeTime + } + } + + last = sl.scrapeAndReport(interval, timeout, last, scrapeTime, errc) select { case <-sl.parentCtx.Done(): @@ -1023,7 +1048,7 @@ mainLoop: // In the happy scenario, a single appender is used. // This function uses sl.parentCtx instead of sl.ctx on purpose. A scrape should // only be cancelled on shutdown, not on reloads. -func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time.Time, errc chan<- error) time.Time { +func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last, appendTime time.Time, errc chan<- error) time.Time { start := time.Now() // Only record after the first scrape. @@ -1053,7 +1078,7 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time }() defer func() { - if err = sl.report(app, start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil { + if err = sl.report(app, appendTime, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil { level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err) } }() @@ -1061,7 +1086,7 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time if forcedErr := sl.getForcedError(); forcedErr != nil { scrapeErr = forcedErr // Add stale markers. - if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil { + if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil { app.Rollback() app = sl.appender(sl.parentCtx) level.Warn(sl.l).Log("msg", "Append failed", "err", err) @@ -1095,14 +1120,14 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time // A failed scrape is the same as an empty scrape, // we still call sl.append to trigger stale markers. - total, added, seriesAdded, appErr = sl.append(app, b, contentType, start) + total, added, seriesAdded, appErr = sl.append(app, b, contentType, appendTime) if appErr != nil { app.Rollback() app = sl.appender(sl.parentCtx) level.Debug(sl.l).Log("msg", "Append failed", "err", appErr) // The append failed, probably due to a parse error or sample limit. // Call sl.append again with an empty scrape to trigger stale markers. - if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil { + if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil { app.Rollback() app = sl.appender(sl.parentCtx) level.Warn(sl.l).Log("msg", "Append failed", "err", err)