From f9e2dd069758f70957e5a447d8245865f35b78b0 Mon Sep 17 00:00:00 2001 From: nberkley Date: Thu, 15 Apr 2021 04:55:01 -0400 Subject: [PATCH] Add support for smaller block chunk segment allocations (#8478) * Add support for --storage.tsdb.max-chunk-size to suport small chunks for space limited prometheus instances. Signed-off-by: Nathan Berkley * Update tsdb/compact.go Co-authored-by: Bartlomiej Plotka Signed-off-by: Nathan Berkley * Update tsdb/db.go Co-authored-by: Bartlomiej Plotka Signed-off-by: Nathan Berkley * Update cmd/prometheus/main.go Co-authored-by: Bartlomiej Plotka Signed-off-by: Nathan Berkley * Change naming scheme to Signed-off-by: Nathan Berkley * Add a lower bound to --storage.tsdb.max-block-chunk-segment-size Signed-off-by: Nathan Berkley * Update storage.md to explain what a chunk segment is Signed-off-by: Nathan Berkley * Apply suggestions from code review Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Signed-off-by: Nathan Berkley * Force tests Signed-off-by: Nathan Berkley * Fix code style Signed-off-by: Nathan Berkley Co-authored-by: Bartlomiej Plotka Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> --- cmd/prometheus/main.go | 51 ++++++++++++++++++++++--------------- cmd/prometheus/main_test.go | 42 ++++++++++++++++++++++++++++++ docs/storage.md | 8 +++++- tsdb/compact.go | 28 ++++++++++++-------- tsdb/db.go | 11 +++++++- 5 files changed, 107 insertions(+), 33 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 2fa828a43..0492fbe15 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -244,6 +244,10 @@ func main() { "Maximum duration compacted blocks may span. For use in testing. (Defaults to 10% of the retention period.)"). Hidden().PlaceHolder("").SetValue(&cfg.tsdb.MaxBlockDuration) + a.Flag("storage.tsdb.max-block-chunk-segment-size", + "The maximum size for a single chunk segment in a block. Example: 512MB"). + Hidden().PlaceHolder("").BytesVar(&cfg.tsdb.MaxBlockChunkSegmentSize) + a.Flag("storage.tsdb.wal-segment-size", "Size at which to split the tsdb WAL segment files. Example: 100MB"). Hidden().PlaceHolder("").BytesVar(&cfg.tsdb.WALSegmentSize) @@ -802,6 +806,11 @@ func main() { return errors.New("flag 'storage.tsdb.wal-segment-size' must be set between 10MB and 256MB") } } + if cfg.tsdb.MaxBlockChunkSegmentSize != 0 { + if cfg.tsdb.MaxBlockChunkSegmentSize < 1024*1024 { + return errors.New("flag 'storage.tsdb.max-block-chunk-segment-size' must be set over 1MB") + } + } db, err := openDBWithMetrics( cfg.localStoragePath, logger, @@ -1216,30 +1225,32 @@ func (rm *readyScrapeManager) Get() (*scrape.Manager, error) { // tsdbOptions is tsdb.Option version with defined units. // This is required as tsdb.Option fields are unit agnostic (time). type tsdbOptions struct { - WALSegmentSize units.Base2Bytes - RetentionDuration model.Duration - MaxBytes units.Base2Bytes - NoLockfile bool - AllowOverlappingBlocks bool - WALCompression bool - StripeSize int - MinBlockDuration model.Duration - MaxBlockDuration model.Duration - MaxExemplars int + WALSegmentSize units.Base2Bytes + MaxBlockChunkSegmentSize units.Base2Bytes + RetentionDuration model.Duration + MaxBytes units.Base2Bytes + NoLockfile bool + AllowOverlappingBlocks bool + WALCompression bool + StripeSize int + MinBlockDuration model.Duration + MaxBlockDuration model.Duration + MaxExemplars int } func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { return tsdb.Options{ - WALSegmentSize: int(opts.WALSegmentSize), - RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond), - MaxBytes: int64(opts.MaxBytes), - NoLockfile: opts.NoLockfile, - AllowOverlappingBlocks: opts.AllowOverlappingBlocks, - WALCompression: opts.WALCompression, - StripeSize: opts.StripeSize, - MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond), - MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond), - MaxExemplars: opts.MaxExemplars, + WALSegmentSize: int(opts.WALSegmentSize), + MaxBlockChunkSegmentSize: int64(opts.MaxBlockChunkSegmentSize), + RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond), + MaxBytes: int64(opts.MaxBytes), + NoLockfile: opts.NoLockfile, + AllowOverlappingBlocks: opts.AllowOverlappingBlocks, + WALCompression: opts.WALCompression, + StripeSize: opts.StripeSize, + MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond), + MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond), + MaxExemplars: opts.MaxExemplars, } } diff --git a/cmd/prometheus/main_test.go b/cmd/prometheus/main_test.go index 7571234c4..8cb545d1e 100644 --- a/cmd/prometheus/main_test.go +++ b/cmd/prometheus/main_test.go @@ -238,6 +238,48 @@ func TestWALSegmentSizeBounds(t *testing.T) { } } +func TestMaxBlockChunkSegmentSizeBounds(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + for size, expectedExitStatus := range map[string]int{"512KB": 1, "1MB": 0} { + prom := exec.Command(promPath, "-test.main", "--storage.tsdb.max-block-chunk-segment-size="+size, "--config.file="+promConfig) + + // Log stderr in case of failure. + stderr, err := prom.StderrPipe() + require.NoError(t, err) + go func() { + slurp, _ := ioutil.ReadAll(stderr) + t.Log(string(slurp)) + }() + + err = prom.Start() + require.NoError(t, err) + + if expectedExitStatus == 0 { + done := make(chan error, 1) + go func() { done <- prom.Wait() }() + select { + case err := <-done: + t.Errorf("prometheus should be still running: %v", err) + case <-time.After(5 * time.Second): + prom.Process.Kill() + } + continue + } + + err = prom.Wait() + require.Error(t, err) + if exitError, ok := err.(*exec.ExitError); ok { + status := exitError.Sys().(syscall.WaitStatus) + require.Equal(t, expectedExitStatus, status.ExitStatus()) + } else { + t.Errorf("unable to retrieve the exit status for prometheus: %v", err) + } + } +} + func TestTimeMetrics(t *testing.T) { tmpDir, err := ioutil.TempDir("", "time_metrics_e2e") require.NoError(t, err) diff --git a/docs/storage.md b/docs/storage.md index 3d32f58be..1b4f6ebca 100644 --- a/docs/storage.md +++ b/docs/storage.md @@ -13,7 +13,13 @@ Prometheus's local time series database stores data in a custom, highly efficien ### On-disk layout -Ingested samples are grouped into blocks of two hours. Each two-hour block consists of a directory containing one or more chunk files that contain all time series samples for that window of time, as well as a metadata file and index file (which indexes metric names and labels to time series in the chunk files). When series are deleted via the API, deletion records are stored in separate tombstone files (instead of deleting the data immediately from the chunk files). +Ingested samples are grouped into blocks of two hours. Each two-hour block consists +of a directory containing a chunks subdirectory containing all the time series samples +for that window of time, a metadata file, and an index file (which indexes metric names +and labels to time series in the chunks directory). The samples in the chunks directory +are grouped together into one or more segment files of up to 512MB each by default. When series are +deleted via the API, deletion records are stored in separate tombstone files (instead +of deleting the data immediately from the chunk segments). The current block for incoming samples is kept in memory and is not fully persisted. It is secured against crashes by a write-ahead log (WAL) that can be diff --git a/tsdb/compact.go b/tsdb/compact.go index 747f08f9f..9befdc552 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -76,11 +76,12 @@ type Compactor interface { // LeveledCompactor implements the Compactor interface. type LeveledCompactor struct { - metrics *compactorMetrics - logger log.Logger - ranges []int64 - chunkPool chunkenc.Pool - ctx context.Context + metrics *compactorMetrics + logger log.Logger + ranges []int64 + chunkPool chunkenc.Pool + ctx context.Context + maxBlockChunkSegmentSize int64 } type compactorMetrics struct { @@ -145,6 +146,10 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { // NewLeveledCompactor returns a LeveledCompactor. func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) { + return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize) +} + +func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64) (*LeveledCompactor, error) { if len(ranges) == 0 { return nil, errors.Errorf("at least one range must be provided") } @@ -155,11 +160,12 @@ func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Log l = log.NewNopLogger() } return &LeveledCompactor{ - ranges: ranges, - chunkPool: pool, - logger: l, - metrics: newCompactorMetrics(r), - ctx: ctx, + ranges: ranges, + chunkPool: pool, + logger: l, + metrics: newCompactorMetrics(r), + ctx: ctx, + maxBlockChunkSegmentSize: maxBlockChunkSegmentSize, }, nil } @@ -561,7 +567,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe // data of all blocks. var chunkw ChunkWriter - chunkw, err = chunks.NewWriter(chunkDir(tmp)) + chunkw, err = chunks.NewWriterWithSegSize(chunkDir(tmp), c.maxBlockChunkSegmentSize) if err != nil { return errors.Wrap(err, "open chunk writer") } diff --git a/tsdb/db.go b/tsdb/db.go index d4fdc532d..d6036f5e5 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -70,6 +70,7 @@ var ( func DefaultOptions() *Options { return &Options{ WALSegmentSize: wal.DefaultSegmentSize, + MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize, RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond), MinBlockDuration: DefaultBlockDuration, MaxBlockDuration: DefaultBlockDuration, @@ -89,6 +90,11 @@ type Options struct { // WALSegmentSize < 0, wal is disabled. WALSegmentSize int + // MaxBlockChunkSegmentSize is the max size of block chunk segment files. + // MaxBlockChunkSegmentSize = 0, chunk segment size is default size. + // MaxBlockChunkSegmentSize > 0, chunk segment size is MaxBlockChunkSegmentSize. + MaxBlockChunkSegmentSize int64 + // Duration of persisted data to keep. // Unit agnostic as long as unit is consistent with MinBlockDuration and MaxBlockDuration. // Typically it is in milliseconds. @@ -550,6 +556,9 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) { if opts.HeadChunksWriteBufferSize <= 0 { opts.HeadChunksWriteBufferSize = chunks.DefaultWriteBufferSize } + if opts.MaxBlockChunkSegmentSize <= 0 { + opts.MaxBlockChunkSegmentSize = chunks.DefaultChunkSegmentSize + } if opts.MinBlockDuration <= 0 { opts.MinBlockDuration = DefaultBlockDuration } @@ -639,7 +648,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs var err error ctx, cancel := context.WithCancel(context.Background()) - db.compactor, err = NewLeveledCompactor(ctx, r, l, rngs, db.chunkPool) + db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize) if err != nil { cancel() return nil, errors.Wrap(err, "create leveled compactor")