mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-22 19:26:56 -08:00
Merge pull request #462 from grafana/fix-block-compaction-failed-when-shutting-down-cherry-picked
Fix block compaction failed when shutting down
This commit is contained in:
commit
8a7ef4cfac
|
@ -539,7 +539,7 @@ func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, sh
|
||||||
}
|
}
|
||||||
|
|
||||||
errs := tsdb_errors.NewMulti(err)
|
errs := tsdb_errors.NewMulti(err)
|
||||||
if err != context.Canceled {
|
if !errors.Is(err, context.Canceled) {
|
||||||
for _, b := range bs {
|
for _, b := range bs {
|
||||||
if err := b.setCompactionFailed(); err != nil {
|
if err := b.setCompactionFailed(); err != nil {
|
||||||
errs.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
|
errs.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
|
||||||
|
|
|
@ -1388,12 +1388,11 @@ func TestCancelCompactions(t *testing.T) {
|
||||||
require.Equal(t, 3, len(db.Blocks()), "initial block count mismatch")
|
require.Equal(t, 3, len(db.Blocks()), "initial block count mismatch")
|
||||||
require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch")
|
require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch")
|
||||||
db.compactc <- struct{}{} // Trigger a compaction.
|
db.compactc <- struct{}{} // Trigger a compaction.
|
||||||
var start time.Time
|
|
||||||
for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) <= 0 {
|
for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) <= 0 {
|
||||||
time.Sleep(3 * time.Millisecond)
|
time.Sleep(3 * time.Millisecond)
|
||||||
}
|
}
|
||||||
start = time.Now()
|
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran) != 1 {
|
for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran) != 1 {
|
||||||
time.Sleep(3 * time.Millisecond)
|
time.Sleep(3 * time.Millisecond)
|
||||||
}
|
}
|
||||||
|
@ -1408,21 +1407,29 @@ func TestCancelCompactions(t *testing.T) {
|
||||||
require.Equal(t, 3, len(db.Blocks()), "initial block count mismatch")
|
require.Equal(t, 3, len(db.Blocks()), "initial block count mismatch")
|
||||||
require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch")
|
require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch")
|
||||||
db.compactc <- struct{}{} // Trigger a compaction.
|
db.compactc <- struct{}{} // Trigger a compaction.
|
||||||
dbClosed := make(chan struct{})
|
|
||||||
|
|
||||||
for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) <= 0 {
|
for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) <= 0 {
|
||||||
time.Sleep(3 * time.Millisecond)
|
time.Sleep(3 * time.Millisecond)
|
||||||
}
|
}
|
||||||
go func() {
|
|
||||||
require.NoError(t, db.Close())
|
|
||||||
close(dbClosed)
|
|
||||||
}()
|
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
<-dbClosed
|
require.NoError(t, db.Close())
|
||||||
actT := time.Since(start)
|
actT := time.Since(start)
|
||||||
expT := time.Duration(timeCompactionUninterrupted / 2) // Closing the db in the middle of compaction should less than half the time.
|
|
||||||
|
expT := timeCompactionUninterrupted / 2 // Closing the db in the middle of compaction should less than half the time.
|
||||||
require.True(t, actT < expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT)
|
require.True(t, actT < expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT)
|
||||||
|
|
||||||
|
// Make sure that no blocks were marked as compaction failed.
|
||||||
|
// This checks that the `context.Canceled` error is properly checked at all levels:
|
||||||
|
// - tsdb_errors.NewMulti() should have the Is() method implemented for correct checks.
|
||||||
|
// - callers should check with errors.Is() instead of ==.
|
||||||
|
readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, log.NewNopLogger())
|
||||||
|
require.NoError(t, err)
|
||||||
|
blocks, err := readOnlyDB.Blocks()
|
||||||
|
require.NoError(t, err)
|
||||||
|
for i, b := range blocks {
|
||||||
|
require.Falsef(t, b.Meta().Compaction.Failed, "block %d (%s) should not be marked as compaction failed", i, b.Meta().ULID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,7 @@ package errors
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
)
|
)
|
||||||
|
@ -79,6 +80,19 @@ func (es nonNilMultiError) Error() string {
|
||||||
return buf.String()
|
return buf.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Is attempts to match the provided error against errors in the error list.
|
||||||
|
//
|
||||||
|
// This function allows errors.Is to traverse the values stored in the MultiError.
|
||||||
|
// It returns true if any of the errors in the list match the target.
|
||||||
|
func (es nonNilMultiError) Is(target error) bool {
|
||||||
|
for _, err := range es.errs {
|
||||||
|
if errors.Is(err, target) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// CloseAll closes all given closers while recording error in MultiError.
|
// CloseAll closes all given closers while recording error in MultiError.
|
||||||
func CloseAll(cs []io.Closer) error {
|
func CloseAll(cs []io.Closer) error {
|
||||||
errs := NewMulti()
|
errs := NewMulti()
|
||||||
|
|
Loading…
Reference in a new issue