Add support for smaller block chunk segment allocations (#8478)
* Add support for --storage.tsdb.max-chunk-size to support small chunks for space-limited Prometheus instances.
  Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>
* Update tsdb/compact.go
  Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
  Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>
* Update tsdb/db.go
  Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
  Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>
* Update cmd/prometheus/main.go
  Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
  Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>
* Change naming scheme to
  Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>
* Add a lower bound to --storage.tsdb.max-block-chunk-segment-size
  Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>
* Update storage.md to explain what a chunk segment is
  Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>
* Apply suggestions from code review
  Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
  Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>
* Force tests
  Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>
* Fix code style
  Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
This commit is contained in:
parent 39d79c3cfb
commit f9e2dd0697
cmd/prometheus/main.go

@@ -244,6 +244,10 @@ func main() {
 		"Maximum duration compacted blocks may span. For use in testing. (Defaults to 10% of the retention period.)").
 		Hidden().PlaceHolder("<duration>").SetValue(&cfg.tsdb.MaxBlockDuration)
 
+	a.Flag("storage.tsdb.max-block-chunk-segment-size",
+		"The maximum size for a single chunk segment in a block. Example: 512MB").
+		Hidden().PlaceHolder("<bytes>").BytesVar(&cfg.tsdb.MaxBlockChunkSegmentSize)
+
 	a.Flag("storage.tsdb.wal-segment-size",
 		"Size at which to split the tsdb WAL segment files. Example: 100MB").
 		Hidden().PlaceHolder("<bytes>").BytesVar(&cfg.tsdb.WALSegmentSize)
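For context, here is a minimal, hypothetical sketch (not part of this commit) of how a kingpin flag registered with BytesVar, like the hidden flag above, parses a human-readable size such as 512MB into units.Base2Bytes. The standalone program and its wiring are assumptions; only the flag name and help text come from the diff.

package main

import (
	"fmt"

	"github.com/alecthomas/units"
	"gopkg.in/alecthomas/kingpin.v2"
)

func main() {
	app := kingpin.New("sketch", "Flag-parsing sketch for a bytes-valued TSDB flag.")

	var maxBlockChunkSegmentSize units.Base2Bytes
	app.Flag("storage.tsdb.max-block-chunk-segment-size",
		"The maximum size for a single chunk segment in a block. Example: 512MB").
		Hidden().PlaceHolder("<bytes>").BytesVar(&maxBlockChunkSegmentSize)

	// Base-2 parsing: "512MB" is treated as 512 * 1024 * 1024 bytes.
	if _, err := app.Parse([]string{"--storage.tsdb.max-block-chunk-segment-size=512MB"}); err != nil {
		panic(err)
	}
	fmt.Println(int64(maxBlockChunkSegmentSize)) // 536870912
}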
@@ -802,6 +806,11 @@ func main() {
 			return errors.New("flag 'storage.tsdb.wal-segment-size' must be set between 10MB and 256MB")
 		}
 	}
+	if cfg.tsdb.MaxBlockChunkSegmentSize != 0 {
+		if cfg.tsdb.MaxBlockChunkSegmentSize < 1024*1024 {
+			return errors.New("flag 'storage.tsdb.max-block-chunk-segment-size' must be set over 1MB")
+		}
+	}
 	db, err := openDBWithMetrics(
 		cfg.localStoragePath,
 		logger,
@ -1216,30 +1225,32 @@ func (rm *readyScrapeManager) Get() (*scrape.Manager, error) {
|
|||
// tsdbOptions is tsdb.Option version with defined units.
|
||||
// This is required as tsdb.Option fields are unit agnostic (time).
|
||||
type tsdbOptions struct {
|
||||
WALSegmentSize units.Base2Bytes
|
||||
RetentionDuration model.Duration
|
||||
MaxBytes units.Base2Bytes
|
||||
NoLockfile bool
|
||||
AllowOverlappingBlocks bool
|
||||
WALCompression bool
|
||||
StripeSize int
|
||||
MinBlockDuration model.Duration
|
||||
MaxBlockDuration model.Duration
|
||||
MaxExemplars int
|
||||
WALSegmentSize units.Base2Bytes
|
||||
MaxBlockChunkSegmentSize units.Base2Bytes
|
||||
RetentionDuration model.Duration
|
||||
MaxBytes units.Base2Bytes
|
||||
NoLockfile bool
|
||||
AllowOverlappingBlocks bool
|
||||
WALCompression bool
|
||||
StripeSize int
|
||||
MinBlockDuration model.Duration
|
||||
MaxBlockDuration model.Duration
|
||||
MaxExemplars int
|
||||
}
|
||||
|
||||
func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
|
||||
return tsdb.Options{
|
||||
WALSegmentSize: int(opts.WALSegmentSize),
|
||||
RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond),
|
||||
MaxBytes: int64(opts.MaxBytes),
|
||||
NoLockfile: opts.NoLockfile,
|
||||
AllowOverlappingBlocks: opts.AllowOverlappingBlocks,
|
||||
WALCompression: opts.WALCompression,
|
||||
StripeSize: opts.StripeSize,
|
||||
MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond),
|
||||
MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond),
|
||||
MaxExemplars: opts.MaxExemplars,
|
||||
WALSegmentSize: int(opts.WALSegmentSize),
|
||||
MaxBlockChunkSegmentSize: int64(opts.MaxBlockChunkSegmentSize),
|
||||
RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond),
|
||||
MaxBytes: int64(opts.MaxBytes),
|
||||
NoLockfile: opts.NoLockfile,
|
||||
AllowOverlappingBlocks: opts.AllowOverlappingBlocks,
|
||||
WALCompression: opts.WALCompression,
|
||||
StripeSize: opts.StripeSize,
|
||||
MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond),
|
||||
MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond),
|
||||
MaxExemplars: opts.MaxExemplars,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
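As an aside, a small hedged sketch of the unit handling the tsdbOptions comment describes: flag values carry explicit units (units.Base2Bytes, model.Duration) and are flattened into the unit-agnostic integers that tsdb.Options expects. The standalone program below is illustrative only; the parsed literals are arbitrary examples.

package main

import (
	"fmt"
	"time"

	"github.com/alecthomas/units"
	"github.com/prometheus/common/model"
)

func main() {
	// Bytes flags parse into units.Base2Bytes and are passed on as plain integers.
	segSize, err := units.ParseBase2Bytes("512MB")
	if err != nil {
		panic(err)
	}
	fmt.Println(int64(segSize)) // 536870912

	// Duration flags parse into model.Duration and are converted to milliseconds.
	retention, err := model.ParseDuration("15d")
	if err != nil {
		panic(err)
	}
	fmt.Println(int64(time.Duration(retention) / time.Millisecond)) // 1296000000
}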
cmd/prometheus/main_test.go

@@ -238,6 +238,48 @@ func TestWALSegmentSizeBounds(t *testing.T) {
 	}
 }
 
+func TestMaxBlockChunkSegmentSizeBounds(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping test in short mode.")
+	}
+
+	for size, expectedExitStatus := range map[string]int{"512KB": 1, "1MB": 0} {
+		prom := exec.Command(promPath, "-test.main", "--storage.tsdb.max-block-chunk-segment-size="+size, "--config.file="+promConfig)
+
+		// Log stderr in case of failure.
+		stderr, err := prom.StderrPipe()
+		require.NoError(t, err)
+		go func() {
+			slurp, _ := ioutil.ReadAll(stderr)
+			t.Log(string(slurp))
+		}()
+
+		err = prom.Start()
+		require.NoError(t, err)
+
+		if expectedExitStatus == 0 {
+			done := make(chan error, 1)
+			go func() { done <- prom.Wait() }()
+			select {
+			case err := <-done:
+				t.Errorf("prometheus should be still running: %v", err)
+			case <-time.After(5 * time.Second):
+				prom.Process.Kill()
+			}
+			continue
+		}
+
+		err = prom.Wait()
+		require.Error(t, err)
+		if exitError, ok := err.(*exec.ExitError); ok {
+			status := exitError.Sys().(syscall.WaitStatus)
+			require.Equal(t, expectedExitStatus, status.ExitStatus())
+		} else {
+			t.Errorf("unable to retrieve the exit status for prometheus: %v", err)
+		}
+	}
+}
+
 func TestTimeMetrics(t *testing.T) {
 	tmpDir, err := ioutil.TempDir("", "time_metrics_e2e")
 	require.NoError(t, err)
docs/storage.md

@@ -13,7 +13,13 @@ Prometheus's local time series database stores data in a custom, highly efficient
 
 ### On-disk layout
 
-Ingested samples are grouped into blocks of two hours. Each two-hour block consists of a directory containing one or more chunk files that contain all time series samples for that window of time, as well as a metadata file and index file (which indexes metric names and labels to time series in the chunk files). When series are deleted via the API, deletion records are stored in separate tombstone files (instead of deleting the data immediately from the chunk files).
+Ingested samples are grouped into blocks of two hours. Each two-hour block consists
+of a directory containing a chunks subdirectory containing all the time series samples
+for that window of time, a metadata file, and an index file (which indexes metric names
+and labels to time series in the chunks directory). The samples in the chunks directory
+are grouped together into one or more segment files of up to 512MB each by default. When series are
+deleted via the API, deletion records are stored in separate tombstone files (instead
+of deleting the data immediately from the chunk segments).
 
 The current block for incoming samples is kept in memory and is not fully
 persisted. It is secured against crashes by a write-ahead log (WAL) that can be
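To make the layout described above concrete, here is a minimal, hypothetical sketch that lists the segment files inside one block's chunks directory. The block directory name is invented, and the zero-padded segment file names reflect the usual TSDB layout rather than anything introduced by this change.

package main

import (
	"fmt"
	"io/ioutil"
	"path/filepath"
)

func main() {
	// Hypothetical block directory inside the Prometheus data directory.
	blockDir := "data/01EXAMPLEBLOCKULID0000000000"

	infos, err := ioutil.ReadDir(filepath.Join(blockDir, "chunks"))
	if err != nil {
		panic(err)
	}
	for _, info := range infos {
		// Segment files (e.g. 000001, 000002, ...) are each capped at
		// --storage.tsdb.max-block-chunk-segment-size bytes (512MB by default).
		fmt.Printf("%s\t%d bytes\n", info.Name(), info.Size())
	}
}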
tsdb/compact.go

@@ -76,11 +76,12 @@ type Compactor interface {
 
 // LeveledCompactor implements the Compactor interface.
 type LeveledCompactor struct {
-	metrics   *compactorMetrics
-	logger    log.Logger
-	ranges    []int64
-	chunkPool chunkenc.Pool
-	ctx       context.Context
+	metrics                  *compactorMetrics
+	logger                   log.Logger
+	ranges                   []int64
+	chunkPool                chunkenc.Pool
+	ctx                      context.Context
+	maxBlockChunkSegmentSize int64
 }
 
 type compactorMetrics struct {
@@ -145,6 +146,10 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
 
 // NewLeveledCompactor returns a LeveledCompactor.
 func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) {
+	return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize)
+}
+
+func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64) (*LeveledCompactor, error) {
 	if len(ranges) == 0 {
 		return nil, errors.Errorf("at least one range must be provided")
 	}
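A hedged usage sketch of the new constructor follows. The import paths (go-kit log, chunkenc) are assumed from the code base of this era, the 32MiB figure is an arbitrary example, and only the NewLeveledCompactorWithChunkSize signature itself comes from the diff.

package main

import (
	"context"

	kitlog "github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func main() {
	ctx := context.Background()

	// Block ranges in milliseconds; 2h is the default block duration.
	ranges := []int64{2 * 60 * 60 * 1000}

	// Cap block chunk segment files at 32MiB instead of the 512MiB default.
	compactor, err := tsdb.NewLeveledCompactorWithChunkSize(
		ctx,
		prometheus.NewRegistry(),
		kitlog.NewNopLogger(),
		ranges,
		chunkenc.NewPool(),
		32*1024*1024,
	)
	if err != nil {
		panic(err)
	}
	_ = compactor // ready for Plan/Compact/Write calls
}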
@@ -155,11 +160,12 @@ func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Log
 		l = log.NewNopLogger()
 	}
 	return &LeveledCompactor{
-		ranges:    ranges,
-		chunkPool: pool,
-		logger:    l,
-		metrics:   newCompactorMetrics(r),
-		ctx:       ctx,
+		ranges:                   ranges,
+		chunkPool:                pool,
+		logger:                   l,
+		metrics:                  newCompactorMetrics(r),
+		ctx:                      ctx,
+		maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
 	}, nil
 }
 
@@ -561,7 +567,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 	// data of all blocks.
 	var chunkw ChunkWriter
 
-	chunkw, err = chunks.NewWriter(chunkDir(tmp))
+	chunkw, err = chunks.NewWriterWithSegSize(chunkDir(tmp), c.maxBlockChunkSegmentSize)
 	if err != nil {
 		return errors.Wrap(err, "open chunk writer")
 	}
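And a minimal sketch of the chunk-writer call the compactor now makes, assuming only the chunks.NewWriterWithSegSize signature visible above; the target directory is hypothetical.

package main

import (
	"github.com/prometheus/prometheus/tsdb/chunks"
)

func main() {
	// Write chunk segments of at most 32MiB into a (hypothetical) block's chunks dir.
	w, err := chunks.NewWriterWithSegSize("data/01EXAMPLEBLOCKULID0000000000/chunks", 32*1024*1024)
	if err != nil {
		panic(err)
	}
	defer w.Close()
	// Writing chunks through w would roll over to a new segment file whenever
	// the configured size would be exceeded.
}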
tsdb/db.go
@@ -70,6 +70,7 @@ var (
 func DefaultOptions() *Options {
 	return &Options{
 		WALSegmentSize:           wal.DefaultSegmentSize,
+		MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize,
 		RetentionDuration:        int64(15 * 24 * time.Hour / time.Millisecond),
 		MinBlockDuration:         DefaultBlockDuration,
 		MaxBlockDuration:         DefaultBlockDuration,
@@ -89,6 +90,11 @@ type Options struct {
 	// WALSegmentSize < 0, wal is disabled.
 	WALSegmentSize int
 
+	// MaxBlockChunkSegmentSize is the max size of block chunk segment files.
+	// MaxBlockChunkSegmentSize = 0, chunk segment size is default size.
+	// MaxBlockChunkSegmentSize > 0, chunk segment size is MaxBlockChunkSegmentSize.
+	MaxBlockChunkSegmentSize int64
+
 	// Duration of persisted data to keep.
 	// Unit agnostic as long as unit is consistent with MinBlockDuration and MaxBlockDuration.
 	// Typically it is in milliseconds.
@@ -550,6 +556,9 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
 	if opts.HeadChunksWriteBufferSize <= 0 {
 		opts.HeadChunksWriteBufferSize = chunks.DefaultWriteBufferSize
 	}
+	if opts.MaxBlockChunkSegmentSize <= 0 {
+		opts.MaxBlockChunkSegmentSize = chunks.DefaultChunkSegmentSize
+	}
 	if opts.MinBlockDuration <= 0 {
 		opts.MinBlockDuration = DefaultBlockDuration
 	}
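Finally, a tiny self-contained sketch (stdlib only, with the 512MB default restated as a local constant rather than imported) of the fallback behavior validateOpts applies to the new option: zero or negative falls back to the default, any positive value is used as-is.

package main

import "fmt"

// defaultChunkSegmentSize mirrors chunks.DefaultChunkSegmentSize (512MB).
const defaultChunkSegmentSize = 512 * 1024 * 1024

// effectiveSegmentSize restates the validateOpts fallback for illustration.
func effectiveSegmentSize(configured int64) int64 {
	if configured <= 0 {
		return defaultChunkSegmentSize
	}
	return configured
}

func main() {
	fmt.Println(effectiveSegmentSize(0))                // 536870912 (default)
	fmt.Println(effectiveSegmentSize(64 * 1024 * 1024)) // 67108864
}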
@@ -639,7 +648,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
 
 	var err error
 	ctx, cancel := context.WithCancel(context.Background())
-	db.compactor, err = NewLeveledCompactor(ctx, r, l, rngs, db.chunkPool)
+	db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize)
 	if err != nil {
 		cancel()
 		return nil, errors.Wrap(err, "create leveled compactor")