diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index 1d844ddba..d7333b657 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -234,6 +234,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
level.Info(logger).Log("msg", "Experimental created timestamp zero ingestion enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols))
+ case "delayed-compaction":
+ c.tsdb.EnableDelayedCompaction = true
+ level.Info(logger).Log("msg", "Experimental delayed compaction is enabled.")
case "":
continue
case "promql-at-modifier", "promql-negative-offset":
@@ -475,7 +478,7 @@ func main() {
a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates.").
Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval)
- a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
+ a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList)
promlogflag.AddFlags(a, &cfg.promlogConfig)
@@ -1715,6 +1718,7 @@ type tsdbOptions struct {
MaxExemplars int64
EnableMemorySnapshotOnShutdown bool
EnableNativeHistograms bool
+ EnableDelayedCompaction bool
}
func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
@@ -1736,6 +1740,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
EnableNativeHistograms: opts.EnableNativeHistograms,
OutOfOrderTimeWindow: opts.OutOfOrderTimeWindow,
EnableOverlappingCompaction: true,
+ EnableDelayedCompaction: opts.EnableDelayedCompaction,
}
}
diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md
index 2faf65105..b8f2e4241 100644
--- a/docs/command-line/prometheus.md
+++ b/docs/command-line/prometheus.md
@@ -56,7 +56,7 @@ The Prometheus monitoring server
| --query.timeout
| Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
| --query.max-concurrency
| Maximum number of queries executed concurrently. Use with server mode only. | `20` |
| --query.max-samples
| Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` |
-| --enable-feature
| Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
+| --enable-feature
| Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| --log.level
| Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` |
| --log.format
| Output format of log messages. One of: [logfmt, json] | `logfmt` |
diff --git a/docs/feature_flags.md b/docs/feature_flags.md
index 24d70647f..3f92ab7fd 100644
--- a/docs/feature_flags.md
+++ b/docs/feature_flags.md
@@ -234,3 +234,17 @@ metadata changes as WAL records on a per-series basis.
This must be used if
you are also using remote write 2.0 as it will only gather metadata from the WAL.
+
+## Delay compaction start time
+
+`--enable-feature=delayed-compaction`
+
+A random offset, up to `10%` of the chunk range, is added to the Head compaction start time. This helps Prometheus instances avoid running compactions simultaneously and reduces the load on shared resources.
+
+Only auto Head compactions and the operations directly resulting from them are subject to this delay.
+
+In the event of multiple consecutive Head compactions being possible, only the first compaction experiences this delay.
+
+Note that during this delay, the Head continues its usual operations, which include serving and appending series.
+
+Despite the delay in compaction, the blocks produced are time-aligned in the same manner as they would be if the delay were not in place.
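
To make the offset concrete, here is a minimal sketch of the idea. `pickCompactionDelay` is a hypothetical stand-in for the `generateCompactionDelay` helper added to `tsdb/db.go` further down in this diff, and the chunk range is assumed to be expressed in milliseconds:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// pickCompactionDelay draws a random duration of up to 10% of the head's
// chunk range (given in milliseconds), mirroring the offset described above.
func pickCompactionDelay(chunkRangeMs int64) time.Duration {
	return time.Duration(rand.Int63n(chunkRangeMs/10)) * time.Millisecond
}

func main() {
	// With the default 2h chunk range, the offset stays below 12 minutes.
	delay := pickCompactionDelay(2 * time.Hour.Milliseconds())
	fmt.Println("head compaction delayed by", delay)
}
```

The offset only shifts when compaction starts; block boundaries stay aligned to the configured ranges.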
diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go
index 0df6ca050..0ea155d10 100644
--- a/tsdb/compact_test.go
+++ b/tsdb/compact_test.go
@@ -22,6 +22,7 @@ import (
"os"
"path"
"path/filepath"
+ "runtime"
"strconv"
"sync"
"testing"
@@ -1925,3 +1926,229 @@ func TestCompactEmptyResultBlockWithTombstone(t *testing.T) {
require.Nil(t, ulids)
require.NoError(t, block.Close())
}
+
+func TestDelayedCompaction(t *testing.T) {
+ // The delay is chosen in such a way as to not slow down the tests, but also to make
+ // the effective compaction duration negligible compared to it, so that the duration comparisons make sense.
+ delay := 1000 * time.Millisecond
+
+ waitUntilCompactedAndCheck := func(db *DB) {
+ t.Helper()
+ start := time.Now()
+ for db.head.compactable() {
+ // This simulates what happens at the end of commits; for a less busy DB, a compaction
+ // is triggered every minute. Triggering it every millisecond here speeds up the test.
+ select {
+ case db.compactc <- struct{}{}:
+ default:
+ }
+ time.Sleep(time.Millisecond)
+ }
+ duration := time.Since(start)
+ // Only one delay offset was waited for: delay < duration < 2*delay.
+ require.Greater(t, duration, db.opts.CompactionDelay)
+ require.Less(t, duration, 2*db.opts.CompactionDelay)
+ }
+
+ compactAndCheck := func(db *DB) {
+ t.Helper()
+ start := time.Now()
+ db.Compact(context.Background())
+ for db.head.compactable() {
+ time.Sleep(time.Millisecond)
+ }
+ if runtime.GOOS == "windows" {
+ // TODO: enable on windows once ms resolution timers are better supported.
+ return
+ }
+ duration := time.Since(start)
+ require.Less(t, duration, delay)
+ }
+
+ cases := []struct {
+ name string
+ // The delays are chosen so as not to slow down the tests, while keeping the effective
+ // compaction duration negligible compared to them, so that the duration comparisons make sense.
+ compactionDelay time.Duration
+ }{
+ {
+ "delayed compaction not enabled",
+ 0,
+ },
+ {
+ "delayed compaction enabled",
+ delay,
+ },
+ }
+
+ for _, c := range cases {
+ c := c
+ t.Run(c.name, func(t *testing.T) {
+ t.Parallel()
+
+ var options *Options
+ if c.compactionDelay > 0 {
+ options = &Options{CompactionDelay: c.compactionDelay}
+ }
+ db := openTestDB(t, options, []int64{10})
+ defer func() {
+ require.NoError(t, db.Close())
+ }()
+
+ label := labels.FromStrings("foo", "bar")
+
+ // The first compaction is expected to result in 1 block.
+ db.DisableCompactions()
+ app := db.Appender(context.Background())
+ _, err := app.Append(0, label, 0, 0)
+ require.NoError(t, err)
+ _, err = app.Append(0, label, 11, 0)
+ require.NoError(t, err)
+ _, err = app.Append(0, label, 21, 0)
+ require.NoError(t, err)
+ require.NoError(t, app.Commit())
+
+ if c.compactionDelay == 0 {
+ // When delay is not enabled, compaction should run on the first trigger.
+ compactAndCheck(db)
+ } else {
+ db.EnableCompactions()
+ waitUntilCompactedAndCheck(db)
+ // The db.compactc signals have been processed multiple times since a compaction is triggered every 1ms by waitUntilCompactedAndCheck.
+ // This implies that the compaction delay doesn't block or wait on the initial trigger.
+ // 3 is an arbitrary value because it's difficult to determine the precise value.
+ require.GreaterOrEqual(t, prom_testutil.ToFloat64(db.metrics.compactionsTriggered)-prom_testutil.ToFloat64(db.metrics.compactionsSkipped), 3.0)
+ // The delay doesn't change the alignment of the head blocks.
+ require.Eventually(t, func() bool {
+ return db.head.MinTime() == db.compactor.(*LeveledCompactor).ranges[0]+1
+ }, 500*time.Millisecond, 10*time.Millisecond)
+ // One compaction was run and one block was produced.
+ require.Equal(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran))
+ }
+
+ // The second compaction is expected to result in 2 blocks.
+ // This ensures that the compaction delay logic doesn't only apply to the first compaction, but also to subsequent ones.
+ // This also ensures that no delay happens between consecutive compactions.
+ db.DisableCompactions()
+ app = db.Appender(context.Background())
+ _, err = app.Append(0, label, 31, 0)
+ require.NoError(t, err)
+ _, err = app.Append(0, label, 41, 0)
+ require.NoError(t, err)
+ require.NoError(t, app.Commit())
+
+ if c.compactionDelay == 0 {
+ // Compaction should still run on the first trigger.
+ compactAndCheck(db)
+ } else {
+ db.EnableCompactions()
+ waitUntilCompactedAndCheck(db)
+ }
+
+ // Two other compactions were run.
+ require.Eventually(t, func() bool {
+ return prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran) == 3.0
+ }, 500*time.Millisecond, 10*time.Millisecond)
+
+ if c.compactionDelay == 0 {
+ return
+ }
+
+ // This test covers a special case. If auto compaction is in a delay period and a manual compaction is triggered,
+ // auto compaction should stop waiting for the delay if the head is no longer compactable.
+ // Of course, if the head is still compactable after the manual compaction, auto compaction will continue waiting for the same delay.
+ getTimeWhenCompactionDelayStarted := func() time.Time {
+ t.Helper()
+ db.cmtx.Lock()
+ defer db.cmtx.Unlock()
+ return db.timeWhenCompactionDelayStarted
+ }
+
+ db.DisableCompactions()
+ app = db.Appender(context.Background())
+ _, err = app.Append(0, label, 51, 0)
+ require.NoError(t, err)
+ require.NoError(t, app.Commit())
+
+ require.True(t, db.head.compactable())
+ db.EnableCompactions()
+ // Trigger an auto compaction.
+ db.compactc <- struct{}{}
+ // That made auto compaction start waiting for the delay.
+ require.Eventually(t, func() bool {
+ return !getTimeWhenCompactionDelayStarted().IsZero()
+ }, 100*time.Millisecond, 10*time.Millisecond)
+ // Trigger a manual compaction.
+ require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), 0, 50.0)))
+ require.Equal(t, 4.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran))
+ // Re-trigger an auto compaction.
+ db.compactc <- struct{}{}
+ // That made auto compaction stop waiting for the delay.
+ require.Eventually(t, func() bool {
+ return getTimeWhenCompactionDelayStarted().IsZero()
+ }, 100*time.Millisecond, 10*time.Millisecond)
+ })
+ }
+}
+
+// TestDelayedCompactionDoesNotBlockUnrelatedOps makes sure that when delayed compaction is enabled,
+// operations that don't directly derive from the Head compaction are not delayed; here we consider the compaction of disk blocks.
+func TestDelayedCompactionDoesNotBlockUnrelatedOps(t *testing.T) {
+ cases := []struct {
+ name string
+ whenCompactable bool
+ }{
+ {
+ "Head is compactable",
+ true,
+ },
+ {
+ "Head is not compactable",
+ false,
+ },
+ }
+
+ for _, c := range cases {
+ c := c
+ t.Run(c.name, func(t *testing.T) {
+ t.Parallel()
+
+ tmpdir := t.TempDir()
+ // Some blocks that need compaction are present.
+ createBlock(t, tmpdir, genSeries(1, 1, 0, 100))
+ createBlock(t, tmpdir, genSeries(1, 1, 100, 200))
+ createBlock(t, tmpdir, genSeries(1, 1, 200, 300))
+
+ options := DefaultOptions()
+ // This will make the test time out if compaction really waits for it.
+ options.CompactionDelay = time.Hour
+ db, err := open(tmpdir, log.NewNopLogger(), nil, options, []int64{10, 200}, nil)
+ require.NoError(t, err)
+ defer func() {
+ require.NoError(t, db.Close())
+ }()
+
+ db.DisableCompactions()
+ require.Len(t, db.Blocks(), 3)
+
+ if c.whenCompactable {
+ label := labels.FromStrings("foo", "bar")
+ app := db.Appender(context.Background())
+ _, err := app.Append(0, label, 301, 0)
+ require.NoError(t, err)
+ _, err = app.Append(0, label, 317, 0)
+ require.NoError(t, err)
+ require.NoError(t, app.Commit())
+ // The Head is compactable and will still be compactable at the end.
+ require.True(t, db.head.compactable())
+ defer func() {
+ require.True(t, db.head.compactable())
+ }()
+ }
+
+ // The disk blocks get compacted; the Head compaction delay does not block this.
+ db.Compact(context.Background())
+ require.Len(t, db.Blocks(), 2)
+ })
+ }
+}
diff --git a/tsdb/db.go b/tsdb/db.go
index 87870a847..3c73c892a 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -21,6 +21,7 @@ import (
"io"
"io/fs"
"math"
+ "math/rand"
"os"
"path/filepath"
"slices"
@@ -84,6 +85,8 @@ func DefaultOptions() *Options {
OutOfOrderCapMax: DefaultOutOfOrderCapMax,
EnableOverlappingCompaction: true,
EnableSharding: false,
+ EnableDelayedCompaction: false,
+ CompactionDelay: time.Duration(0),
}
}
@@ -190,6 +193,13 @@ type Options struct {
// EnableSharding enables query sharding support in TSDB.
EnableSharding bool
+ // EnableDelayedCompaction, when set to true, assigns a random value to CompactionDelay during DB opening.
+ // When set to false, delayed compaction is disabled, unless CompactionDelay is set directly.
+ EnableDelayedCompaction bool
+ // CompactionDelay delays the start time of auto compactions.
+ // It can be increased by up to one minute if the DB does not commit too often.
+ CompactionDelay time.Duration
+
// NewCompactorFunc is a function that returns a TSDB compactor.
NewCompactorFunc NewCompactorFunc
@@ -246,6 +256,9 @@ type DB struct {
// Cancel a running compaction when a shutdown is initiated.
compactCancel context.CancelFunc
+ // timeWhenCompactionDelayStarted helps delay the start time of auto compactions.
+ timeWhenCompactionDelayStarted time.Time
+
// oooWasEnabled is true if out of order support was enabled at least one time
// during the time TSDB was up. In which case we need to keep supporting
// out-of-order compaction and vertical queries.
@@ -998,6 +1011,10 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
db.oooWasEnabled.Store(true)
}
+ if opts.EnableDelayedCompaction {
+ opts.CompactionDelay = db.generateCompactionDelay()
+ }
+
go db.run(ctx)
return db, nil
@@ -1186,6 +1203,12 @@ func (a dbAppender) Commit() error {
return err
}
+// waitingForCompactionDelay returns true if the DB is waiting for the Head compaction delay.
+// This doesn't guarantee that the Head is really compactable.
+func (db *DB) waitingForCompactionDelay() bool {
+ return time.Since(db.timeWhenCompactionDelayStarted) < db.opts.CompactionDelay
+}
+
// Compact data if possible. After successful compaction blocks are reloaded
// which will also delete the blocks that fall out of the retention window.
// Old blocks are only deleted on reloadBlocks based on the new block's parent information.
@@ -1219,7 +1242,21 @@ func (db *DB) Compact(ctx context.Context) (returnErr error) {
return nil
default:
}
+
if !db.head.compactable() {
+ // Reset the delay start time once the head compactions are done.
+ // This would also reset it if a manual compaction was triggered while the auto compaction was in its delay period.
+ if !db.timeWhenCompactionDelayStarted.IsZero() {
+ db.timeWhenCompactionDelayStarted = time.Time{}
+ }
+ break
+ }
+
+ if db.timeWhenCompactionDelayStarted.IsZero() {
+ // Start the delay period.
+ db.timeWhenCompactionDelayStarted = time.Now()
+ }
+ if db.waitingForCompactionDelay() {
break
}
mint := db.head.MinTime()
@@ -1429,7 +1466,7 @@ func (db *DB) compactBlocks() (err error) {
// If we have a lot of blocks to compact the whole process might take
// long enough that we end up with a HEAD block that needs to be written.
// Check if that's the case and stop compactions early.
- if db.head.compactable() {
+ if db.head.compactable() && !db.waitingForCompactionDelay() {
level.Warn(db.logger).Log("msg", "aborting block compactions to persit the head block")
return nil
}
@@ -1932,6 +1969,11 @@ func (db *DB) EnableCompactions() {
level.Info(db.logger).Log("msg", "Compactions enabled")
}
+func (db *DB) generateCompactionDelay() time.Duration {
+ // Up to 10% of the head's chunkRange.
+ return time.Duration(rand.Int63n(db.head.chunkRange.Load()/10)) * time.Millisecond
+}
+
// ForceHeadMMap is intended for use only in tests and benchmarks.
func (db *DB) ForceHeadMMap() {
db.head.mmapHeadChunks()
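
For reference, a minimal sketch of how a caller could opt in through the new `Options` fields. The field names are the ones added above; the `tsdb.Open` signature and the nil logger, registerer, and stats arguments are assumptions based on the upstream package:

```go
package main

import (
	"log"

	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	opts := tsdb.DefaultOptions()
	// Ask the DB to pick a random start-time offset (up to 10% of the head
	// chunk range) when it is opened.
	opts.EnableDelayedCompaction = true
	// Alternatively, a fixed delay could be set directly via opts.CompactionDelay.

	db, err := tsdb.Open("data", nil, nil, opts, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```

Setting `EnableDelayedCompaction` lets the DB draw the offset itself at open time, whereas setting `CompactionDelay` pins it explicitly, matching the `Options` comments above.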
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index 781f36026..cf41e25f2 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -7357,3 +7357,25 @@ func TestBlockQuerierAndBlockChunkQuerier(t *testing.T) {
// Make sure only block-1 is queried.
require.Equal(t, "block-1", lbls.Get("block"))
}
+
+func TestGenerateCompactionDelay(t *testing.T) {
+ assertDelay := func(delay time.Duration) {
+ t.Helper()
+ require.GreaterOrEqual(t, delay, time.Duration(0))
+ // Less than 10% of the chunkRange.
+ require.LessOrEqual(t, delay, 6000*time.Millisecond)
+ }
+
+ opts := DefaultOptions()
+ opts.EnableDelayedCompaction = true
+ db := openTestDB(t, opts, []int64{60000})
+ defer func() {
+ require.NoError(t, db.Close())
+ }()
+ // The offset is generated and assigned to CompactionDelay while the DB is being opened.
+ assertDelay(db.opts.CompactionDelay)
+
+ for i := 0; i < 1000; i++ {
+ assertDelay(db.generateCompactionDelay())
+ }
+}