From bd5ccee5c1e364bbe1886ef25cc58ab4a06cc812 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 5 Dec 2018 18:34:42 +0200 Subject: [PATCH 01/20] use context to cancel compactions Signed-off-by: Krasi Georgiev --- CHANGELOG.md | 2 ++ block_test.go | 3 ++- compact.go | 47 ++++++++++++++++++++++++++++++++++++++--------- compact_test.go | 11 ++++++----- db.go | 9 ++++++++- 5 files changed, 56 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d220de4ce..6fec65fb2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,6 @@ ## master / unreleased + - [CHANGE] `NewLeveledCompactor` takes a context so that a compaction is canceled when closing the db. + ## 0.3.0 diff --git a/block_test.go b/block_test.go index 03ac006a9..661898b7f 100644 --- a/block_test.go +++ b/block_test.go @@ -14,6 +14,7 @@ package tsdb import ( + "context" "io/ioutil" "math/rand" "os" @@ -105,7 +106,7 @@ func createPopulatedBlock(tb testing.TB, dir string, nSeries int, mint, maxt int testutil.Ok(tb, err) } - compactor, err := NewLeveledCompactor(nil, log.NewNopLogger(), []int64{1000000}, nil) + compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil) testutil.Ok(tb, err) testutil.Ok(tb, os.MkdirAll(dir, 0777)) diff --git a/compact.go b/compact.go index f8e6ff545..37fa3ae00 100644 --- a/compact.go +++ b/compact.go @@ -14,6 +14,7 @@ package tsdb import ( + "context" "fmt" "io" "math/rand" @@ -71,6 +72,7 @@ type LeveledCompactor struct { logger log.Logger ranges []int64 chunkPool chunkenc.Pool + ctx context.Context } type compactorMetrics struct { @@ -128,7 +130,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { } // NewLeveledCompactor returns a LeveledCompactor. -func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) { +func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) { if len(ranges) == 0 { return nil, errors.Errorf("at least one range must be provided") } @@ -140,6 +142,7 @@ func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, ranges []int64, chunkPool: pool, logger: l, metrics: newCompactorMetrics(r), + ctx: ctx, }, nil } @@ -441,8 +444,11 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error { func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) { dir := filepath.Join(dest, meta.ULID.String()) tmp := dir + ".tmp" - + var writers []io.Closer defer func(t time.Time) { + for _, w := range writers { + w.Close() + } if err != nil { c.metrics.failed.Inc() // TODO(gouthamve): Handle error how? @@ -470,7 +476,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err != nil { return errors.Wrap(err, "open chunk writer") } - defer chunkw.Close() + writers = append(writers, chunkw) // Record written chunk sizes on level 1 compactions. if meta.Compaction.Level == 1 { chunkw = &instrumentedChunkWriter{ @@ -485,12 +491,25 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err != nil { return errors.Wrap(err, "open index writer") } - defer indexw.Close() + writers = append(writers, indexw) if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil { return errors.Wrap(err, "write compaction") } + // Remove tmp folder and return early when the compaction was canceled. 
+ select { + case <-c.ctx.Done(): + for _, w := range writers { + w.Close() + } + if err := os.RemoveAll(tmp); err != nil { + level.Error(c.logger).Log("msg", "removed tmp folder after canceled compaction", "err", err.Error()) + } + return + default: + } + if err = writeMetaFile(tmp, meta); err != nil { return errors.Wrap(err, "write merged meta") } @@ -499,11 +518,10 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe // though these are covered under defer. This is because in Windows, // you cannot delete these unless they are closed and the defer is to // make sure they are closed if the function exits due to an error above. - if err = chunkw.Close(); err != nil { - return errors.Wrap(err, "close chunk writer") - } - if err = indexw.Close(); err != nil { - return errors.Wrap(err, "close index writer") + for _, w := range writers { + if err := w.Close(); err != nil { + return err + } } // Create an empty tombstones file. @@ -554,6 +572,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, defer func() { closeAll(closers...) }() for i, b := range blocks { + select { + case <-c.ctx.Done(): + return nil + default: + } + indexr, err := b.Index() if err != nil { return errors.Wrapf(err, "open index reader for block %s", b) @@ -610,6 +634,11 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } for set.Next() { + select { + case <-c.ctx.Done(): + return nil + default: + } lset, chks, dranges := set.At() // The chunks here are not fully deleted. // Skip the series with all deleted chunks. diff --git a/compact_test.go b/compact_test.go index 2489a21e1..89c36767f 100644 --- a/compact_test.go +++ b/compact_test.go @@ -14,6 +14,7 @@ package tsdb import ( + "context" "io/ioutil" "math" "os" @@ -151,7 +152,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) { }, } - c, err := NewLeveledCompactor(nil, nil, []int64{50}, nil) + c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil) testutil.Ok(t, err) c.plan(metas) @@ -159,7 +160,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) { func TestLeveledCompactor_plan(t *testing.T) { // This mimicks our default ExponentialBlockRanges with min block size equals to 20. 
- compactor, err := NewLeveledCompactor(nil, nil, []int64{ + compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{ 20, 60, 180, @@ -322,7 +323,7 @@ func TestLeveledCompactor_plan(t *testing.T) { } func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) { - compactor, err := NewLeveledCompactor(nil, nil, []int64{ + compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{ 20, 60, 240, @@ -372,7 +373,7 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) { } func TestCompactionFailWillCleanUpTempDir(t *testing.T) { - compactor, err := NewLeveledCompactor(nil, log.NewNopLogger(), []int64{ + compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{ 20, 60, 240, @@ -647,7 +648,7 @@ func TestCompaction_populateBlock(t *testing.T) { blocks = append(blocks, &mockBReader{ir: ir, cr: cr}) } - c, err := NewLeveledCompactor(nil, nil, []int64{0}, nil) + c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil) testutil.Ok(t, err) meta := &BlockMeta{ diff --git a/db.go b/db.go index 3a47f0bf4..ac9c832b9 100644 --- a/db.go +++ b/db.go @@ -16,6 +16,7 @@ package tsdb import ( "bytes" + "context" "fmt" "io" "io/ioutil" @@ -119,6 +120,9 @@ type DB struct { // changing the autoCompact var. autoCompactMtx sync.Mutex autoCompact bool + + // Cancel a running compaction when a shutdown is initiated. + compactCnl func() } type dbMetrics struct { @@ -258,10 +262,12 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db db.lockf = lockf } - db.compactor, err = NewLeveledCompactor(r, l, opts.BlockRanges, db.chunkPool) + ctx, cnl := context.WithCancel(context.Background()) + db.compactor, err = NewLeveledCompactor(ctx, r, l, opts.BlockRanges, db.chunkPool) if err != nil { return nil, errors.Wrap(err, "create leveled compactor") } + db.compactCnl = cnl wlog, err := wal.New(l, r, filepath.Join(dir, "wal")) if err != nil { @@ -726,6 +732,7 @@ func (db *DB) Head() *Head { // Close the partition. 
func (db *DB) Close() error { close(db.stopc) + db.compactCnl() <-db.donec db.mtx.Lock() From fced260a24c35ecf8826581f72ee34cfc33432bb Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 7 Dec 2018 17:49:23 +0200 Subject: [PATCH 02/20] test Signed-off-by: Krasi Georgiev --- compact.go | 24 +++++++++++++++++------- compact_test.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 7 deletions(-) diff --git a/compact.go b/compact.go index 37fa3ae00..556856107 100644 --- a/compact.go +++ b/compact.go @@ -76,12 +76,13 @@ type LeveledCompactor struct { } type compactorMetrics struct { - ran prometheus.Counter - failed prometheus.Counter - duration prometheus.Histogram - chunkSize prometheus.Histogram - chunkSamples prometheus.Histogram - chunkRange prometheus.Histogram + ran prometheus.Counter + populatingBlocks prometheus.Counter + failed prometheus.Counter + duration prometheus.Histogram + chunkSize prometheus.Histogram + chunkSamples prometheus.Histogram + chunkRange prometheus.Histogram } func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { @@ -91,6 +92,10 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { Name: "prometheus_tsdb_compactions_total", Help: "Total number of compactions that were executed for the partition.", }) + m.populatingBlocks = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_compaction_populating_block", + Help: "Set to 1 when a block is currently being written to the disk.", + }) m.failed = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_compactions_failed_total", Help: "Total number of compactions that failed for the partition.", @@ -119,6 +124,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { if r != nil { r.MustRegister( m.ran, + m.populatingBlocks, m.failed, m.duration, m.chunkRange, @@ -569,8 +575,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, allSymbols = make(map[string]struct{}, 1<<16) closers = []io.Closer{} ) - defer func() { closeAll(closers...) }() + defer func() { + closeAll(closers...) + c.metrics.populatingBlocks.Add(-1) + }() + c.metrics.populatingBlocks.Inc() for i, b := range blocks { select { case <-c.ctx.Done(): diff --git a/compact_test.go b/compact_test.go index 89c36767f..36bcb220c 100644 --- a/compact_test.go +++ b/compact_test.go @@ -24,6 +24,7 @@ import ( "github.com/go-kit/kit/log" "github.com/pkg/errors" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" dto "github.com/prometheus/client_model/go" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" @@ -742,3 +743,38 @@ func TestDisableAutoCompactions(t *testing.T) { } testutil.Assert(t, len(db.Blocks()) > 0, "No block was persisted after the set timeout.") } + +// TestCancelCompactions ensures that when the db is closed +// any running compaction is cancelled to unblock closing the db. +func TestCancelCompactions(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "test") + testutil.Ok(t, err) + defer os.RemoveAll(tmpdir) + + // Create some blocks to fall within the compaction range. + createPopulatedBlock(t, tmpdir, 3000, 0, 1000) + createPopulatedBlock(t, tmpdir, 3000, 1000, 2000) + createPopulatedBlock(t, tmpdir, 1, 2000, 2001) + + db, err := Open(tmpdir, log.NewNopLogger(), nil, &Options{BlockRanges: []int64{1, 2000}}) + testutil.Ok(t, err) + + db.compactc <- struct{}{} // Trigger a compaction. 
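+	// Wait for the populatingBlocks gauge to report an in-flight compaction,
+	// then close the db from a separate goroutine and assert that Close
+	// returns within the expected bound instead of blocking on the compaction.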
+ dbClosed := make(chan struct{}) + for { + if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 { + time.Sleep(5 * time.Millisecond) + go func() { + testutil.Ok(t, db.Close()) + close(dbClosed) + }() + break + } + } + + start := time.Now() + <-dbClosed + actT := time.Since(start) + expT := time.Duration(50000000) + testutil.Assert(t, actT < expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT) +} From db127a60e0e990d4ac26b5c93a956bc394ccd6b9 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Sat, 8 Dec 2018 01:06:48 +0200 Subject: [PATCH 03/20] changelog Signed-off-by: Krasi Georgiev --- CHANGELOG.md | 2 +- compact_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6fec65fb2..7ec9f9eba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ ## master / unreleased - [CHANGE] `NewLeveledCompactor` takes a context so that a compaction is canceled when closing the db. - + - [ENHANCEMENT] When closing the db any running compaction will be cancelled so it doesn't block. ## 0.3.0 diff --git a/compact_test.go b/compact_test.go index 36bcb220c..3b63db313 100644 --- a/compact_test.go +++ b/compact_test.go @@ -775,6 +775,6 @@ func TestCancelCompactions(t *testing.T) { start := time.Now() <-dbClosed actT := time.Since(start) - expT := time.Duration(50000000) + expT := time.Duration(100000000) testutil.Assert(t, actT < expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT) } From 921f82cfc1e997c58201b8c5635b3f462346ecbd Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 18 Jan 2019 18:58:17 +0200 Subject: [PATCH 04/20] WIP Signed-off-by: Krasi Georgiev --- block_test.go | 14 ++++++++++---- compact.go | 8 +++++++- compact_test.go | 10 ++++++---- db.go | 6 ++++-- 4 files changed, 27 insertions(+), 11 deletions(-) diff --git a/block_test.go b/block_test.go index dec044918..0f164ae06 100644 --- a/block_test.go +++ b/block_test.go @@ -15,6 +15,7 @@ package tsdb import ( "context" + "fmt" "io/ioutil" "math/rand" "os" @@ -71,15 +72,20 @@ func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) strin testutil.Ok(tb, err) var ref uint64 + fmt.Println(len(lbls)) + for ts := mint; ts <= maxt; ts++ { app := head.Appender() - for _, lbl := range lbls { - err := app.AddFast(ref, ts, rand.Float64()) - if err == nil { - continue + for i, lbl := range lbls { + if i > 0 && lbl.String() == lbls[i-1].String() { + err := app.AddFast(ref, ts, rand.Float64()) + if err == nil { + continue + } } ref, err = app.Add(lbl, int64(ts), rand.Float64()) testutil.Ok(tb, err) + } err := app.Commit() testutil.Ok(tb, err) diff --git a/compact.go b/compact.go index dbc3c2606..bf19199aa 100644 --- a/compact.go +++ b/compact.go @@ -530,7 +530,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return errors.Wrap(err, "write compaction") } - // Remove tmp folder and return early when the compaction was canceled. + // Compaction was canceled so remove tmp folders and return early. 
select { case <-c.ctx.Done(): for _, w := range writers { @@ -616,6 +616,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, }() c.metrics.populatingBlocks.Inc() + + fmt.Println(blocks) for i, b := range blocks { select { case <-c.ctx.Done(): @@ -623,6 +625,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, default: } + fmt.Println("next block") + indexr, err := b.Index() if err != nil { return errors.Wrapf(err, "open index reader for block %s", b) @@ -684,6 +688,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, return nil default: } + + // fmt.Println("next set") lset, chks, dranges := set.At() // The chunks here are not fully deleted. // Skip the series with all deleted chunks. diff --git a/compact_test.go b/compact_test.go index 2080fb291..b84cdf8dc 100644 --- a/compact_test.go +++ b/compact_test.go @@ -15,6 +15,7 @@ package tsdb import ( "context" + "fmt" "io/ioutil" "math" "os" @@ -752,9 +753,9 @@ func TestCancelCompactions(t *testing.T) { defer os.RemoveAll(tmpdir) // Create some blocks to fall within the compaction range. - createPopulatedBlock(t, tmpdir, 3000, 0, 1000) - createPopulatedBlock(t, tmpdir, 3000, 1000, 2000) - createPopulatedBlock(t, tmpdir, 1, 2000, 2001) + createBlock(t, tmpdir, 1000, 0, 1000) + createBlock(t, tmpdir, 1000, 1000, 2000) + createBlock(t, tmpdir, 1, 2000, 2001) // The most recent block is ignored so can be e small one. db, err := Open(tmpdir, log.NewNopLogger(), nil, &Options{BlockRanges: []int64{1, 2000}}) testutil.Ok(t, err) @@ -763,7 +764,8 @@ func TestCancelCompactions(t *testing.T) { dbClosed := make(chan struct{}) for { if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 { - time.Sleep(5 * time.Millisecond) + fmt.Println("populating started.") + time.Sleep(2 * time.Millisecond) go func() { testutil.Ok(t, db.Close()) close(dbClosed) diff --git a/db.go b/db.go index c9715483d..a580ea240 100644 --- a/db.go +++ b/db.go @@ -278,6 +278,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db ctx, cnl := context.WithCancel(context.Background()) db.compactor, err = NewLeveledCompactor(ctx, r, l, opts.BlockRanges, db.chunkPool) if err != nil { + cnl() return nil, errors.Wrap(err, "create leveled compactor") } db.compactCnl = cnl @@ -459,7 +460,6 @@ func (db *DB) compact() (err error) { return nil default: } - if _, err := db.compactor.Compact(db.dir, plan, db.blocks); err != nil { return errors.Wrapf(err, "compact %s", plan) } @@ -819,7 +819,9 @@ func (db *DB) Head() *Head { // Close the partition. 
func (db *DB) Close() error { close(db.stopc) - db.compactCnl() + // fmt.Println("closing") + // db.compactCnl() + // fmt.Println("closed") <-db.donec db.mtx.Lock() From 9874377ead5ebba6f54e903f87a5e8ce8dab56c3 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 18 Jan 2019 19:21:09 +0200 Subject: [PATCH 05/20] nits Signed-off-by: Krasi Georgiev --- block_test.go | 13 +++++-------- compact.go | 4 ---- compact_test.go | 8 +++----- db.go | 4 +--- 4 files changed, 9 insertions(+), 20 deletions(-) diff --git a/block_test.go b/block_test.go index 0f164ae06..addb5ea97 100644 --- a/block_test.go +++ b/block_test.go @@ -15,7 +15,6 @@ package tsdb import ( "context" - "fmt" "io/ioutil" "math/rand" "os" @@ -70,22 +69,20 @@ func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) strin lbls, err := labels.ReadLabels(filepath.Join("testdata", "20kseries.json"), nSeries) testutil.Ok(tb, err) - var ref uint64 - - fmt.Println(len(lbls)) + refs := make([]uint64, nSeries) for ts := mint; ts <= maxt; ts++ { app := head.Appender() for i, lbl := range lbls { - if i > 0 && lbl.String() == lbls[i-1].String() { - err := app.AddFast(ref, ts, rand.Float64()) + if refs[i] != 0 { + err := app.AddFast(refs[i], ts, rand.Float64()) if err == nil { continue } } - ref, err = app.Add(lbl, int64(ts), rand.Float64()) + ref, err := app.Add(lbl, int64(ts), rand.Float64()) testutil.Ok(tb, err) - + refs[i] = ref } err := app.Commit() testutil.Ok(tb, err) diff --git a/compact.go b/compact.go index bf19199aa..33cd20adf 100644 --- a/compact.go +++ b/compact.go @@ -617,7 +617,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, c.metrics.populatingBlocks.Inc() - fmt.Println(blocks) for i, b := range blocks { select { case <-c.ctx.Done(): @@ -625,8 +624,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, default: } - fmt.Println("next block") - indexr, err := b.Index() if err != nil { return errors.Wrapf(err, "open index reader for block %s", b) @@ -689,7 +686,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, default: } - // fmt.Println("next set") lset, chks, dranges := set.At() // The chunks here are not fully deleted. // Skip the series with all deleted chunks. diff --git a/compact_test.go b/compact_test.go index b84cdf8dc..62db4d755 100644 --- a/compact_test.go +++ b/compact_test.go @@ -15,7 +15,6 @@ package tsdb import ( "context" - "fmt" "io/ioutil" "math" "os" @@ -753,8 +752,8 @@ func TestCancelCompactions(t *testing.T) { defer os.RemoveAll(tmpdir) // Create some blocks to fall within the compaction range. - createBlock(t, tmpdir, 1000, 0, 1000) - createBlock(t, tmpdir, 1000, 1000, 2000) + createBlock(t, tmpdir, 3000, 0, 1000) + createBlock(t, tmpdir, 3000, 1000, 2000) createBlock(t, tmpdir, 1, 2000, 2001) // The most recent block is ignored so can be e small one. db, err := Open(tmpdir, log.NewNopLogger(), nil, &Options{BlockRanges: []int64{1, 2000}}) @@ -764,8 +763,7 @@ func TestCancelCompactions(t *testing.T) { dbClosed := make(chan struct{}) for { if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 { - fmt.Println("populating started.") - time.Sleep(2 * time.Millisecond) + time.Sleep(3 * time.Millisecond) go func() { testutil.Ok(t, db.Close()) close(dbClosed) diff --git a/db.go b/db.go index a580ea240..bfa52840c 100644 --- a/db.go +++ b/db.go @@ -819,9 +819,7 @@ func (db *DB) Head() *Head { // Close the partition. 
func (db *DB) Close() error { close(db.stopc) - // fmt.Println("closing") - // db.compactCnl() - // fmt.Println("closed") + db.compactCnl() <-db.donec db.mtx.Lock() From 9638c13ec8b79b6cb1f574b59cab421dc2af2348 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Thu, 24 Jan 2019 12:48:56 +0200 Subject: [PATCH 06/20] test compares normal vs canceled compaction times Signed-off-by: Krasi Georgiev --- compact_test.go | 87 +++++++++++++++++++++++++++++++++++-------------- 1 file changed, 62 insertions(+), 25 deletions(-) diff --git a/compact_test.go b/compact_test.go index 62db4d755..c89c86c25 100644 --- a/compact_test.go +++ b/compact_test.go @@ -15,6 +15,7 @@ package tsdb import ( "context" + "fmt" "io/ioutil" "math" "os" @@ -747,34 +748,70 @@ func TestDisableAutoCompactions(t *testing.T) { // TestCancelCompactions ensures that when the db is closed // any running compaction is cancelled to unblock closing the db. func TestCancelCompactions(t *testing.T) { - tmpdir, err := ioutil.TempDir("", "test") - testutil.Ok(t, err) - defer os.RemoveAll(tmpdir) + createTestDb := func() (*DB, func()) { + tmpdir, err := ioutil.TempDir("", "testCancelCompaction") + testutil.Ok(t, err) - // Create some blocks to fall within the compaction range. - createBlock(t, tmpdir, 3000, 0, 1000) - createBlock(t, tmpdir, 3000, 1000, 2000) - createBlock(t, tmpdir, 1, 2000, 2001) // The most recent block is ignored so can be e small one. + // Create some blocks to fall within the compaction range. + createBlock(t, tmpdir, 4000, 0, 1000) + createBlock(t, tmpdir, 4000, 1000, 2000) + createBlock(t, tmpdir, 1, 2000, 2001) // The most recent block is ignored so can be e small one. + db, err := Open(tmpdir, log.NewNopLogger(), nil, &Options{BlockRanges: []int64{1, 2000}}) + testutil.Ok(t, err) + testutil.Equals(t, 3, len(db.Blocks()), "initial block count mismatch") + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch") - db, err := Open(tmpdir, log.NewNopLogger(), nil, &Options{BlockRanges: []int64{1, 2000}}) - testutil.Ok(t, err) - - db.compactc <- struct{}{} // Trigger a compaction. - dbClosed := make(chan struct{}) - for { - if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 { - time.Sleep(3 * time.Millisecond) - go func() { - testutil.Ok(t, db.Close()) - close(dbClosed) - }() - break + return db, func() { + os.RemoveAll(tmpdir) } } + // First lets mesure the compaction time without interupting it. + var timeCompactionUninterrupted time.Duration + { + db, delete := createTestDb() + defer delete() + db.compactc <- struct{}{} // Trigger a compaction. + var start time.Time + for { + if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 { + start = time.Now() + break + } + time.Sleep(3 * time.Millisecond) + } - start := time.Now() - <-dbClosed - actT := time.Since(start) - expT := time.Duration(100000000) - testutil.Assert(t, actT < expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT) + for { + if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran) == 1 { + timeCompactionUninterrupted = time.Since(start) + break + } + time.Sleep(3 * time.Millisecond) + } + } + // Closing the db in the middle of compaction should take half the time. + { + db, delete := createTestDb() + defer delete() + + db.compactc <- struct{}{} // Trigger a compaction. 
+ dbClosed := make(chan struct{}) + for { + if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 { + time.Sleep(3 * time.Millisecond) + go func() { + testutil.Ok(t, db.Close()) + close(dbClosed) + }() + break + } + } + + start := time.Now() + <-dbClosed + actT := time.Since(start) + fmt.Println(timeCompactionUninterrupted) + fmt.Println(actT) + expT := time.Duration(timeCompactionUninterrupted / 2) + testutil.Assert(t, actT < expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT) + } } From 8ffd70534607205039741bc9352a7960da5ab8dd Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Thu, 24 Jan 2019 13:33:12 +0200 Subject: [PATCH 07/20] fix the misleading log during compaction cancelation Signed-off-by: Krasi Georgiev --- compact.go | 14 +++++++++----- compact_test.go | 4 ++-- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/compact.go b/compact.go index 33cd20adf..ab0d9aab6 100644 --- a/compact.go +++ b/compact.go @@ -412,10 +412,11 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u var merr MultiError merr.Add(err) - - for _, b := range bs { - if err := b.setCompactionFailed(); err != nil { - merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir())) + if err != ErrCompactionCanceled { + for _, b := range bs { + if err := b.setCompactionFailed(); err != nil { + merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir())) + } } } @@ -472,6 +473,9 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error { return w.ChunkWriter.WriteChunks(chunks...) } +// ErrCompactionCanceled is returned when the compaction was canceled during shutdown. +var ErrCompactionCanceled = errors.New("compaction cancelled") + // write creates a new block that is the union of the provided blocks into dir. // It cleans up all files of the old blocks after completing successfully. func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) { @@ -539,7 +543,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err := os.RemoveAll(tmp); err != nil { level.Error(c.logger).Log("msg", "removed tmp folder after canceled compaction", "err", err.Error()) } - return + return ErrCompactionCanceled default: } diff --git a/compact_test.go b/compact_test.go index c89c86c25..f8a16cef4 100644 --- a/compact_test.go +++ b/compact_test.go @@ -753,8 +753,8 @@ func TestCancelCompactions(t *testing.T) { testutil.Ok(t, err) // Create some blocks to fall within the compaction range. - createBlock(t, tmpdir, 4000, 0, 1000) - createBlock(t, tmpdir, 4000, 1000, 2000) + createBlock(t, tmpdir, 10000, 0, 1000) + createBlock(t, tmpdir, 10000, 1000, 2000) createBlock(t, tmpdir, 1, 2000, 2001) // The most recent block is ignored so can be e small one. db, err := Open(tmpdir, log.NewNopLogger(), nil, &Options{BlockRanges: []int64{1, 2000}}) testutil.Ok(t, err) From 1b0d85bbf2ec190c139c7f71bb2ff2f219890610 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Thu, 24 Jan 2019 14:15:32 +0200 Subject: [PATCH 08/20] use a db copy instead of creating it again. 
Signed-off-by: Krasi Georgiev --- compact_test.go | 124 +++++++++++++++++++++++++----------------------- 1 file changed, 65 insertions(+), 59 deletions(-) diff --git a/compact_test.go b/compact_test.go index f8a16cef4..13b56bcd8 100644 --- a/compact_test.go +++ b/compact_test.go @@ -15,7 +15,6 @@ package tsdb import ( "context" - "fmt" "io/ioutil" "math" "os" @@ -28,6 +27,7 @@ import ( prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" dto "github.com/prometheus/client_model/go" "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" ) @@ -748,70 +748,76 @@ func TestDisableAutoCompactions(t *testing.T) { // TestCancelCompactions ensures that when the db is closed // any running compaction is cancelled to unblock closing the db. func TestCancelCompactions(t *testing.T) { - createTestDb := func() (*DB, func()) { - tmpdir, err := ioutil.TempDir("", "testCancelCompaction") - testutil.Ok(t, err) + tmpdir, err := ioutil.TempDir("", "testCancelCompaction") + testutil.Ok(t, err) + defer os.RemoveAll(tmpdir) - // Create some blocks to fall within the compaction range. - createBlock(t, tmpdir, 10000, 0, 1000) - createBlock(t, tmpdir, 10000, 1000, 2000) - createBlock(t, tmpdir, 1, 2000, 2001) // The most recent block is ignored so can be e small one. - db, err := Open(tmpdir, log.NewNopLogger(), nil, &Options{BlockRanges: []int64{1, 2000}}) - testutil.Ok(t, err) - testutil.Equals(t, 3, len(db.Blocks()), "initial block count mismatch") - testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch") + // Create some blocks to fall within the compaction range. + createBlock(t, tmpdir, 7000, 0, 1000) + createBlock(t, tmpdir, 7000, 1000, 2000) + createBlock(t, tmpdir, 1, 2000, 2001) // The most recent block is ignored so can be e small one. - return db, func() { - os.RemoveAll(tmpdir) - } - } - // First lets mesure the compaction time without interupting it. + // Copy the db so we have an exact copy to compare compaction times. + tmpdirCopy := tmpdir + "Copy" + err = fileutil.CopyDirs(tmpdir, tmpdirCopy) + testutil.Ok(t, err) + defer os.RemoveAll(tmpdirCopy) + + // Measure the compaction time without interupting it. var timeCompactionUninterrupted time.Duration { - db, delete := createTestDb() - defer delete() - db.compactc <- struct{}{} // Trigger a compaction. - var start time.Time - for { - if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 { - start = time.Now() - break - } - time.Sleep(3 * time.Millisecond) - } - - for { - if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran) == 1 { - timeCompactionUninterrupted = time.Since(start) - break - } - time.Sleep(3 * time.Millisecond) - } - } - // Closing the db in the middle of compaction should take half the time. - { - db, delete := createTestDb() - defer delete() - - db.compactc <- struct{}{} // Trigger a compaction. 
- dbClosed := make(chan struct{}) - for { - if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 { + db, err := Open(tmpdir, log.NewNopLogger(), nil, &Options{BlockRanges: []int64{1, 2000}}) + testutil.Ok(t, err) + db.DisableCompactions() + testutil.Equals(t, 3, len(db.Blocks()), "initial block count mismatch") + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch") + go func() { + var start time.Time + for { + if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 { + start = time.Now() + break + } time.Sleep(3 * time.Millisecond) - go func() { - testutil.Ok(t, db.Close()) - close(dbClosed) - }() - break } - } - start := time.Now() - <-dbClosed - actT := time.Since(start) - fmt.Println(timeCompactionUninterrupted) - fmt.Println(actT) - expT := time.Duration(timeCompactionUninterrupted / 2) - testutil.Assert(t, actT < expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT) + for { + if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran) == 1 { + timeCompactionUninterrupted = time.Since(start) + break + } + time.Sleep(3 * time.Millisecond) + } + testutil.Ok(t, db.Close()) + }() + db.compact() + } + // Measure the compaction time when closing the db in the middle of compaction. + { + db, err := Open(tmpdirCopy, log.NewNopLogger(), nil, &Options{BlockRanges: []int64{1, 2000}}) + testutil.Ok(t, err) + db.DisableCompactions() + testutil.Equals(t, 3, len(db.Blocks()), "initial block count mismatch") + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch") + go func() { + dbClosed := make(chan struct{}) + for { + if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 { + time.Sleep(3 * time.Millisecond) + go func() { + testutil.Ok(t, db.Close()) + close(dbClosed) + }() + break + } + } + + start := time.Now() + <-dbClosed + actT := time.Since(start) + expT := time.Duration(timeCompactionUninterrupted / 2) // Closing the db in the middle of compaction should less than half the time. + testutil.Assert(t, actT < expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT) + }() + db.compact() } } From fa1c00f9e2934e8b52ab4392e14d79eb0ace660c Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Thu, 24 Jan 2019 14:34:16 +0200 Subject: [PATCH 09/20] no backgorund Signed-off-by: Krasi Georgiev --- compact_test.go | 68 ++++++++++++++++++++++--------------------------- 1 file changed, 31 insertions(+), 37 deletions(-) diff --git a/compact_test.go b/compact_test.go index 13b56bcd8..fa05e442e 100644 --- a/compact_test.go +++ b/compact_test.go @@ -768,56 +768,50 @@ func TestCancelCompactions(t *testing.T) { { db, err := Open(tmpdir, log.NewNopLogger(), nil, &Options{BlockRanges: []int64{1, 2000}}) testutil.Ok(t, err) - db.DisableCompactions() testutil.Equals(t, 3, len(db.Blocks()), "initial block count mismatch") testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch") - go func() { - var start time.Time - for { - if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 { - start = time.Now() - break - } - time.Sleep(3 * time.Millisecond) + db.compactc <- struct{}{} // Trigger a compaction. 
+ var start time.Time + for { + if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 { + start = time.Now() + break } + time.Sleep(3 * time.Millisecond) + } - for { - if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran) == 1 { - timeCompactionUninterrupted = time.Since(start) - break - } - time.Sleep(3 * time.Millisecond) + for { + if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran) == 1 { + timeCompactionUninterrupted = time.Since(start) + break } - testutil.Ok(t, db.Close()) - }() - db.compact() + time.Sleep(3 * time.Millisecond) + } + testutil.Ok(t, db.Close()) } // Measure the compaction time when closing the db in the middle of compaction. { db, err := Open(tmpdirCopy, log.NewNopLogger(), nil, &Options{BlockRanges: []int64{1, 2000}}) testutil.Ok(t, err) - db.DisableCompactions() testutil.Equals(t, 3, len(db.Blocks()), "initial block count mismatch") testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch") - go func() { - dbClosed := make(chan struct{}) - for { - if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 { - time.Sleep(3 * time.Millisecond) - go func() { - testutil.Ok(t, db.Close()) - close(dbClosed) - }() - break - } + db.compactc <- struct{}{} // Trigger a compaction. + dbClosed := make(chan struct{}) + for { + if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 { + time.Sleep(3 * time.Millisecond) + go func() { + testutil.Ok(t, db.Close()) + close(dbClosed) + }() + break } + } - start := time.Now() - <-dbClosed - actT := time.Since(start) - expT := time.Duration(timeCompactionUninterrupted / 2) // Closing the db in the middle of compaction should less than half the time. - testutil.Assert(t, actT < expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT) - }() - db.compact() + start := time.Now() + <-dbClosed + actT := time.Since(start) + expT := time.Duration(timeCompactionUninterrupted / 2) // Closing the db in the middle of compaction should less than half the time. + testutil.Assert(t, actT < expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT) } } From 752ab86e4e51b860974dc26c6b08f8000dbd31a2 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 4 Feb 2019 11:14:39 +0200 Subject: [PATCH 10/20] change the test block series for more stable tests Signed-off-by: Krasi Georgiev --- compact_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/compact_test.go b/compact_test.go index fa05e442e..9141be0e1 100644 --- a/compact_test.go +++ b/compact_test.go @@ -753,9 +753,9 @@ func TestCancelCompactions(t *testing.T) { defer os.RemoveAll(tmpdir) // Create some blocks to fall within the compaction range. - createBlock(t, tmpdir, 7000, 0, 1000) - createBlock(t, tmpdir, 7000, 1000, 2000) - createBlock(t, tmpdir, 1, 2000, 2001) // The most recent block is ignored so can be e small one. + createBlock(t, tmpdir, genSeries(10, 10000, 0, 1000)) + createBlock(t, tmpdir, genSeries(10, 10000, 1000, 2000)) + createBlock(t, tmpdir, genSeries(1, 1, 2000, 2001)) // The most recent block is ignored so can be e small one. // Copy the db so we have an exact copy to compare compaction times. 
tmpdirCopy := tmpdir + "Copy" From ce4a2083fb984c69332861c41895b888c94e1819 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 4 Feb 2019 11:15:08 +0200 Subject: [PATCH 11/20] nit Signed-off-by: Krasi Georgiev --- block_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/block_test.go b/block_test.go index 8ba6f3ec5..8a4081a33 100644 --- a/block_test.go +++ b/block_test.go @@ -80,7 +80,6 @@ func createBlock(tb testing.TB, dir string, series []Series) string { } ref, err = app.Add(s.Labels(), t, v) testutil.Ok(tb, err) - refs[i] = ref } testutil.Ok(tb, it.Err()) } From 45acaadd81a2d8f754bed862ed20cf486882f006 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 6 Feb 2019 14:07:35 +0200 Subject: [PATCH 12/20] review changes Signed-off-by: Krasi Georgiev --- compact.go | 32 +++++++++++++------------------- db.go | 10 +++++----- 2 files changed, 18 insertions(+), 24 deletions(-) diff --git a/compact.go b/compact.go index bb5d53e2e..80d33dff8 100644 --- a/compact.go +++ b/compact.go @@ -412,7 +412,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u var merr MultiError merr.Add(err) - if err != ErrCompactionCanceled { + if err != context.Canceled { for _, b := range bs { if err := b.setCompactionFailed(); err != nil { merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir())) @@ -481,20 +481,19 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error { return w.ChunkWriter.WriteChunks(chunks...) } -// ErrCompactionCanceled is returned when the compaction was canceled during shutdown. -var ErrCompactionCanceled = errors.New("compaction cancelled") - // write creates a new block that is the union of the provided blocks into dir. // It cleans up all files of the old blocks after completing successfully. func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) { dir := filepath.Join(dest, meta.ULID.String()) tmp := dir + ".tmp" var writers []io.Closer + var merr MultiError defer func(t time.Time) { + merr.Add(err) for _, w := range writers { - w.Close() + merr.Add(w.Close()) } - if err != nil { + if merr.Err() != nil { c.metrics.failed.Inc() // TODO(gouthamve): Handle error how? if err := os.RemoveAll(tmp); err != nil { @@ -542,16 +541,9 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return errors.Wrap(err, "write compaction") } - // Compaction was canceled so remove tmp folders and return early. select { case <-c.ctx.Done(): - for _, w := range writers { - w.Close() - } - if err := os.RemoveAll(tmp); err != nil { - level.Error(c.logger).Log("msg", "removed tmp folder after canceled compaction", "err", err.Error()) - } - return ErrCompactionCanceled + return c.ctx.Err() default: } @@ -560,9 +552,11 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe // you cannot delete these unless they are closed and the defer is to // make sure they are closed if the function exits due to an error above. for _, w := range writers { - if err := w.Close(); err != nil { - return err - } + merr.Add(w.Close()) + } + writers = writers[:0] // Avoid closing the writers twice in the defer. + if merr.Err() != nil { + return merr.Err() } // Populated block is empty, so cleanup and exit. 
@@ -632,7 +626,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, for i, b := range blocks { select { case <-c.ctx.Done(): - return nil + return c.ctx.Err() default: } @@ -694,7 +688,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, for set.Next() { select { case <-c.ctx.Done(): - return nil + return c.ctx.Err() default: } diff --git a/db.go b/db.go index 9436809b7..8bec75447 100644 --- a/db.go +++ b/db.go @@ -129,7 +129,7 @@ type DB struct { autoCompact bool // Cancel a running compaction when a shutdown is initiated. - compactCnl func() + compactCancel context.CancelFunc } type dbMetrics struct { @@ -275,13 +275,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db db.lockf = lockf } - ctx, cnl := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) db.compactor, err = NewLeveledCompactor(ctx, r, l, opts.BlockRanges, db.chunkPool) if err != nil { - cnl() + cancel() return nil, errors.Wrap(err, "create leveled compactor") } - db.compactCnl = cnl + db.compactCancel = cancel segmentSize := wal.DefaultSegmentSize if opts.WALSegmentSize > 0 { @@ -819,7 +819,7 @@ func (db *DB) Head() *Head { // Close the partition. func (db *DB) Close() error { close(db.stopc) - db.compactCnl() + db.compactCancel() <-db.donec db.mtx.Lock() From 08e7bc8ee855e341af3473ab08c42ac388ae41f2 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 6 Feb 2019 16:09:42 +0200 Subject: [PATCH 13/20] always remove tmp Signed-off-by: Krasi Georgiev --- compact.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/compact.go b/compact.go index 80d33dff8..e2815cfa7 100644 --- a/compact.go +++ b/compact.go @@ -493,12 +493,13 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe for _, w := range writers { merr.Add(w.Close()) } + + // RemoveAll returns no error when tmp doesn't exist so it is safe to always run it. + if err := os.RemoveAll(tmp); err != nil { + level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error()) + } if merr.Err() != nil { c.metrics.failed.Inc() - // TODO(gouthamve): Handle error how? - if err := os.RemoveAll(tmp); err != nil { - level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error()) - } } c.metrics.ran.Inc() c.metrics.duration.Observe(time.Since(t).Seconds()) @@ -559,11 +560,8 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return merr.Err() } - // Populated block is empty, so cleanup and exit. + // Populated block is empty, so exit early. if meta.Stats.NumSamples == 0 { - if err := os.RemoveAll(tmp); err != nil { - return errors.Wrap(err, "remove tmp folder after empty block failed") - } return nil } From 776769377ea9abe78ef1e22a175200bfa98aea4d Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 6 Feb 2019 16:31:02 +0200 Subject: [PATCH 14/20] fix merr logic. 
Signed-off-by: Krasi Georgiev --- compact.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/compact.go b/compact.go index e2815cfa7..6702935c0 100644 --- a/compact.go +++ b/compact.go @@ -487,9 +487,10 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe dir := filepath.Join(dest, meta.ULID.String()) tmp := dir + ".tmp" var writers []io.Closer - var merr MultiError defer func(t time.Time) { + var merr MultiError merr.Add(err) + err = merr.Err() for _, w := range writers { merr.Add(w.Close()) } @@ -498,7 +499,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err := os.RemoveAll(tmp); err != nil { level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error()) } - if merr.Err() != nil { + if err != nil { c.metrics.failed.Inc() } c.metrics.ran.Inc() @@ -552,6 +553,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe // though these are covered under defer. This is because in Windows, // you cannot delete these unless they are closed and the defer is to // make sure they are closed if the function exits due to an error above. + var merr MultiError for _, w := range writers { merr.Add(w.Close()) } From 2ae06202053eac6f54a8f6bdeb9ab356113c8120 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Thu, 7 Feb 2019 10:09:42 +0200 Subject: [PATCH 15/20] rename some vars and use Gauge instead of Counter for metrics Signed-off-by: Krasi Georgiev --- CHANGELOG.md | 1 - compact.go | 18 +++++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f42a92474..5f4584720 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,6 @@ ## master / unreleased - [CHANGE] `NewLeveledCompactor` takes a context so that a compaction is canceled when closing the db. - [ENHANCEMENT] When closing the db any running compaction will be cancelled so it doesn't block. - - [CHANGE] `prometheus_tsdb_storage_blocks_bytes_total` is now `prometheus_tsdb_storage_blocks_bytes` ## 0.4.0 diff --git a/compact.go b/compact.go index 6702935c0..8a20dd1c6 100644 --- a/compact.go +++ b/compact.go @@ -82,7 +82,7 @@ type LeveledCompactor struct { type compactorMetrics struct { ran prometheus.Counter - populatingBlocks prometheus.Counter + populatingBlocks prometheus.Gauge failed prometheus.Counter duration prometheus.Histogram chunkSize prometheus.Histogram @@ -486,12 +486,12 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error { func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) { dir := filepath.Join(dest, meta.ULID.String()) tmp := dir + ".tmp" - var writers []io.Closer + var closers []io.Closer defer func(t time.Time) { var merr MultiError merr.Add(err) err = merr.Err() - for _, w := range writers { + for _, w := range closers { merr.Add(w.Close()) } @@ -522,7 +522,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err != nil { return errors.Wrap(err, "open chunk writer") } - writers = append(writers, chunkw) + closers = append(closers, chunkw) // Record written chunk sizes on level 1 compactions. 
if meta.Compaction.Level == 1 { chunkw = &instrumentedChunkWriter{ @@ -537,7 +537,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err != nil { return errors.Wrap(err, "open index writer") } - writers = append(writers, indexw) + closers = append(closers, indexw) if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil { return errors.Wrap(err, "write compaction") @@ -554,10 +554,10 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe // you cannot delete these unless they are closed and the defer is to // make sure they are closed if the function exits due to an error above. var merr MultiError - for _, w := range writers { + for _, w := range closers { merr.Add(w.Close()) } - writers = writers[:0] // Avoid closing the writers twice in the defer. + closers = closers[:0] // Avoid closing the writers twice in the defer. if merr.Err() != nil { return merr.Err() } @@ -618,10 +618,10 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, ) defer func() { closeAll(closers...) - c.metrics.populatingBlocks.Add(-1) + c.metrics.populatingBlocks.Set(0) }() - c.metrics.populatingBlocks.Inc() + c.metrics.populatingBlocks.Set(1) for i, b := range blocks { select { From 457534d5c415a4854eef3626313d5dd97911f676 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 8 Feb 2019 11:36:30 +0200 Subject: [PATCH 16/20] simplify nesting. Signed-off-by: Krasi Georgiev --- compact_test.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/compact_test.go b/compact_test.go index 9141be0e1..fa977879d 100644 --- a/compact_test.go +++ b/compact_test.go @@ -772,21 +772,16 @@ func TestCancelCompactions(t *testing.T) { testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch") db.compactc <- struct{}{} // Trigger a compaction. var start time.Time - for { - if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 { - start = time.Now() - break - } + for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) <= 0 { time.Sleep(3 * time.Millisecond) } + start = time.Now() - for { - if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran) == 1 { - timeCompactionUninterrupted = time.Since(start) - break - } + for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran) != 1 { time.Sleep(3 * time.Millisecond) } + timeCompactionUninterrupted = time.Since(start) + testutil.Ok(t, db.Close()) } // Measure the compaction time when closing the db in the middle of compaction. 
From da9da9fbeeb4a16801f366d5ae25e83cd37fd828 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 8 Feb 2019 12:39:25 +0200 Subject: [PATCH 17/20] fix the sleep logic Signed-off-by: Krasi Georgiev --- compact_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compact_test.go b/compact_test.go index fa977879d..fa80ad630 100644 --- a/compact_test.go +++ b/compact_test.go @@ -794,13 +794,13 @@ func TestCancelCompactions(t *testing.T) { dbClosed := make(chan struct{}) for { if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 { - time.Sleep(3 * time.Millisecond) go func() { testutil.Ok(t, db.Close()) close(dbClosed) }() break } + time.Sleep(3 * time.Millisecond) } start := time.Now() From 0f8f5027efa3c84841359d8c392b54be8ff1616e Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 8 Feb 2019 18:09:23 +0200 Subject: [PATCH 18/20] remove nested for if Signed-off-by: Krasi Georgiev --- compact_test.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/compact_test.go b/compact_test.go index fa80ad630..4fdd686b4 100644 --- a/compact_test.go +++ b/compact_test.go @@ -792,16 +792,14 @@ func TestCancelCompactions(t *testing.T) { testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch") db.compactc <- struct{}{} // Trigger a compaction. dbClosed := make(chan struct{}) - for { - if prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) > 0 { - go func() { - testutil.Ok(t, db.Close()) - close(dbClosed) - }() - break - } + + for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) <= 0 { time.Sleep(3 * time.Millisecond) } + go func() { + testutil.Ok(t, db.Close()) + close(dbClosed) + }() start := time.Now() <-dbClosed From c3a5c1d891f2ed628605e2fe74d640c76f358b37 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 11 Feb 2019 12:22:11 +0200 Subject: [PATCH 19/20] refactor error handling Signed-off-by: Krasi Georgiev --- compact.go | 11 ++++++----- db.go | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/compact.go b/compact.go index 8a20dd1c6..81a8355ac 100644 --- a/compact.go +++ b/compact.go @@ -490,10 +490,8 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe defer func(t time.Time) { var merr MultiError merr.Add(err) + merr.Add(closeAll(closers)) err = merr.Err() - for _, w := range closers { - merr.Add(w.Close()) - } // RemoveAll returns no error when tmp doesn't exist so it is safe to always run it. if err := os.RemoveAll(tmp); err != nil { @@ -606,7 +604,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe // populateBlock fills the index and chunk writers with new data gathered as the union // of the provided blocks. It returns meta information for the new block. -func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error { +func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) { if len(blocks) == 0 { return errors.New("cannot populate block from no readers") } @@ -617,7 +615,10 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, closers = []io.Closer{} ) defer func() { - closeAll(closers...) 
+ var merr MultiError + merr.Add(err) + merr.Add(closeAll(closers)) + err = merr.Err() c.metrics.populatingBlocks.Set(0) }() diff --git a/db.go b/db.go index 8bec75447..b4ea426d3 100644 --- a/db.go +++ b/db.go @@ -1091,7 +1091,7 @@ func (es MultiError) Err() error { return es } -func closeAll(cs ...io.Closer) error { +func closeAll(cs []io.Closer) error { var merr MultiError for _, c := range cs { From bf79c767f000d24461e8ebbebf008b8c68b3ca5e Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Tue, 12 Feb 2019 11:08:09 +0200 Subject: [PATCH 20/20] new line Signed-off-by: Krasi Georgiev --- compact_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compact_test.go b/compact_test.go index d947eb35d..a755d69f2 100644 --- a/compact_test.go +++ b/compact_test.go @@ -884,4 +884,4 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { testutil.Equals(t, expBlocks, len(actBlocks)-1, "block count should be the same as before the compaction") // -1 to exclude the corrupted block. }) } -} \ No newline at end of file +}
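
Taken together, the series converges on one pattern: the compactor is handed a context at construction, checks ctx.Done() between blocks and between series inside populateBlock, and returns ctx.Err() so a cancellation is distinguishable from a real failure, while DB.Close calls the stored context.CancelFunc before waiting for in-flight work. Below is a minimal, self-contained sketch of that shape; the worker, db, open, and run names are illustrative placeholders, not the actual tsdb types or API.

package main

import (
	"context"
	"fmt"
	"time"
)

// worker stands in for the LeveledCompactor: it holds a context that an
// external caller can cancel.
type worker struct {
	ctx context.Context
}

// run iterates over units of work and aborts early once the context is
// canceled, mirroring the ctx.Done() checks added to populateBlock.
func (w *worker) run(units int) error {
	for i := 0; i < units; i++ {
		select {
		case <-w.ctx.Done():
			// Return the context error so the caller can tell a
			// cancellation apart from a genuine failure.
			return w.ctx.Err()
		default:
		}
		time.Sleep(10 * time.Millisecond) // Simulate one expensive step.
	}
	return nil
}

// db stands in for DB: it keeps the CancelFunc paired with the worker's
// context, plus a channel to wait for the worker to wind down.
type db struct {
	cancel context.CancelFunc
	done   chan error
}

// open wires a cancelable context into the worker, the way Open passes one
// into NewLeveledCompactor and stores the CancelFunc for later.
func open() *db {
	ctx, cancel := context.WithCancel(context.Background())
	d := &db{cancel: cancel, done: make(chan error, 1)}
	w := &worker{ctx: ctx}
	go func() { d.done <- w.run(1000) }()
	return d
}

// Close cancels any in-flight work first, so shutdown is not blocked behind a
// full run, then waits for the worker to exit.
func (d *db) Close() error {
	d.cancel()
	return <-d.done
}

func main() {
	d := open()
	time.Sleep(50 * time.Millisecond) // Let a few units of work complete.
	if err := d.Close(); err == context.Canceled {
		fmt.Println("in-flight work stopped early:", err)
	}
}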