use context to cancel compactions

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
Krasi Georgiev 2018-12-05 18:34:42 +02:00
parent c8c03ff85b
commit bd5ccee5c1
5 changed files with 56 additions and 16 deletions
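
For illustration only (not part of the commit): a minimal sketch of how a caller outside the db might use the changed constructor. The import paths (`github.com/prometheus/tsdb`, `github.com/go-kit/kit/log`) and the nil registerer/chunk pool and range values are assumptions mirroring the tests in this diff, not prescribed usage.

```go
package main

import (
    "context"

    "github.com/go-kit/kit/log"
    "github.com/prometheus/tsdb"
)

func main() {
    // Derive a cancellable context and keep the cancel func for shutdown.
    ctx, cancel := context.WithCancel(context.Background())

    // The context is now the first argument; registerer and chunk pool
    // may be nil, ranges are passed as before.
    compactor, err := tsdb.NewLeveledCompactor(ctx, nil, log.NewNopLogger(), []int64{1000000}, nil)
    if err != nil {
        panic(err)
    }
    _ = compactor // compactor.Compact(...) would run until done or canceled

    // Cancelling the context (as db.Close does via compactCnl) makes an
    // in-flight compaction stop early and remove its tmp directory.
    cancel()
}
```

DB.Open wires this up internally (see db.go below), so users of DB get the cancellation on Close for free; only direct users of LeveledCompactor need to pass a context themselves.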

@@ -1,4 +1,6 @@
 ## master / unreleased
+ - [CHANGE] `NewLeveledCompactor` takes a context so that a compaction is canceled when closing the db.
 ## 0.3.0

@@ -14,6 +14,7 @@
 package tsdb

 import (
+    "context"
     "io/ioutil"
     "math/rand"
     "os"
@@ -105,7 +106,7 @@ func createPopulatedBlock(tb testing.TB, dir string, nSeries int, mint, maxt int
        testutil.Ok(tb, err)
    }
-   compactor, err := NewLeveledCompactor(nil, log.NewNopLogger(), []int64{1000000}, nil)
+   compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil)
    testutil.Ok(tb, err)
    testutil.Ok(tb, os.MkdirAll(dir, 0777))

@@ -14,6 +14,7 @@
 package tsdb

 import (
+    "context"
     "fmt"
     "io"
     "math/rand"
@@ -71,6 +72,7 @@ type LeveledCompactor struct {
    logger    log.Logger
    ranges    []int64
    chunkPool chunkenc.Pool
+   ctx       context.Context
 }

 type compactorMetrics struct {
@@ -128,7 +130,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
 }

 // NewLeveledCompactor returns a LeveledCompactor.
-func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) {
+func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) {
    if len(ranges) == 0 {
        return nil, errors.Errorf("at least one range must be provided")
    }
@@ -140,6 +142,7 @@ func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, ranges []int64,
        chunkPool: pool,
        logger:    l,
        metrics:   newCompactorMetrics(r),
+       ctx:       ctx,
    }, nil
 }
@@ -441,8 +444,11 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
 func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
    dir := filepath.Join(dest, meta.ULID.String())
    tmp := dir + ".tmp"
+   var writers []io.Closer
    defer func(t time.Time) {
+       for _, w := range writers {
+           w.Close()
+       }
        if err != nil {
            c.metrics.failed.Inc()
            // TODO(gouthamve): Handle error how?
@@ -470,7 +476,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
    if err != nil {
        return errors.Wrap(err, "open chunk writer")
    }
-   defer chunkw.Close()
+   writers = append(writers, chunkw)
    // Record written chunk sizes on level 1 compactions.
    if meta.Compaction.Level == 1 {
        chunkw = &instrumentedChunkWriter{
@@ -485,12 +491,25 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
    if err != nil {
        return errors.Wrap(err, "open index writer")
    }
-   defer indexw.Close()
+   writers = append(writers, indexw)

    if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
        return errors.Wrap(err, "write compaction")
    }

+   // Remove tmp folder and return early when the compaction was canceled.
+   select {
+   case <-c.ctx.Done():
+       for _, w := range writers {
+           w.Close()
+       }
+       if err := os.RemoveAll(tmp); err != nil {
+           level.Error(c.logger).Log("msg", "removed tmp folder after canceled compaction", "err", err.Error())
+       }
+       return
+   default:
+   }
+
    if err = writeMetaFile(tmp, meta); err != nil {
        return errors.Wrap(err, "write merged meta")
    }
@@ -499,11 +518,10 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
    // though these are covered under defer. This is because in Windows,
    // you cannot delete these unless they are closed and the defer is to
    // make sure they are closed if the function exits due to an error above.
-   if err = chunkw.Close(); err != nil {
-       return errors.Wrap(err, "close chunk writer")
-   }
-   if err = indexw.Close(); err != nil {
-       return errors.Wrap(err, "close index writer")
+   for _, w := range writers {
+       if err := w.Close(); err != nil {
+           return err
+       }
    }

    // Create an empty tombstones file.
@@ -554,6 +572,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
    defer func() { closeAll(closers...) }()

    for i, b := range blocks {
+       select {
+       case <-c.ctx.Done():
+           return nil
+       default:
+       }
+
        indexr, err := b.Index()
        if err != nil {
            return errors.Wrapf(err, "open index reader for block %s", b)
@@ -610,6 +634,11 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
    }

    for set.Next() {
+       select {
+       case <-c.ctx.Done():
+           return nil
+       default:
+       }
        lset, chks, dranges := set.At() // The chunks here are not fully deleted.

        // Skip the series with all deleted chunks.
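
As an aside (not code from the repository): the checks added to write() and populateBlock() above are the standard non-blocking cancellation poll. The sketch below shows the same pattern in isolation; the function and variable names are invented for illustration.

```go
package main

import (
    "context"
    "fmt"
)

// processItems checks for cancellation between units of work with a
// non-blocking select, mirroring the checks added in the diff. A canceled
// run returns nil rather than an error, matching the commit's choice to
// treat shutdown as a clean early exit (the caller removes partial output).
func processItems(ctx context.Context, items []string) error {
    for _, item := range items {
        select {
        case <-ctx.Done():
            return nil // stop early
        default:
        }
        fmt.Println("processing", item)
    }
    return nil
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    cancel() // simulate a shutdown before the work starts
    _ = processItems(ctx, []string{"a", "b", "c"})
}
```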

@@ -14,6 +14,7 @@
 package tsdb

 import (
+    "context"
     "io/ioutil"
     "math"
     "os"
@@ -151,7 +152,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) {
        },
    }
-   c, err := NewLeveledCompactor(nil, nil, []int64{50}, nil)
+   c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil)
    testutil.Ok(t, err)

    c.plan(metas)
@@ -159,7 +160,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) {
 func TestLeveledCompactor_plan(t *testing.T) {
    // This mimicks our default ExponentialBlockRanges with min block size equals to 20.
-   compactor, err := NewLeveledCompactor(nil, nil, []int64{
+   compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{
        20,
        60,
        180,
@@ -322,7 +323,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
 }

 func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
-   compactor, err := NewLeveledCompactor(nil, nil, []int64{
+   compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{
        20,
        60,
        240,
@@ -372,7 +373,7 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
 }

 func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
-   compactor, err := NewLeveledCompactor(nil, log.NewNopLogger(), []int64{
+   compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{
        20,
        60,
        240,
@@ -647,7 +648,7 @@ func TestCompaction_populateBlock(t *testing.T) {
        blocks = append(blocks, &mockBReader{ir: ir, cr: cr})
    }

-   c, err := NewLeveledCompactor(nil, nil, []int64{0}, nil)
+   c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil)
    testutil.Ok(t, err)

    meta := &BlockMeta{

db.go

@@ -16,6 +16,7 @@ package tsdb

 import (
    "bytes"
+   "context"
    "fmt"
    "io"
    "io/ioutil"
@@ -119,6 +120,9 @@ type DB struct {
    // changing the autoCompact var.
    autoCompactMtx sync.Mutex
    autoCompact    bool
+
+   // Cancel a running compaction when a shutdown is initiated.
+   compactCnl func()
 }

 type dbMetrics struct {
@@ -258,10 +262,12 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
        db.lockf = lockf
    }

-   db.compactor, err = NewLeveledCompactor(r, l, opts.BlockRanges, db.chunkPool)
+   ctx, cnl := context.WithCancel(context.Background())
+   db.compactor, err = NewLeveledCompactor(ctx, r, l, opts.BlockRanges, db.chunkPool)
    if err != nil {
        return nil, errors.Wrap(err, "create leveled compactor")
    }
+   db.compactCnl = cnl

    wlog, err := wal.New(l, r, filepath.Join(dir, "wal"))
    if err != nil {
@@ -726,6 +732,7 @@ func (db *DB) Head() *Head {
 // Close the partition.
 func (db *DB) Close() error {
    close(db.stopc)
+   db.compactCnl()
    <-db.donec

    db.mtx.Lock()