Merge pull request #465 from krasi-georgiev/shutdown-during-compaction

use context to cancel compactions
This commit is contained in:
Krasi Georgiev 2019-02-12 11:25:40 +02:00 committed by GitHub
commit 9f28ffa6f4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 145 additions and 34 deletions

View file

@ -1,5 +1,6 @@
## master / unreleased ## master / unreleased
- [CHANGE] `NewLeveledCompactor` takes a context so that a compaction is canceled when closing the db.
- [ENHANCEMENT] When closing the db any running compaction will be cancelled so it doesn't block.
- [CHANGE] `prometheus_tsdb_storage_blocks_bytes_total` is now `prometheus_tsdb_storage_blocks_bytes` - [CHANGE] `prometheus_tsdb_storage_blocks_bytes_total` is now `prometheus_tsdb_storage_blocks_bytes`
## 0.4.0 ## 0.4.0

View file

@ -14,6 +14,7 @@
package tsdb package tsdb
import ( import (
"context"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
"os" "os"
@ -85,7 +86,7 @@ func createBlock(tb testing.TB, dir string, series []Series) string {
err = app.Commit() err = app.Commit()
testutil.Ok(tb, err) testutil.Ok(tb, err)
compactor, err := NewLeveledCompactor(nil, log.NewNopLogger(), []int64{1000000}, nil) compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil)
testutil.Ok(tb, err) testutil.Ok(tb, err)
testutil.Ok(tb, os.MkdirAll(dir, 0777)) testutil.Ok(tb, os.MkdirAll(dir, 0777))

View file

@ -14,6 +14,7 @@
package tsdb package tsdb
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"math/rand" "math/rand"
@ -75,15 +76,17 @@ type LeveledCompactor struct {
logger log.Logger logger log.Logger
ranges []int64 ranges []int64
chunkPool chunkenc.Pool chunkPool chunkenc.Pool
ctx context.Context
} }
type compactorMetrics struct { type compactorMetrics struct {
ran prometheus.Counter ran prometheus.Counter
failed prometheus.Counter populatingBlocks prometheus.Gauge
duration prometheus.Histogram failed prometheus.Counter
chunkSize prometheus.Histogram duration prometheus.Histogram
chunkSamples prometheus.Histogram chunkSize prometheus.Histogram
chunkRange prometheus.Histogram chunkSamples prometheus.Histogram
chunkRange prometheus.Histogram
} }
func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
@ -93,6 +96,10 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
Name: "prometheus_tsdb_compactions_total", Name: "prometheus_tsdb_compactions_total",
Help: "Total number of compactions that were executed for the partition.", Help: "Total number of compactions that were executed for the partition.",
}) })
m.populatingBlocks = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_compaction_populating_block",
Help: "Set to 1 when a block is currently being written to the disk.",
})
m.failed = prometheus.NewCounter(prometheus.CounterOpts{ m.failed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_compactions_failed_total", Name: "prometheus_tsdb_compactions_failed_total",
Help: "Total number of compactions that failed for the partition.", Help: "Total number of compactions that failed for the partition.",
@ -121,6 +128,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
if r != nil { if r != nil {
r.MustRegister( r.MustRegister(
m.ran, m.ran,
m.populatingBlocks,
m.failed, m.failed,
m.duration, m.duration,
m.chunkRange, m.chunkRange,
@ -132,7 +140,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
} }
// NewLeveledCompactor returns a LeveledCompactor. // NewLeveledCompactor returns a LeveledCompactor.
func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) { func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) {
if len(ranges) == 0 { if len(ranges) == 0 {
return nil, errors.Errorf("at least one range must be provided") return nil, errors.Errorf("at least one range must be provided")
} }
@ -144,6 +152,7 @@ func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, ranges []int64,
chunkPool: pool, chunkPool: pool,
logger: l, logger: l,
metrics: newCompactorMetrics(r), metrics: newCompactorMetrics(r),
ctx: ctx,
}, nil }, nil
} }
@ -402,10 +411,11 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
var merr MultiError var merr MultiError
merr.Add(err) merr.Add(err)
if err != context.Canceled {
for _, b := range bs { for _, b := range bs {
if err := b.setCompactionFailed(); err != nil { if err := b.setCompactionFailed(); err != nil {
merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir())) merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
}
} }
} }
@ -475,14 +485,19 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) { func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
dir := filepath.Join(dest, meta.ULID.String()) dir := filepath.Join(dest, meta.ULID.String())
tmp := dir + ".tmp" tmp := dir + ".tmp"
var closers []io.Closer
defer func(t time.Time) { defer func(t time.Time) {
var merr MultiError
merr.Add(err)
merr.Add(closeAll(closers))
err = merr.Err()
// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
if err := os.RemoveAll(tmp); err != nil {
level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
}
if err != nil { if err != nil {
c.metrics.failed.Inc() c.metrics.failed.Inc()
// TODO(gouthamve): Handle error how?
if err := os.RemoveAll(tmp); err != nil {
level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
}
} }
c.metrics.ran.Inc() c.metrics.ran.Inc()
c.metrics.duration.Observe(time.Since(t).Seconds()) c.metrics.duration.Observe(time.Since(t).Seconds())
@ -504,7 +519,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
if err != nil { if err != nil {
return errors.Wrap(err, "open chunk writer") return errors.Wrap(err, "open chunk writer")
} }
defer chunkw.Close() closers = append(closers, chunkw)
// Record written chunk sizes on level 1 compactions. // Record written chunk sizes on level 1 compactions.
if meta.Compaction.Level == 1 { if meta.Compaction.Level == 1 {
chunkw = &instrumentedChunkWriter{ chunkw = &instrumentedChunkWriter{
@ -519,27 +534,33 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
if err != nil { if err != nil {
return errors.Wrap(err, "open index writer") return errors.Wrap(err, "open index writer")
} }
defer indexw.Close() closers = append(closers, indexw)
if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil { if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
return errors.Wrap(err, "write compaction") return errors.Wrap(err, "write compaction")
} }
select {
case <-c.ctx.Done():
return c.ctx.Err()
default:
}
// We are explicitly closing them here to check for error even // We are explicitly closing them here to check for error even
// though these are covered under defer. This is because in Windows, // though these are covered under defer. This is because in Windows,
// you cannot delete these unless they are closed and the defer is to // you cannot delete these unless they are closed and the defer is to
// make sure they are closed if the function exits due to an error above. // make sure they are closed if the function exits due to an error above.
if err = chunkw.Close(); err != nil { var merr MultiError
return errors.Wrap(err, "close chunk writer") for _, w := range closers {
merr.Add(w.Close())
} }
if err = indexw.Close(); err != nil { closers = closers[:0] // Avoid closing the writers twice in the defer.
return errors.Wrap(err, "close index writer") if merr.Err() != nil {
return merr.Err()
} }
// Populated block is empty, so cleanup and exit. // Populated block is empty, so exit early.
if meta.Stats.NumSamples == 0 { if meta.Stats.NumSamples == 0 {
if err := os.RemoveAll(tmp); err != nil {
return errors.Wrap(err, "remove tmp folder after empty block failed")
}
return nil return nil
} }
@ -597,9 +618,17 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
merr.Add(err) merr.Add(err)
merr.Add(closeAll(closers)) merr.Add(closeAll(closers))
err = merr.Err() err = merr.Err()
c.metrics.populatingBlocks.Set(0)
}() }()
c.metrics.populatingBlocks.Set(1)
for i, b := range blocks { for i, b := range blocks {
select {
case <-c.ctx.Done():
return c.ctx.Err()
default:
}
indexr, err := b.Index() indexr, err := b.Index()
if err != nil { if err != nil {
return errors.Wrapf(err, "open index reader for block %s", b) return errors.Wrapf(err, "open index reader for block %s", b)
@ -656,6 +685,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
} }
for set.Next() { for set.Next() {
select {
case <-c.ctx.Done():
return c.ctx.Err()
default:
}
lset, chks, dranges := set.At() // The chunks here are not fully deleted. lset, chks, dranges := set.At() // The chunks here are not fully deleted.
// Skip the series with all deleted chunks. // Skip the series with all deleted chunks.

View file

@ -14,6 +14,7 @@
package tsdb package tsdb
import ( import (
"context"
"io/ioutil" "io/ioutil"
"math" "math"
"os" "os"
@ -27,6 +28,7 @@ import (
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
dto "github.com/prometheus/client_model/go" dto "github.com/prometheus/client_model/go"
"github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
) )
@ -153,7 +155,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) {
}, },
} }
c, err := NewLeveledCompactor(nil, nil, []int64{50}, nil) c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil)
testutil.Ok(t, err) testutil.Ok(t, err)
c.plan(metas) c.plan(metas)
@ -161,7 +163,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) {
func TestLeveledCompactor_plan(t *testing.T) { func TestLeveledCompactor_plan(t *testing.T) {
// This mimicks our default ExponentialBlockRanges with min block size equals to 20. // This mimicks our default ExponentialBlockRanges with min block size equals to 20.
compactor, err := NewLeveledCompactor(nil, nil, []int64{ compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{
20, 20,
60, 60,
180, 180,
@ -324,7 +326,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
} }
func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) { func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
compactor, err := NewLeveledCompactor(nil, nil, []int64{ compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{
20, 20,
60, 60,
240, 240,
@ -374,7 +376,7 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
} }
func TestCompactionFailWillCleanUpTempDir(t *testing.T) { func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
compactor, err := NewLeveledCompactor(nil, log.NewNopLogger(), []int64{ compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{
20, 20,
60, 60,
240, 240,
@ -649,7 +651,7 @@ func TestCompaction_populateBlock(t *testing.T) {
blocks = append(blocks, &mockBReader{ir: ir, cr: cr}) blocks = append(blocks, &mockBReader{ir: ir, cr: cr})
} }
c, err := NewLeveledCompactor(nil, nil, []int64{0}, nil) c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil)
testutil.Ok(t, err) testutil.Ok(t, err)
meta := &BlockMeta{ meta := &BlockMeta{
@ -746,6 +748,70 @@ func TestDisableAutoCompactions(t *testing.T) {
testutil.Assert(t, len(db.Blocks()) > 0, "No block was persisted after the set timeout.") testutil.Assert(t, len(db.Blocks()) > 0, "No block was persisted after the set timeout.")
} }
// TestCancelCompactions ensures that when the db is closed
// any running compaction is cancelled to unblock closing the db.
func TestCancelCompactions(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "testCancelCompaction")
testutil.Ok(t, err)
defer os.RemoveAll(tmpdir)
// Create some blocks to fall within the compaction range.
createBlock(t, tmpdir, genSeries(10, 10000, 0, 1000))
createBlock(t, tmpdir, genSeries(10, 10000, 1000, 2000))
createBlock(t, tmpdir, genSeries(1, 1, 2000, 2001)) // The most recent block is ignored so can be e small one.
// Copy the db so we have an exact copy to compare compaction times.
tmpdirCopy := tmpdir + "Copy"
err = fileutil.CopyDirs(tmpdir, tmpdirCopy)
testutil.Ok(t, err)
defer os.RemoveAll(tmpdirCopy)
// Measure the compaction time without interupting it.
var timeCompactionUninterrupted time.Duration
{
db, err := Open(tmpdir, log.NewNopLogger(), nil, &Options{BlockRanges: []int64{1, 2000}})
testutil.Ok(t, err)
testutil.Equals(t, 3, len(db.Blocks()), "initial block count mismatch")
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch")
db.compactc <- struct{}{} // Trigger a compaction.
var start time.Time
for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) <= 0 {
time.Sleep(3 * time.Millisecond)
}
start = time.Now()
for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran) != 1 {
time.Sleep(3 * time.Millisecond)
}
timeCompactionUninterrupted = time.Since(start)
testutil.Ok(t, db.Close())
}
// Measure the compaction time when closing the db in the middle of compaction.
{
db, err := Open(tmpdirCopy, log.NewNopLogger(), nil, &Options{BlockRanges: []int64{1, 2000}})
testutil.Ok(t, err)
testutil.Equals(t, 3, len(db.Blocks()), "initial block count mismatch")
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch")
db.compactc <- struct{}{} // Trigger a compaction.
dbClosed := make(chan struct{})
for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) <= 0 {
time.Sleep(3 * time.Millisecond)
}
go func() {
testutil.Ok(t, db.Close())
close(dbClosed)
}()
start := time.Now()
<-dbClosed
actT := time.Since(start)
expT := time.Duration(timeCompactionUninterrupted / 2) // Closing the db in the middle of compaction should less than half the time.
testutil.Assert(t, actT < expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT)
}
}
// TestDeleteCompactionBlockAfterFailedReload ensures that a failed reload immediately after a compaction // TestDeleteCompactionBlockAfterFailedReload ensures that a failed reload immediately after a compaction
// deletes the resulting block to avoid creatings blocks with the same time range. // deletes the resulting block to avoid creatings blocks with the same time range.
func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {

10
db.go
View file

@ -16,6 +16,7 @@ package tsdb
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -126,6 +127,9 @@ type DB struct {
// changing the autoCompact var. // changing the autoCompact var.
autoCompactMtx sync.Mutex autoCompactMtx sync.Mutex
autoCompact bool autoCompact bool
// Cancel a running compaction when a shutdown is initiated.
compactCancel context.CancelFunc
} }
type dbMetrics struct { type dbMetrics struct {
@ -271,10 +275,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
db.lockf = lockf db.lockf = lockf
} }
db.compactor, err = NewLeveledCompactor(r, l, opts.BlockRanges, db.chunkPool) ctx, cancel := context.WithCancel(context.Background())
db.compactor, err = NewLeveledCompactor(ctx, r, l, opts.BlockRanges, db.chunkPool)
if err != nil { if err != nil {
cancel()
return nil, errors.Wrap(err, "create leveled compactor") return nil, errors.Wrap(err, "create leveled compactor")
} }
db.compactCancel = cancel
segmentSize := wal.DefaultSegmentSize segmentSize := wal.DefaultSegmentSize
if opts.WALSegmentSize > 0 { if opts.WALSegmentSize > 0 {
@ -826,6 +833,7 @@ func (db *DB) Head() *Head {
// Close the partition. // Close the partition.
func (db *DB) Close() error { func (db *DB) Close() error {
close(db.stopc) close(db.stopc)
db.compactCancel()
<-db.donec <-db.donec
db.mtx.Lock() db.mtx.Lock()