diff --git a/CHANGELOG.md b/CHANGELOG.md index d220de4ce..6fec65fb2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,6 @@ ## master / unreleased + - [CHANGE] `NewLeveledCompactor` takes a context so that a compaction is canceled when closing the db. + ## 0.3.0 diff --git a/block_test.go b/block_test.go index 03ac006a9..661898b7f 100644 --- a/block_test.go +++ b/block_test.go @@ -14,6 +14,7 @@ package tsdb import ( + "context" "io/ioutil" "math/rand" "os" @@ -105,7 +106,7 @@ func createPopulatedBlock(tb testing.TB, dir string, nSeries int, mint, maxt int testutil.Ok(tb, err) } - compactor, err := NewLeveledCompactor(nil, log.NewNopLogger(), []int64{1000000}, nil) + compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil) testutil.Ok(tb, err) testutil.Ok(tb, os.MkdirAll(dir, 0777)) diff --git a/compact.go b/compact.go index f8e6ff545..37fa3ae00 100644 --- a/compact.go +++ b/compact.go @@ -14,6 +14,7 @@ package tsdb import ( + "context" "fmt" "io" "math/rand" @@ -71,6 +72,7 @@ type LeveledCompactor struct { logger log.Logger ranges []int64 chunkPool chunkenc.Pool + ctx context.Context } type compactorMetrics struct { @@ -128,7 +130,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { } // NewLeveledCompactor returns a LeveledCompactor. -func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) { +func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) { if len(ranges) == 0 { return nil, errors.Errorf("at least one range must be provided") } @@ -140,6 +142,7 @@ func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, ranges []int64, chunkPool: pool, logger: l, metrics: newCompactorMetrics(r), + ctx: ctx, }, nil } @@ -441,8 +444,11 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error { func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) { dir := filepath.Join(dest, meta.ULID.String()) tmp := dir + ".tmp" - + var writers []io.Closer defer func(t time.Time) { + for _, w := range writers { + w.Close() + } if err != nil { c.metrics.failed.Inc() // TODO(gouthamve): Handle error how? @@ -470,7 +476,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err != nil { return errors.Wrap(err, "open chunk writer") } - defer chunkw.Close() + writers = append(writers, chunkw) // Record written chunk sizes on level 1 compactions. if meta.Compaction.Level == 1 { chunkw = &instrumentedChunkWriter{ @@ -485,12 +491,25 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err != nil { return errors.Wrap(err, "open index writer") } - defer indexw.Close() + writers = append(writers, indexw) if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil { return errors.Wrap(err, "write compaction") } + // Remove tmp folder and return early when the compaction was canceled. + select { + case <-c.ctx.Done(): + for _, w := range writers { + w.Close() + } + if err := os.RemoveAll(tmp); err != nil { + level.Error(c.logger).Log("msg", "removed tmp folder after canceled compaction", "err", err.Error()) + } + return + default: + } + if err = writeMetaFile(tmp, meta); err != nil { return errors.Wrap(err, "write merged meta") } @@ -499,11 +518,10 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe // though these are covered under defer. This is because in Windows, // you cannot delete these unless they are closed and the defer is to // make sure they are closed if the function exits due to an error above. - if err = chunkw.Close(); err != nil { - return errors.Wrap(err, "close chunk writer") - } - if err = indexw.Close(); err != nil { - return errors.Wrap(err, "close index writer") + for _, w := range writers { + if err := w.Close(); err != nil { + return err + } } // Create an empty tombstones file. @@ -554,6 +572,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, defer func() { closeAll(closers...) }() for i, b := range blocks { + select { + case <-c.ctx.Done(): + return nil + default: + } + indexr, err := b.Index() if err != nil { return errors.Wrapf(err, "open index reader for block %s", b) @@ -610,6 +634,11 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } for set.Next() { + select { + case <-c.ctx.Done(): + return nil + default: + } lset, chks, dranges := set.At() // The chunks here are not fully deleted. // Skip the series with all deleted chunks. diff --git a/compact_test.go b/compact_test.go index 2489a21e1..89c36767f 100644 --- a/compact_test.go +++ b/compact_test.go @@ -14,6 +14,7 @@ package tsdb import ( + "context" "io/ioutil" "math" "os" @@ -151,7 +152,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) { }, } - c, err := NewLeveledCompactor(nil, nil, []int64{50}, nil) + c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil) testutil.Ok(t, err) c.plan(metas) @@ -159,7 +160,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) { func TestLeveledCompactor_plan(t *testing.T) { // This mimicks our default ExponentialBlockRanges with min block size equals to 20. - compactor, err := NewLeveledCompactor(nil, nil, []int64{ + compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{ 20, 60, 180, @@ -322,7 +323,7 @@ func TestLeveledCompactor_plan(t *testing.T) { } func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) { - compactor, err := NewLeveledCompactor(nil, nil, []int64{ + compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{ 20, 60, 240, @@ -372,7 +373,7 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) { } func TestCompactionFailWillCleanUpTempDir(t *testing.T) { - compactor, err := NewLeveledCompactor(nil, log.NewNopLogger(), []int64{ + compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{ 20, 60, 240, @@ -647,7 +648,7 @@ func TestCompaction_populateBlock(t *testing.T) { blocks = append(blocks, &mockBReader{ir: ir, cr: cr}) } - c, err := NewLeveledCompactor(nil, nil, []int64{0}, nil) + c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil) testutil.Ok(t, err) meta := &BlockMeta{ diff --git a/db.go b/db.go index 3a47f0bf4..ac9c832b9 100644 --- a/db.go +++ b/db.go @@ -16,6 +16,7 @@ package tsdb import ( "bytes" + "context" "fmt" "io" "io/ioutil" @@ -119,6 +120,9 @@ type DB struct { // changing the autoCompact var. autoCompactMtx sync.Mutex autoCompact bool + + // Cancel a running compaction when a shutdown is initiated. + compactCnl func() } type dbMetrics struct { @@ -258,10 +262,12 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db db.lockf = lockf } - db.compactor, err = NewLeveledCompactor(r, l, opts.BlockRanges, db.chunkPool) + ctx, cnl := context.WithCancel(context.Background()) + db.compactor, err = NewLeveledCompactor(ctx, r, l, opts.BlockRanges, db.chunkPool) if err != nil { return nil, errors.Wrap(err, "create leveled compactor") } + db.compactCnl = cnl wlog, err := wal.New(l, r, filepath.Join(dir, "wal")) if err != nil { @@ -726,6 +732,7 @@ func (db *DB) Head() *Head { // Close the partition. func (db *DB) Close() error { close(db.stopc) + db.compactCnl() <-db.donec db.mtx.Lock()