diff --git a/CHANGELOG.md b/CHANGELOG.md
index ecf9e38a1..5f4584720 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,6 @@
 ## master / unreleased
-
+ - [CHANGE] `NewLeveledCompactor` takes a context so that a compaction is canceled when closing the db.
+ - [ENHANCEMENT] When closing the db, any running compaction is canceled so it doesn't block shutdown.
  - [CHANGE] `prometheus_tsdb_storage_blocks_bytes_total` is now `prometheus_tsdb_storage_blocks_bytes`

 ## 0.4.0
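The [CHANGE] entry above is a breaking change for library users: every caller of NewLeveledCompactor now has to supply a context. A minimal sketch of an updated call site, assuming the go-kit nop logger used in the tests below; the block range value and the error handling are placeholders, not taken from this diff:

package main

import (
	"context"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/tsdb"
)

func main() {
	// A cancellable context lets the owner abort an in-flight compaction,
	// for example during shutdown, instead of waiting for it to finish.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// nil registerer and nil chunk pool are accepted, as in the tests in this diff.
	compactor, err := tsdb.NewLeveledCompactor(ctx, nil, log.NewNopLogger(), []int64{1000000}, nil)
	if err != nil {
		panic(err)
	}
	_ = compactor
}

Passing context.Background() keeps the old behaviour; wiring in a cancellable context, as db.go does later in this diff, is what makes compactions abortable on shutdown.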
diff --git a/block_test.go b/block_test.go
index 68cad4b88..8a4081a33 100644
--- a/block_test.go
+++ b/block_test.go
@@ -14,6 +14,7 @@ package tsdb

 import (
+	"context"
 	"io/ioutil"
 	"math/rand"
 	"os"
@@ -85,7 +86,7 @@ func createBlock(tb testing.TB, dir string, series []Series) string {
 	err = app.Commit()
 	testutil.Ok(tb, err)

-	compactor, err := NewLeveledCompactor(nil, log.NewNopLogger(), []int64{1000000}, nil)
+	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil)
 	testutil.Ok(tb, err)

 	testutil.Ok(tb, os.MkdirAll(dir, 0777))
diff --git a/compact.go b/compact.go
index 265cda0b8..5e2cb1a4d 100644
--- a/compact.go
+++ b/compact.go
@@ -14,6 +14,7 @@ package tsdb

 import (
+	"context"
 	"fmt"
 	"io"
 	"math/rand"
@@ -75,15 +76,17 @@ type LeveledCompactor struct {
 	logger     log.Logger
 	ranges     []int64
 	chunkPool  chunkenc.Pool
+	ctx        context.Context
 }

 type compactorMetrics struct {
-	ran          prometheus.Counter
-	failed       prometheus.Counter
-	duration     prometheus.Histogram
-	chunkSize    prometheus.Histogram
-	chunkSamples prometheus.Histogram
-	chunkRange   prometheus.Histogram
+	ran              prometheus.Counter
+	populatingBlocks prometheus.Gauge
+	failed           prometheus.Counter
+	duration         prometheus.Histogram
+	chunkSize        prometheus.Histogram
+	chunkSamples     prometheus.Histogram
+	chunkRange       prometheus.Histogram
 }

 func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
@@ -93,6 +96,10 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
 		Name: "prometheus_tsdb_compactions_total",
 		Help: "Total number of compactions that were executed for the partition.",
 	})
+	m.populatingBlocks = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name: "prometheus_tsdb_compaction_populating_block",
+		Help: "Set to 1 when a block is currently being written to the disk.",
+	})
 	m.failed = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "prometheus_tsdb_compactions_failed_total",
 		Help: "Total number of compactions that failed for the partition.",
 	})
@@ -121,6 +128,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
 	if r != nil {
 		r.MustRegister(
 			m.ran,
+			m.populatingBlocks,
 			m.failed,
 			m.duration,
 			m.chunkRange,
@@ -132,7 +140,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
 }

 // NewLeveledCompactor returns a LeveledCompactor.
-func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) {
+func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) {
 	if len(ranges) == 0 {
 		return nil, errors.Errorf("at least one range must be provided")
 	}
@@ -144,6 +152,7 @@ func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, ranges []int64,
 		chunkPool: pool,
 		logger:    l,
 		metrics:   newCompactorMetrics(r),
+		ctx:       ctx,
 	}, nil
 }
@@ -402,10 +411,11 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u

 	var merr MultiError
 	merr.Add(err)
-
-	for _, b := range bs {
-		if err := b.setCompactionFailed(); err != nil {
-			merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
+	if err != context.Canceled {
+		for _, b := range bs {
+			if err := b.setCompactionFailed(); err != nil {
+				merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
+			}
 		}
 	}
@@ -475,14 +485,19 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
 func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
 	dir := filepath.Join(dest, meta.ULID.String())
 	tmp := dir + ".tmp"
-
+	var closers []io.Closer
 	defer func(t time.Time) {
+		var merr MultiError
+		merr.Add(err)
+		merr.Add(closeAll(closers))
+		err = merr.Err()
+
+		// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
+		if err := os.RemoveAll(tmp); err != nil {
+			level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
+		}
 		if err != nil {
 			c.metrics.failed.Inc()
-			// TODO(gouthamve): Handle error how?
-			if err := os.RemoveAll(tmp); err != nil {
-				level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
-			}
 		}
 		c.metrics.ran.Inc()
 		c.metrics.duration.Observe(time.Since(t).Seconds())
@@ -504,7 +519,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 	if err != nil {
 		return errors.Wrap(err, "open chunk writer")
 	}
-	defer chunkw.Close()
+	closers = append(closers, chunkw)
 	// Record written chunk sizes on level 1 compactions.
 	if meta.Compaction.Level == 1 {
 		chunkw = &instrumentedChunkWriter{
@@ -519,27 +534,33 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 	if err != nil {
 		return errors.Wrap(err, "open index writer")
 	}
-	defer indexw.Close()
+	closers = append(closers, indexw)

 	if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
 		return errors.Wrap(err, "write compaction")
 	}
+
+	select {
+	case <-c.ctx.Done():
+		return c.ctx.Err()
+	default:
+	}
+
 	// We are explicitly closing them here to check for error even
 	// though these are covered under defer. This is because in Windows,
 	// you cannot delete these unless they are closed and the defer is to
 	// make sure they are closed if the function exits due to an error above.
-	if err = chunkw.Close(); err != nil {
-		return errors.Wrap(err, "close chunk writer")
+	var merr MultiError
+	for _, w := range closers {
+		merr.Add(w.Close())
 	}
-	if err = indexw.Close(); err != nil {
-		return errors.Wrap(err, "close index writer")
+	closers = closers[:0] // Avoid closing the writers twice in the defer.
+	if merr.Err() != nil {
+		return merr.Err()
 	}

-	// Populated block is empty, so cleanup and exit.
+	// Populated block is empty, so exit early.
 	if meta.Stats.NumSamples == 0 {
-		if err := os.RemoveAll(tmp); err != nil {
-			return errors.Wrap(err, "remove tmp folder after empty block failed")
-		}
 		return nil
 	}
@@ -597,9 +618,17 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 		merr.Add(err)
 		merr.Add(closeAll(closers))
 		err = merr.Err()
+		c.metrics.populatingBlocks.Set(0)
 	}()
+	c.metrics.populatingBlocks.Set(1)

 	for i, b := range blocks {
+		select {
+		case <-c.ctx.Done():
+			return c.ctx.Err()
+		default:
+		}
+
 		indexr, err := b.Index()
 		if err != nil {
 			return errors.Wrapf(err, "open index reader for block %s", b)
@@ -656,6 +685,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 	}

 	for set.Next() {
+		select {
+		case <-c.ctx.Done():
+			return c.ctx.Err()
+		default:
+		}
+
 		lset, chks, dranges := set.At()
 		// The chunks here are not fully deleted.
 		// Skip the series with all deleted chunks.
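All of the compact.go changes above lean on one cooperative-cancellation idiom: a non-blocking select on c.ctx.Done() between units of work, so a long compaction notices cancellation at the next loop iteration instead of blocking Close. A stripped-down sketch of that idiom outside the tsdb code base; processAll and workItem are illustrative names, not part of the package:

package example

import "context"

type workItem struct{}

// processAll mirrors the pattern used in populateBlock: check the context
// between iterations and surface ctx.Err() (context.Canceled on shutdown)
// so the caller can tell cancellation apart from a real failure.
func processAll(ctx context.Context, items []workItem) error {
	for _, it := range items {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		_ = it // the actual per-item work goes here
	}
	return nil
}

Returning ctx.Err() rather than a wrapped error is what the err != context.Canceled guard in Compact relies on: a canceled compaction should not mark the source blocks as failed.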
diff --git a/compact_test.go b/compact_test.go
index d92efab1c..a755d69f2 100644
--- a/compact_test.go
+++ b/compact_test.go
@@ -14,6 +14,7 @@ package tsdb

 import (
+	"context"
 	"io/ioutil"
 	"math"
 	"os"
@@ -27,6 +28,7 @@ import (
 	prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
 	dto "github.com/prometheus/client_model/go"
 	"github.com/prometheus/tsdb/chunks"
+	"github.com/prometheus/tsdb/fileutil"
 	"github.com/prometheus/tsdb/labels"
 	"github.com/prometheus/tsdb/testutil"
 )
@@ -153,7 +155,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) {
 		},
 	}

-	c, err := NewLeveledCompactor(nil, nil, []int64{50}, nil)
+	c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil)
 	testutil.Ok(t, err)
 	c.plan(metas)
 }
@@ -161,7 +163,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
 	// This mimicks our default ExponentialBlockRanges with min block size equals to 20.
-	compactor, err := NewLeveledCompactor(nil, nil, []int64{
+	compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{
 		20,
 		60,
 		180,
@@ -324,7 +326,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
 }

 func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
-	compactor, err := NewLeveledCompactor(nil, nil, []int64{
+	compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{
 		20,
 		60,
 		240,
@@ -374,7 +376,7 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
 }

 func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
-	compactor, err := NewLeveledCompactor(nil, log.NewNopLogger(), []int64{
+	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{
 		20,
 		60,
 		240,
@@ -649,7 +651,7 @@ func TestCompaction_populateBlock(t *testing.T) {
 			blocks = append(blocks, &mockBReader{ir: ir, cr: cr})
 		}

-		c, err := NewLeveledCompactor(nil, nil, []int64{0}, nil)
+		c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil)
 		testutil.Ok(t, err)

 		meta := &BlockMeta{
@@ -746,6 +748,70 @@ func TestDisableAutoCompactions(t *testing.T) {
 	testutil.Assert(t, len(db.Blocks()) > 0, "No block was persisted after the set timeout.")
 }

+// TestCancelCompactions ensures that when the db is closed
+// any running compaction is canceled to unblock closing the db.
+func TestCancelCompactions(t *testing.T) {
+	tmpdir, err := ioutil.TempDir("", "testCancelCompaction")
+	testutil.Ok(t, err)
+	defer os.RemoveAll(tmpdir)
+
+	// Create some blocks to fall within the compaction range.
+	createBlock(t, tmpdir, genSeries(10, 10000, 0, 1000))
+	createBlock(t, tmpdir, genSeries(10, 10000, 1000, 2000))
+	createBlock(t, tmpdir, genSeries(1, 1, 2000, 2001)) // The most recent block is ignored, so it can be a small one.
+
+	// Copy the db so we have an exact copy to compare compaction times.
+	tmpdirCopy := tmpdir + "Copy"
+	err = fileutil.CopyDirs(tmpdir, tmpdirCopy)
+	testutil.Ok(t, err)
+	defer os.RemoveAll(tmpdirCopy)
+
+	// Measure the compaction time without interrupting it.
+	var timeCompactionUninterrupted time.Duration
+	{
+		db, err := Open(tmpdir, log.NewNopLogger(), nil, &Options{BlockRanges: []int64{1, 2000}})
+		testutil.Ok(t, err)
+		testutil.Equals(t, 3, len(db.Blocks()), "initial block count mismatch")
+		testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch")
+		db.compactc <- struct{}{} // Trigger a compaction.
+		var start time.Time
+		for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) <= 0 {
+			time.Sleep(3 * time.Millisecond)
+		}
+		start = time.Now()
+
+		for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran) != 1 {
+			time.Sleep(3 * time.Millisecond)
+		}
+		timeCompactionUninterrupted = time.Since(start)
+
+		testutil.Ok(t, db.Close())
+	}
+	// Measure the compaction time when closing the db in the middle of compaction.
+	{
+		db, err := Open(tmpdirCopy, log.NewNopLogger(), nil, &Options{BlockRanges: []int64{1, 2000}})
+		testutil.Ok(t, err)
+		testutil.Equals(t, 3, len(db.Blocks()), "initial block count mismatch")
+		testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch")
+		db.compactc <- struct{}{} // Trigger a compaction.
+		dbClosed := make(chan struct{})
+
+		for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) <= 0 {
+			time.Sleep(3 * time.Millisecond)
+		}
+		go func() {
+			testutil.Ok(t, db.Close())
+			close(dbClosed)
+		}()
+
+		start := time.Now()
+		<-dbClosed
+		actT := time.Since(start)
+		expT := time.Duration(timeCompactionUninterrupted / 2) // Closing the db in the middle of compaction should take less than half the time.
+		testutil.Assert(t, actT < expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT)
+	}
+}
+
 // TestDeleteCompactionBlockAfterFailedReload ensures that a failed reload immediately after a compaction
 // deletes the resulting block to avoid creatings blocks with the same time range.
 func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
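The new test synchronizes on metric values by polling prom_testutil.ToFloat64 in unbounded loops, so a regression that never moves the gauge would hang the test rather than fail it. A bounded polling helper along these lines is one way to keep that safety net; waitForMetric is hypothetical and not part of the tsdb test suite:

package example

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
)

// waitForMetric polls a collector until it reports the wanted value or the
// timeout expires. The 3ms poll interval matches the sleeps used in the test.
func waitForMetric(c prometheus.Collector, want float64, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if prom_testutil.ToFloat64(c) == want {
			return nil
		}
		time.Sleep(3 * time.Millisecond)
	}
	return fmt.Errorf("metric did not reach %v within %v", want, timeout)
}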
diff --git a/db.go b/db.go
index e8fd9c0aa..8307c0827 100644
--- a/db.go
+++ b/db.go
@@ -16,6 +16,7 @@ package tsdb

 import (
 	"bytes"
+	"context"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -126,6 +127,9 @@ type DB struct {
 	// changing the autoCompact var.
 	autoCompactMtx sync.Mutex
 	autoCompact    bool
+
+	// Cancel a running compaction when a shutdown is initiated.
+	compactCancel context.CancelFunc
 }

 type dbMetrics struct {
@@ -271,10 +275,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 		db.lockf = lockf
 	}

-	db.compactor, err = NewLeveledCompactor(r, l, opts.BlockRanges, db.chunkPool)
+	ctx, cancel := context.WithCancel(context.Background())
+	db.compactor, err = NewLeveledCompactor(ctx, r, l, opts.BlockRanges, db.chunkPool)
 	if err != nil {
+		cancel()
 		return nil, errors.Wrap(err, "create leveled compactor")
 	}
+	db.compactCancel = cancel

 	segmentSize := wal.DefaultSegmentSize
 	if opts.WALSegmentSize > 0 {
@@ -826,6 +833,7 @@ func (db *DB) Head() *Head {
 // Close the partition.
 func (db *DB) Close() error {
 	close(db.stopc)
+	db.compactCancel()
 	<-db.donec

 	db.mtx.Lock()
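Taken together, the db.go hunks implement a common ownership pattern: the component that starts background work creates the cancellable context, keeps the CancelFunc, and calls it at the top of Close so in-flight work unblocks before Close waits for it. A condensed standalone sketch of that pattern; component, open, and run are illustrative names, not tsdb APIs:

package example

import (
	"context"
)

type component struct {
	cancel context.CancelFunc
	done   chan struct{}
}

// open starts the background work with a context the component owns,
// mirroring how Open wires ctx/cancel into NewLeveledCompactor.
func open() *component {
	ctx, cancel := context.WithCancel(context.Background())
	c := &component{cancel: cancel, done: make(chan struct{})}
	go func() {
		defer close(c.done)
		run(ctx) // long-running work that checks ctx.Done() between steps
	}()
	return c
}

func run(ctx context.Context) {
	<-ctx.Done() // stand-in for work that honours cancellation
}

// Close cancels first, then waits, so a slow job cannot block shutdown.
func (c *component) Close() {
	c.cancel() // corresponds to db.compactCancel()
	<-c.done   // corresponds to <-db.donec
}

Cancelling before waiting is the important ordering here: calling the cancel func only after <-db.donec would reintroduce the original problem, because Close would still sit behind a full compaction.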