feat: allow delaying the head compaction start time

This helps Prometheus instances avoid simultaneous compactions and reduces stress on shared resources.

This is enabled via `--enable-feature=delayed-compaction`.

Signed-off-by: machine424 <ayoubmrini424@gmail.com>
machine424 2024-04-08 14:59:30 +02:00 committed by Ayoub Mrini
parent 0833d2a230
commit 92873d3009
6 changed files with 313 additions and 3 deletions

View file

@@ -234,6 +234,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
level.Info(logger).Log("msg", "Experimental created timestamp zero ingestion enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols))
case "delayed-compaction":
c.tsdb.EnableDelayedCompaction = true
level.Info(logger).Log("msg", "Experimental delayed compaction is enabled.")
case "":
continue
case "promql-at-modifier", "promql-negative-offset":
@@ -475,7 +478,7 @@ func main() {
a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates.").
Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval)
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList)
promlogflag.AddFlags(a, &cfg.promlogConfig)
@@ -1715,6 +1718,7 @@ type tsdbOptions struct {
MaxExemplars int64
EnableMemorySnapshotOnShutdown bool
EnableNativeHistograms bool
EnableDelayedCompaction bool
}
func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
@@ -1736,6 +1740,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
EnableNativeHistograms: opts.EnableNativeHistograms,
OutOfOrderTimeWindow: opts.OutOfOrderTimeWindow,
EnableOverlappingCompaction: true,
EnableDelayedCompaction: opts.EnableDelayedCompaction,
}
}
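For completeness: anyone using the TSDB package as a library, rather than the Prometheus binary, should be able to opt in by setting the new field on `tsdb.Options` directly, which is all the flag wiring above ultimately does. This is only a minimal sketch, assuming the `tsdb.Open` signature at this version; the data directory and the nil registerer/stats arguments are placeholders.

```go
package main

import (
	"github.com/go-kit/log"

	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	opts := tsdb.DefaultOptions()
	// Equivalent to --enable-feature=delayed-compaction: a random
	// CompactionDelay is generated when the DB is opened.
	opts.EnableDelayedCompaction = true
	// Per the Options comments later in this diff, CompactionDelay could
	// instead be set directly to a fixed duration.

	// "data" and the nil registerer/stats are placeholders.
	db, err := tsdb.Open("data", log.NewNopLogger(), nil, opts, nil)
	if err != nil {
		panic(err)
	}
	defer db.Close()
}
```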

View file

@@ -56,7 +56,7 @@ The Prometheus monitoring server
| <code class="text-nowrap">--query.timeout</code> | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
| <code class="text-nowrap">--query.max-concurrency</code> | Maximum number of queries executed concurrently. Use with server mode only. | `20` |
| <code class="text-nowrap">--query.max-samples</code> | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` |
| <code class="text-nowrap">--enable-feature</code> | Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| <code class="text-nowrap">--enable-feature</code> | Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| <code class="text-nowrap">--log.level</code> | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` |
| <code class="text-nowrap">--log.format</code> | Output format of log messages. One of: [logfmt, json] | `logfmt` |

View file

@@ -234,3 +234,17 @@ metadata changes as WAL records on a per-series basis.
This must be used if
you are also using remote write 2.0 as it will only gather metadata from the WAL.
## Delay compaction start time
`--enable-feature=delayed-compaction`
A random offset, up to `10%` of the chunk range, is added to the Head compaction start time. This assists Prometheus instances in avoiding simultaneous compactions and reduces the load on shared resources.
Only auto Head compactions and the operations directly resulting from them are subject to this delay.
In the event of multiple consecutive Head compactions being possible, only the first compaction experiences this delay.
Note that during this delay, the Head continues its usual operations, which include serving and appending series.
Despite the delay in compaction, the blocks produced are time-aligned in the same manner as they would be if the delay were not in place.
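As a rough illustration of the behaviour described above (not part of the change itself, and using hypothetical names such as `chunkRange`, `compactionDelay` and `delayStarted`), the delay boils down to something like the following; the real implementation is in the TSDB changes further down in this diff.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// chunkRange stands in for the head's chunk range, which is 2h with default settings.
const chunkRange = 2 * time.Hour

func main() {
	// A random offset in [0, chunkRange/10) is picked once, when the DB is opened.
	compactionDelay := time.Duration(rand.Int63n(int64(chunkRange) / 10))

	// When the head first becomes compactable, the start of the waiting period is recorded...
	delayStarted := time.Now()

	// ...and automatic head compaction is skipped until the delay has elapsed.
	// The head keeps serving queries and appending samples while it waits,
	// and the resulting blocks stay time-aligned as usual.
	stillWaiting := time.Since(delayStarted) < compactionDelay

	fmt.Printf("compaction delay: %v, still waiting: %v\n", compactionDelay, stillWaiting)
}
```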

View file

@@ -22,6 +22,7 @@ import (
"os"
"path"
"path/filepath"
"runtime"
"strconv"
"sync"
"testing"
@@ -1925,3 +1926,229 @@ func TestCompactEmptyResultBlockWithTombstone(t *testing.T) {
require.Nil(t, ulids)
require.NoError(t, block.Close())
}
func TestDelayedCompaction(t *testing.T) {
// The delay is chosen in such a way as to not slow down the tests, but also to make
// the effective compaction duration negligible compared to it, so that the duration comparisons make sense.
delay := 1000 * time.Millisecond
waitUntilCompactedAndCheck := func(db *DB) {
t.Helper()
start := time.Now()
for db.head.compactable() {
// This simulates what happens at the end of commits; for a less busy DB, a compaction
// is triggered every minute. This is to speed up the test.
select {
case db.compactc <- struct{}{}:
default:
}
time.Sleep(time.Millisecond)
}
duration := time.Since(start)
// Only one delay should have elapsed: delay < duration < 2*delay.
require.Greater(t, duration, db.opts.CompactionDelay)
require.Less(t, duration, 2*db.opts.CompactionDelay)
}
compactAndCheck := func(db *DB) {
t.Helper()
start := time.Now()
db.Compact(context.Background())
for db.head.compactable() {
time.Sleep(time.Millisecond)
}
if runtime.GOOS == "windows" {
// TODO: enable on windows once ms resolution timers are better supported.
return
}
duration := time.Since(start)
require.Less(t, duration, delay)
}
cases := []struct {
name string
// The delays are chosen in such a way as to not slow down the tests, but also in a way to make the
// effective compaction duration negligible compared to them, so that the duration comparisons make sense.
compactionDelay time.Duration
}{
{
"delayed compaction not enabled",
0,
},
{
"delayed compaction enabled",
delay,
},
}
for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
t.Parallel()
var options *Options
if c.compactionDelay > 0 {
options = &Options{CompactionDelay: c.compactionDelay}
}
db := openTestDB(t, options, []int64{10})
defer func() {
require.NoError(t, db.Close())
}()
label := labels.FromStrings("foo", "bar")
// The first compaction is expected to result in 1 block.
db.DisableCompactions()
app := db.Appender(context.Background())
_, err := app.Append(0, label, 0, 0)
require.NoError(t, err)
_, err = app.Append(0, label, 11, 0)
require.NoError(t, err)
_, err = app.Append(0, label, 21, 0)
require.NoError(t, err)
require.NoError(t, app.Commit())
if c.compactionDelay == 0 {
// When delay is not enabled, compaction should run on the first trigger.
compactAndCheck(db)
} else {
db.EnableCompactions()
waitUntilCompactedAndCheck(db)
// The db.compactc signals have been processed multiple times since a compaction is triggered every 1ms by waitUntilCompactedAndCheck.
// This implies that the compaction delay doesn't block or wait on the initial trigger.
// 3 is an arbitrary value because it's difficult to determine the precise value.
require.GreaterOrEqual(t, prom_testutil.ToFloat64(db.metrics.compactionsTriggered)-prom_testutil.ToFloat64(db.metrics.compactionsSkipped), 3.0)
// The delay doesn't change the head blocks' alignment.
require.Eventually(t, func() bool {
return db.head.MinTime() == db.compactor.(*LeveledCompactor).ranges[0]+1
}, 500*time.Millisecond, 10*time.Millisecond)
// One compaction was run and one block was produced.
require.Equal(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran))
}
// The second compaction is expected to result in 2 blocks.
// This ensures that the compaction delay logic doesn't only work for the first compaction, but also applies to future compactions.
// This also ensures that no delay happens between consecutive compactions.
db.DisableCompactions()
app = db.Appender(context.Background())
_, err = app.Append(0, label, 31, 0)
require.NoError(t, err)
_, err = app.Append(0, label, 41, 0)
require.NoError(t, err)
require.NoError(t, app.Commit())
if c.compactionDelay == 0 {
// Compaction should still run on the first trigger.
compactAndCheck(db)
} else {
db.EnableCompactions()
waitUntilCompactedAndCheck(db)
}
// Two other compactions were run.
require.Eventually(t, func() bool {
return prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran) == 3.0
}, 500*time.Millisecond, 10*time.Millisecond)
if c.compactionDelay == 0 {
return
}
// This test covers a special case. If auto compaction is in a delay period and a manual compaction is triggered,
// auto compaction should stop waiting for the delay if the head is no longer compactable.
// Of course, if the head is still compactable after the manual compaction, auto compaction will continue waiting for the same delay.
getTimeWhenCompactionDelayStarted := func() time.Time {
t.Helper()
db.cmtx.Lock()
defer db.cmtx.Unlock()
return db.timeWhenCompactionDelayStarted
}
db.DisableCompactions()
app = db.Appender(context.Background())
_, err = app.Append(0, label, 51, 0)
require.NoError(t, err)
require.NoError(t, app.Commit())
require.True(t, db.head.compactable())
db.EnableCompactions()
// Trigger an auto compaction.
db.compactc <- struct{}{}
// That made auto compaction start waiting for the delay.
require.Eventually(t, func() bool {
return !getTimeWhenCompactionDelayStarted().IsZero()
}, 100*time.Millisecond, 10*time.Millisecond)
// Trigger a manual compaction.
require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), 0, 50.0)))
require.Equal(t, 4.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran))
// Re-trigger an auto compaction.
db.compactc <- struct{}{}
// That made auto compaction stop waiting for the delay.
require.Eventually(t, func() bool {
return getTimeWhenCompactionDelayStarted().IsZero()
}, 100*time.Millisecond, 10*time.Millisecond)
})
}
}
// TestDelayedCompactionDoesNotBlockUnrelatedOps makes sure that when delayed compaction is enabled,
// operations that don't directly derive from the Head compaction are not delayed; here we consider disk block compaction.
func TestDelayedCompactionDoesNotBlockUnrelatedOps(t *testing.T) {
cases := []struct {
name string
whenCompactable bool
}{
{
"Head is compactable",
true,
},
{
"Head is not compactable",
false,
},
}
for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
t.Parallel()
tmpdir := t.TempDir()
// Some blocks that need compaction are present.
createBlock(t, tmpdir, genSeries(1, 1, 0, 100))
createBlock(t, tmpdir, genSeries(1, 1, 100, 200))
createBlock(t, tmpdir, genSeries(1, 1, 200, 300))
options := DefaultOptions()
// This will make the test timeout if compaction really waits for it.
options.CompactionDelay = time.Hour
db, err := open(tmpdir, log.NewNopLogger(), nil, options, []int64{10, 200}, nil)
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
}()
db.DisableCompactions()
require.Len(t, db.Blocks(), 3)
if c.whenCompactable {
label := labels.FromStrings("foo", "bar")
app := db.Appender(context.Background())
_, err := app.Append(0, label, 301, 0)
require.NoError(t, err)
_, err = app.Append(0, label, 317, 0)
require.NoError(t, err)
require.NoError(t, app.Commit())
// The Head is compactable now and must still be compactable at the end.
require.True(t, db.head.compactable())
defer func() {
require.True(t, db.head.compactable())
}()
}
// The blocks were compacted.
db.Compact(context.Background())
require.Len(t, db.Blocks(), 2)
})
}
}

View file

@@ -21,6 +21,7 @@ import (
"io"
"io/fs"
"math"
"math/rand"
"os"
"path/filepath"
"slices"
@@ -84,6 +85,8 @@ func DefaultOptions() *Options {
OutOfOrderCapMax: DefaultOutOfOrderCapMax,
EnableOverlappingCompaction: true,
EnableSharding: false,
EnableDelayedCompaction: false,
CompactionDelay: time.Duration(0),
}
}
@@ -190,6 +193,13 @@ type Options struct {
// EnableSharding enables query sharding support in TSDB.
EnableSharding bool
// EnableDelayedCompaction, when set to true, assigns a random value to CompactionDelay during DB opening.
// When set to false, delayed compaction is disabled, unless CompactionDelay is set directly.
EnableDelayedCompaction bool
// CompactionDelay delays the start time of auto compactions.
// It can be increased by up to one minute if the DB does not commit too often.
CompactionDelay time.Duration
// NewCompactorFunc is a function that returns a TSDB compactor.
NewCompactorFunc NewCompactorFunc
@@ -246,6 +256,9 @@ type DB struct {
// Cancel a running compaction when a shutdown is initiated.
compactCancel context.CancelFunc
// timeWhenCompactionDelayStarted helps delay the compactions start time.
timeWhenCompactionDelayStarted time.Time
// oooWasEnabled is true if out of order support was enabled at least one time
// during the time TSDB was up. In which case we need to keep supporting
// out-of-order compaction and vertical queries.
@@ -998,6 +1011,10 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
db.oooWasEnabled.Store(true)
}
if opts.EnableDelayedCompaction {
opts.CompactionDelay = db.generateCompactionDelay()
}
go db.run(ctx)
return db, nil
@@ -1186,6 +1203,12 @@ func (a dbAppender) Commit() error {
return err
}
// waitingForCompactionDelay returns true if the DB is waiting for the Head compaction delay.
// This doesn't guarantee that the Head is really compactable.
func (db *DB) waitingForCompactionDelay() bool {
return time.Since(db.timeWhenCompactionDelayStarted) < db.opts.CompactionDelay
}
// Compact data if possible. After successful compaction blocks are reloaded
// which will also delete the blocks that fall out of the retention window.
// Old blocks are only deleted on reloadBlocks based on the new block's parent information.
@@ -1219,7 +1242,21 @@ func (db *DB) Compact(ctx context.Context) (returnErr error) {
return nil
default:
}
if !db.head.compactable() {
// Reset the counter once the head compactions are done.
// This would also reset it if a manual compaction was triggered while the auto compaction was in its delay period.
if !db.timeWhenCompactionDelayStarted.IsZero() {
db.timeWhenCompactionDelayStarted = time.Time{}
}
break
}
if db.timeWhenCompactionDelayStarted.IsZero() {
// Start counting for the delay.
db.timeWhenCompactionDelayStarted = time.Now()
}
if db.waitingForCompactionDelay() {
break
}
mint := db.head.MinTime()
@@ -1429,7 +1466,7 @@ func (db *DB) compactBlocks() (err error) {
// If we have a lot of blocks to compact the whole process might take
// long enough that we end up with a HEAD block that needs to be written.
// Check if that's the case and stop compactions early.
if db.head.compactable() {
if db.head.compactable() && !db.waitingForCompactionDelay() {
level.Warn(db.logger).Log("msg", "aborting block compactions to persist the head block")
return nil
}
@@ -1932,6 +1969,11 @@ func (db *DB) EnableCompactions() {
level.Info(db.logger).Log("msg", "Compactions enabled")
}
func (db *DB) generateCompactionDelay() time.Duration {
// Up to 10% of the head's chunkRange.
return time.Duration(rand.Int63n(db.head.chunkRange.Load()/10)) * time.Millisecond
}
// ForceHeadMMap is intended for use only in tests and benchmarks.
func (db *DB) ForceHeadMMap() {
db.head.mmapHeadChunks()

View file

@@ -7357,3 +7357,25 @@ func TestBlockQuerierAndBlockChunkQuerier(t *testing.T) {
// Make sure only block-1 is queried.
require.Equal(t, "block-1", lbls.Get("block"))
}
func TestGenerateCompactionDelay(t *testing.T) {
assertDelay := func(delay time.Duration) {
t.Helper()
require.GreaterOrEqual(t, delay, time.Duration(0))
// Less than 10% of the chunkRange.
require.LessOrEqual(t, delay, 6000*time.Millisecond)
}
opts := DefaultOptions()
opts.EnableDelayedCompaction = true
db := openTestDB(t, opts, []int64{60000})
defer func() {
require.NoError(t, db.Close())
}()
// The offset is generated and set while the DB is being opened.
assertDelay(db.opts.CompactionDelay)
for i := 0; i < 1000; i++ {
assertDelay(db.generateCompactionDelay())
}
}