From 92873d3009bdcf3c961df76167250f8c545351e2 Mon Sep 17 00:00:00 2001 From: machine424 Date: Mon, 8 Apr 2024 14:59:30 +0200 Subject: [PATCH] feat: allow to delay head compaction start time helping Prometheus instances to avoid simultaneous compactions and reduce stress on shared resources. This is enabled via `--enable-feature=delayed-compaction`. Signed-off-by: machine424 --- cmd/prometheus/main.go | 7 +- docs/command-line/prometheus.md | 2 +- docs/feature_flags.md | 14 ++ tsdb/compact_test.go | 227 ++++++++++++++++++++++++++++++++ tsdb/db.go | 44 ++++++- tsdb/db_test.go | 22 ++++ 6 files changed, 313 insertions(+), 3 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 1d844ddba6..d7333b6576 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -234,6 +234,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols level.Info(logger).Log("msg", "Experimental created timestamp zero ingestion enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols)) + case "delayed-compaction": + c.tsdb.EnableDelayedCompaction = true + level.Info(logger).Log("msg", "Experimental delayed compaction is enabled.") case "": continue case "promql-at-modifier", "promql-negative-offset": @@ -475,7 +478,7 @@ func main() { a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates."). Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval) - a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). + a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). Default("").StringsVar(&cfg.featureList) promlogflag.AddFlags(a, &cfg.promlogConfig) @@ -1715,6 +1718,7 @@ type tsdbOptions struct { MaxExemplars int64 EnableMemorySnapshotOnShutdown bool EnableNativeHistograms bool + EnableDelayedCompaction bool } func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { @@ -1736,6 +1740,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { EnableNativeHistograms: opts.EnableNativeHistograms, OutOfOrderTimeWindow: opts.OutOfOrderTimeWindow, EnableOverlappingCompaction: true, + EnableDelayedCompaction: opts.EnableDelayedCompaction, } } diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index 2faf65105e..b8f2e4241f 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -56,7 +56,7 @@ The Prometheus monitoring server | --query.timeout | Maximum time a query may take before being aborted. Use with server mode only. | `2m` | | --query.max-concurrency | Maximum number of queries executed concurrently. Use with server mode only. | `20` | | --query.max-samples | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` | -| --enable-feature | Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | +| --enable-feature | Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | | --log.level | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` | | --log.format | Output format of log messages. One of: [logfmt, json] | `logfmt` | diff --git a/docs/feature_flags.md b/docs/feature_flags.md index 24d70647fd..3f92ab7fd6 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -234,3 +234,17 @@ metadata changes as WAL records on a per-series basis. This must be used if you are also using remote write 2.0 as it will only gather metadata from the WAL. + +## Delay compaction start time + +`--enable-feature=delayed-compaction` + +A random offset, up to `10%` of the chunk range, is added to the Head compaction start time. This assists Prometheus instances in avoiding simultaneous compactions and reduces the load on shared resources. + +Only auto Head compactions and the operations directly resulting from them are subject to this delay. + +In the event of multiple consecutive Head compactions being possible, only the first compaction experiences this delay. + +Note that during this delay, the Head continues its usual operations, which include serving and appending series. + +Despite the delay in compaction, the blocks produced are time-aligned in the same manner as they would be if the delay was not in place. diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 0df6ca0505..0ea155d107 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -22,6 +22,7 @@ import ( "os" "path" "path/filepath" + "runtime" "strconv" "sync" "testing" @@ -1925,3 +1926,229 @@ func TestCompactEmptyResultBlockWithTombstone(t *testing.T) { require.Nil(t, ulids) require.NoError(t, block.Close()) } + +func TestDelayedCompaction(t *testing.T) { + // The delay is chosen in such a way as to not slow down the tests, but also to make + // the effective compaction duration negligible compared to it, so that the duration comparisons make sense. + delay := 1000 * time.Millisecond + + waitUntilCompactedAndCheck := func(db *DB) { + t.Helper() + start := time.Now() + for db.head.compactable() { + // This simulates what happens at the end of commits, for less busy DB, a compaction + // is triggered every minute. This is to speed up the test. + select { + case db.compactc <- struct{}{}: + default: + } + time.Sleep(time.Millisecond) + } + duration := time.Since(start) + // Only waited for one offset: offset<=delay<<<2*offset + require.Greater(t, duration, db.opts.CompactionDelay) + require.Less(t, duration, 2*db.opts.CompactionDelay) + } + + compactAndCheck := func(db *DB) { + t.Helper() + start := time.Now() + db.Compact(context.Background()) + for db.head.compactable() { + time.Sleep(time.Millisecond) + } + if runtime.GOOS == "windows" { + // TODO: enable on windows once ms resolution timers are better supported. + return + } + duration := time.Since(start) + require.Less(t, duration, delay) + } + + cases := []struct { + name string + // The delays are chosen in such a way as to not slow down the tests, but also in a way to make the + // effective compaction duration negligible compared to them, so that the duration comparisons make sense. + compactionDelay time.Duration + }{ + { + "delayed compaction not enabled", + 0, + }, + { + "delayed compaction enabled", + delay, + }, + } + + for _, c := range cases { + c := c + t.Run(c.name, func(t *testing.T) { + t.Parallel() + + var options *Options + if c.compactionDelay > 0 { + options = &Options{CompactionDelay: c.compactionDelay} + } + db := openTestDB(t, options, []int64{10}) + defer func() { + require.NoError(t, db.Close()) + }() + + label := labels.FromStrings("foo", "bar") + + // The first compaction is expected to result in 1 block. + db.DisableCompactions() + app := db.Appender(context.Background()) + _, err := app.Append(0, label, 0, 0) + require.NoError(t, err) + _, err = app.Append(0, label, 11, 0) + require.NoError(t, err) + _, err = app.Append(0, label, 21, 0) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + if c.compactionDelay == 0 { + // When delay is not enabled, compaction should run on the first trigger. + compactAndCheck(db) + } else { + db.EnableCompactions() + waitUntilCompactedAndCheck(db) + // The db.compactc signals have been processed multiple times since a compaction is triggered every 1ms by waitUntilCompacted. + // This implies that the compaction delay doesn't block or wait on the initial trigger. + // 3 is an arbitrary value because it's difficult to determine the precise value. + require.GreaterOrEqual(t, prom_testutil.ToFloat64(db.metrics.compactionsTriggered)-prom_testutil.ToFloat64(db.metrics.compactionsSkipped), 3.0) + // The delay doesn't change the head blocks alignement. + require.Eventually(t, func() bool { + return db.head.MinTime() == db.compactor.(*LeveledCompactor).ranges[0]+1 + }, 500*time.Millisecond, 10*time.Millisecond) + // One compaction was run and one block was produced. + require.Equal(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)) + } + + // The second compaction is expected to result in 2 blocks. + // This ensures that the logic for compaction delay doesn't only work for the first compaction, but also takes into account the future compactions. + // This also ensures that no delay happens between consecutive compactions. + db.DisableCompactions() + app = db.Appender(context.Background()) + _, err = app.Append(0, label, 31, 0) + require.NoError(t, err) + _, err = app.Append(0, label, 41, 0) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + if c.compactionDelay == 0 { + // Compaction should still run on the first trigger. + compactAndCheck(db) + } else { + db.EnableCompactions() + waitUntilCompactedAndCheck(db) + } + + // Two other compactions were run. + require.Eventually(t, func() bool { + return prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran) == 3.0 + }, 500*time.Millisecond, 10*time.Millisecond) + + if c.compactionDelay == 0 { + return + } + + // This test covers a special case. If auto compaction is in a delay period and a manual compaction is triggered, + // auto compaction should stop waiting for the delay if the head is no longer compactable. + // Of course, if the head is still compactable after the manual compaction, auto compaction will continue waiting for the same delay. + getTimeWhenCompactionDelayStarted := func() time.Time { + t.Helper() + db.cmtx.Lock() + defer db.cmtx.Unlock() + return db.timeWhenCompactionDelayStarted + } + + db.DisableCompactions() + app = db.Appender(context.Background()) + _, err = app.Append(0, label, 51, 0) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + require.True(t, db.head.compactable()) + db.EnableCompactions() + // Trigger an auto compaction. + db.compactc <- struct{}{} + // That made auto compaction start waiting for the delay. + require.Eventually(t, func() bool { + return !getTimeWhenCompactionDelayStarted().IsZero() + }, 100*time.Millisecond, 10*time.Millisecond) + // Trigger a manual compaction. + require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), 0, 50.0))) + require.Equal(t, 4.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)) + // Re-trigger an auto compaction. + db.compactc <- struct{}{} + // That made auto compaction stop waiting for the delay. + require.Eventually(t, func() bool { + return getTimeWhenCompactionDelayStarted().IsZero() + }, 100*time.Millisecond, 10*time.Millisecond) + }) + } +} + +// TestDelayedCompactionDoesNotBlockUnrelatedOps makes sure that when delayed compaction is enabled, +// operations that don't directly derive from the Head compaction are not delayed, here we consider disk blocks compaction. +func TestDelayedCompactionDoesNotBlockUnrelatedOps(t *testing.T) { + cases := []struct { + name string + whenCompactable bool + }{ + { + "Head is compactable", + true, + }, + { + "Head is not compactable", + false, + }, + } + + for _, c := range cases { + c := c + t.Run(c.name, func(t *testing.T) { + t.Parallel() + + tmpdir := t.TempDir() + // Some blocks that need compation are present. + createBlock(t, tmpdir, genSeries(1, 1, 0, 100)) + createBlock(t, tmpdir, genSeries(1, 1, 100, 200)) + createBlock(t, tmpdir, genSeries(1, 1, 200, 300)) + + options := DefaultOptions() + // This will make the test timeout if compaction really waits for it. + options.CompactionDelay = time.Hour + db, err := open(tmpdir, log.NewNopLogger(), nil, options, []int64{10, 200}, nil) + require.NoError(t, err) + defer func() { + require.NoError(t, db.Close()) + }() + + db.DisableCompactions() + require.Len(t, db.Blocks(), 3) + + if c.whenCompactable { + label := labels.FromStrings("foo", "bar") + app := db.Appender(context.Background()) + _, err := app.Append(0, label, 301, 0) + require.NoError(t, err) + _, err = app.Append(0, label, 317, 0) + require.NoError(t, err) + require.NoError(t, app.Commit()) + // The Head is compactable and will still be at the end. + require.True(t, db.head.compactable()) + defer func() { + require.True(t, db.head.compactable()) + }() + } + + // The blocks were compacted. + db.Compact(context.Background()) + require.Len(t, db.Blocks(), 2) + }) + } +} diff --git a/tsdb/db.go b/tsdb/db.go index 87870a8472..3c73c892a9 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -21,6 +21,7 @@ import ( "io" "io/fs" "math" + "math/rand" "os" "path/filepath" "slices" @@ -84,6 +85,8 @@ func DefaultOptions() *Options { OutOfOrderCapMax: DefaultOutOfOrderCapMax, EnableOverlappingCompaction: true, EnableSharding: false, + EnableDelayedCompaction: false, + CompactionDelay: time.Duration(0), } } @@ -190,6 +193,13 @@ type Options struct { // EnableSharding enables query sharding support in TSDB. EnableSharding bool + // EnableDelayedCompaction, when set to true, assigns a random value to CompactionDelay during DB opening. + // When set to false, delayed compaction is disabled, unless CompactionDelay is set directly. + EnableDelayedCompaction bool + // CompactionDelay delays the start time of auto compactions. + // It can be increased by up to one minute if the DB does not commit too often. + CompactionDelay time.Duration + // NewCompactorFunc is a function that returns a TSDB compactor. NewCompactorFunc NewCompactorFunc @@ -246,6 +256,9 @@ type DB struct { // Cancel a running compaction when a shutdown is initiated. compactCancel context.CancelFunc + // timeWhenCompactionDelayStarted helps delay the compactions start time. + timeWhenCompactionDelayStarted time.Time + // oooWasEnabled is true if out of order support was enabled at least one time // during the time TSDB was up. In which case we need to keep supporting // out-of-order compaction and vertical queries. @@ -998,6 +1011,10 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs db.oooWasEnabled.Store(true) } + if opts.EnableDelayedCompaction { + opts.CompactionDelay = db.generateCompactionDelay() + } + go db.run(ctx) return db, nil @@ -1186,6 +1203,12 @@ func (a dbAppender) Commit() error { return err } +// waitingForCompactionDelay returns true if the DB is waiting for the Head compaction delay. +// This doesn't guarantee that the Head is really compactable. +func (db *DB) waitingForCompactionDelay() bool { + return time.Since(db.timeWhenCompactionDelayStarted) < db.opts.CompactionDelay +} + // Compact data if possible. After successful compaction blocks are reloaded // which will also delete the blocks that fall out of the retention window. // Old blocks are only deleted on reloadBlocks based on the new block's parent information. @@ -1219,7 +1242,21 @@ func (db *DB) Compact(ctx context.Context) (returnErr error) { return nil default: } + if !db.head.compactable() { + // Reset the counter once the head compactions are done. + // This would also reset it if a manual compaction was triggered while the auto compaction was in its delay period. + if !db.timeWhenCompactionDelayStarted.IsZero() { + db.timeWhenCompactionDelayStarted = time.Time{} + } + break + } + + if db.timeWhenCompactionDelayStarted.IsZero() { + // Start counting for the delay. + db.timeWhenCompactionDelayStarted = time.Now() + } + if db.waitingForCompactionDelay() { break } mint := db.head.MinTime() @@ -1429,7 +1466,7 @@ func (db *DB) compactBlocks() (err error) { // If we have a lot of blocks to compact the whole process might take // long enough that we end up with a HEAD block that needs to be written. // Check if that's the case and stop compactions early. - if db.head.compactable() { + if db.head.compactable() && !db.waitingForCompactionDelay() { level.Warn(db.logger).Log("msg", "aborting block compactions to persit the head block") return nil } @@ -1932,6 +1969,11 @@ func (db *DB) EnableCompactions() { level.Info(db.logger).Log("msg", "Compactions enabled") } +func (db *DB) generateCompactionDelay() time.Duration { + // Up to 10% of the head's chunkRange. + return time.Duration(rand.Int63n(db.head.chunkRange.Load()/10)) * time.Millisecond +} + // ForceHeadMMap is intended for use only in tests and benchmarks. func (db *DB) ForceHeadMMap() { db.head.mmapHeadChunks() diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 781f360264..cf41e25f27 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -7357,3 +7357,25 @@ func TestBlockQuerierAndBlockChunkQuerier(t *testing.T) { // Make sure only block-1 is queried. require.Equal(t, "block-1", lbls.Get("block")) } + +func TestGenerateCompactionDelay(t *testing.T) { + assertDelay := func(delay time.Duration) { + t.Helper() + require.GreaterOrEqual(t, delay, time.Duration(0)) + // Less than 10% of the chunkRange. + require.LessOrEqual(t, delay, 6000*time.Millisecond) + } + + opts := DefaultOptions() + opts.EnableDelayedCompaction = true + db := openTestDB(t, opts, []int64{60000}) + defer func() { + require.NoError(t, db.Close()) + }() + // The offset is generated and changed while opening. + assertDelay(db.opts.CompactionDelay) + + for i := 0; i < 1000; i++ { + assertDelay(db.generateCompactionDelay()) + } +}