mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 05:47:27 -08:00
tsdb: Delete blocks atomically; Remove tmp blocks on start; Added test. (#7772)
## Changes: * Rename dir when deleting * Ignoring blocks with broken meta.json on start (we do that on reload) * Compactor writes <ulid>.tmp-for-creation blocks instead of just .tmp * Delete tmp-for-creation and tmp-for-deletion blocks during DB open. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
parent
2b75c1b199
commit
4ae2ef94e0
|
@ -524,7 +524,7 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
|
|||
// It cleans up all files of the old blocks after completing successfully.
|
||||
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
|
||||
dir := filepath.Join(dest, meta.ULID.String())
|
||||
tmp := dir + ".tmp"
|
||||
tmp := dir + tmpForCreationBlockDirSuffix
|
||||
var closers []io.Closer
|
||||
defer func(t time.Time) {
|
||||
var merr tsdb_errors.MultiError
|
||||
|
|
|
@ -440,7 +440,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
|
|||
}()
|
||||
|
||||
testutil.NotOk(t, compactor.write(tmpdir, &BlockMeta{}, erringBReader{}))
|
||||
_, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + ".tmp")
|
||||
_, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + tmpForCreationBlockDirSuffix)
|
||||
testutil.Assert(t, os.IsNotExist(err), "directory is not cleaned up")
|
||||
}
|
||||
|
||||
|
|
60
tsdb/db.go
60
tsdb/db.go
|
@ -49,6 +49,14 @@ import (
|
|||
const (
|
||||
// Default duration of a block in milliseconds.
|
||||
DefaultBlockDuration = int64(2 * time.Hour / time.Millisecond)
|
||||
|
||||
// Block dir suffixes to make deletion and creation operations atomic.
|
||||
// We decided to do suffixes instead of creating meta.json as last (or delete as first) one,
|
||||
// because in error case you still can recover meta.json from the block content within local TSDB dir.
|
||||
// TODO(bwplotka): TSDB can end up with various .tmp files (e.g meta.json.tmp, WAL or segment tmp file. Think
|
||||
// about removing those too on start to save space. Currently only blocks tmp dirs are removed.
|
||||
tmpForDeletionBlockDirSuffix = ".tmp-for-deletion"
|
||||
tmpForCreationBlockDirSuffix = ".tmp-for-creation"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -566,12 +574,16 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
|||
|
||||
// Fixup bad format written by Prometheus 2.1.
|
||||
if err := repairBadIndexVersion(l, dir); err != nil {
|
||||
return nil, err
|
||||
return nil, errors.Wrap(err, "repair bad index version")
|
||||
}
|
||||
// Migrate old WAL if one exists.
|
||||
if err := MigrateWAL(l, filepath.Join(dir, "wal")); err != nil {
|
||||
return nil, errors.Wrap(err, "migrate WAL")
|
||||
}
|
||||
// Remove garbage, tmp blocks.
|
||||
if err := removeBestEffortTmpDirs(l, dir); err != nil {
|
||||
return nil, errors.Wrap(err, "remove tmp dirs")
|
||||
}
|
||||
|
||||
db = &DB{
|
||||
dir: dir,
|
||||
|
@ -660,6 +672,23 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
|||
return db, nil
|
||||
}
|
||||
|
||||
func removeBestEffortTmpDirs(l log.Logger, dir string) error {
|
||||
files, err := ioutil.ReadDir(dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, fi := range files {
|
||||
if isTmpBlockDir(fi) {
|
||||
if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {
|
||||
level.Error(l).Log("msg", "failed to delete tmp block dir", "dir", filepath.Join(dir, fi.Name()), "err", err)
|
||||
continue
|
||||
}
|
||||
level.Info(l).Log("msg", "Found and deleted tmp block dir", "dir", filepath.Join(dir, fi.Name()))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// StartTime implements the Storage interface.
|
||||
func (db *DB) StartTime() (int64, error) {
|
||||
db.mtx.RLock()
|
||||
|
@ -990,7 +1019,7 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po
|
|||
for _, bDir := range bDirs {
|
||||
meta, _, err := readMetaFile(bDir)
|
||||
if err != nil {
|
||||
level.Error(l).Log("msg", "failed to read meta.json for a block", "dir", bDir, "err", err)
|
||||
level.Error(l).Log("msg", "failed to read meta.json for a block during reload; skipping", "dir", bDir, "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -1016,7 +1045,7 @@ func DefaultBlocksToDelete(db *DB) BlocksToDeleteFunc {
|
|||
}
|
||||
}
|
||||
|
||||
// deletableBlocks returns all blocks past retention policy.
|
||||
// deletableBlocks returns all currently loaded blocks past retention policy or already compacted into a new block.
|
||||
func deletableBlocks(db *DB, blocks []*Block) map[ulid.ULID]struct{} {
|
||||
deletable := make(map[ulid.ULID]struct{})
|
||||
|
||||
|
@ -1105,10 +1134,17 @@ func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error {
|
|||
level.Warn(db.logger).Log("msg", "Closing block failed", "err", err, "block", ulid)
|
||||
}
|
||||
}
|
||||
if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil {
|
||||
|
||||
// Replace atomically to avoid partial block when process is crashing during this moment.
|
||||
tmpToDelete := filepath.Join(db.dir, fmt.Sprintf("%s%s", ulid, tmpForDeletionBlockDirSuffix))
|
||||
if err := fileutil.Replace(filepath.Join(db.dir, ulid.String()), tmpToDelete); err != nil {
|
||||
return errors.Wrapf(err, "replace of obsolete block for deletion %s", ulid)
|
||||
}
|
||||
if err := os.RemoveAll(tmpToDelete); err != nil {
|
||||
return errors.Wrapf(err, "delete obsolete block %s", ulid)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1481,6 +1517,22 @@ func isBlockDir(fi os.FileInfo) bool {
|
|||
return err == nil
|
||||
}
|
||||
|
||||
// isTmpBlockDir returns dir that consists of block dir ULID and tmp extension.
|
||||
func isTmpBlockDir(fi os.FileInfo) bool {
|
||||
if !fi.IsDir() {
|
||||
return false
|
||||
}
|
||||
|
||||
fn := fi.Name()
|
||||
ext := filepath.Ext(fn)
|
||||
if ext == tmpForDeletionBlockDirSuffix || ext == tmpForCreationBlockDirSuffix {
|
||||
if _, err := ulid.ParseStrict(fn[:len(fn)-len(ext)]); err == nil {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func blockDirs(dir string) ([]string, error) {
|
||||
files, err := ioutil.ReadDir(dir)
|
||||
if err != nil {
|
||||
|
|
|
@ -2708,6 +2708,88 @@ func deleteNonBlocks(dbDir string) error {
|
|||
return errors.Errorf("root folder:%v still hase non block directory:%v", dbDir, dir.Name())
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestOpen_VariousBlockStates(t *testing.T) {
|
||||
tmpDir, err := ioutil.TempDir("", "test")
|
||||
testutil.Ok(t, err)
|
||||
t.Cleanup(func() {
|
||||
testutil.Ok(t, os.RemoveAll(tmpDir))
|
||||
})
|
||||
|
||||
var (
|
||||
expectedLoadedDirs = map[string]struct{}{}
|
||||
expectedRemovedDirs = map[string]struct{}{}
|
||||
expectedIgnoredDirs = map[string]struct{}{}
|
||||
)
|
||||
|
||||
{
|
||||
// Ok blocks; should be loaded.
|
||||
expectedLoadedDirs[createBlock(t, tmpDir, genSeries(10, 2, 0, 10))] = struct{}{}
|
||||
expectedLoadedDirs[createBlock(t, tmpDir, genSeries(10, 2, 10, 20))] = struct{}{}
|
||||
}
|
||||
{
|
||||
// Block to repair; should be repaired & loaded.
|
||||
dbDir := filepath.Join("testdata", "repair_index_version", "01BZJ9WJQPWHGNC2W4J9TA62KC")
|
||||
outDir := filepath.Join(tmpDir, "01BZJ9WJQPWHGNC2W4J9TA62KC")
|
||||
expectedLoadedDirs[outDir] = struct{}{}
|
||||
|
||||
// Touch chunks dir in block.
|
||||
testutil.Ok(t, os.MkdirAll(filepath.Join(dbDir, "chunks"), 0777))
|
||||
defer func() {
|
||||
testutil.Ok(t, os.RemoveAll(filepath.Join(dbDir, "chunks")))
|
||||
}()
|
||||
testutil.Ok(t, os.Mkdir(outDir, os.ModePerm))
|
||||
testutil.Ok(t, fileutil.CopyDirs(dbDir, outDir))
|
||||
}
|
||||
{
|
||||
// Missing meta.json; should be ignored and only logged.
|
||||
// TODO(bwplotka): Probably add metric.
|
||||
dir := createBlock(t, tmpDir, genSeries(10, 2, 20, 30))
|
||||
expectedIgnoredDirs[dir] = struct{}{}
|
||||
testutil.Ok(t, os.Remove(filepath.Join(dir, metaFilename)))
|
||||
}
|
||||
{
|
||||
// Tmp blocks during creation & deletion; those should be removed on start.
|
||||
dir := createBlock(t, tmpDir, genSeries(10, 2, 30, 40))
|
||||
testutil.Ok(t, fileutil.Replace(dir, dir+tmpForCreationBlockDirSuffix))
|
||||
expectedRemovedDirs[dir+tmpForCreationBlockDirSuffix] = struct{}{}
|
||||
|
||||
// Tmp blocks during creation & deletion; those should be removed on start.
|
||||
dir = createBlock(t, tmpDir, genSeries(10, 2, 40, 50))
|
||||
testutil.Ok(t, fileutil.Replace(dir, dir+tmpForDeletionBlockDirSuffix))
|
||||
expectedRemovedDirs[dir+tmpForDeletionBlockDirSuffix] = struct{}{}
|
||||
}
|
||||
|
||||
opts := DefaultOptions()
|
||||
opts.RetentionDuration = 0
|
||||
db, err := Open(tmpDir, log.NewLogfmtLogger(os.Stderr), nil, opts)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
loadedBlocks := db.Blocks()
|
||||
|
||||
var loaded int
|
||||
for _, l := range loadedBlocks {
|
||||
if _, ok := expectedLoadedDirs[filepath.Join(tmpDir, l.meta.ULID.String())]; !ok {
|
||||
t.Fatal("unexpected block", l.meta.ULID, "was loaded")
|
||||
}
|
||||
loaded++
|
||||
}
|
||||
testutil.Equals(t, len(expectedLoadedDirs), loaded)
|
||||
testutil.Ok(t, db.Close())
|
||||
|
||||
files, err := ioutil.ReadDir(tmpDir)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
var ignored int
|
||||
for _, f := range files {
|
||||
if _, ok := expectedRemovedDirs[filepath.Join(tmpDir, f.Name())]; ok {
|
||||
t.Fatal("expected", filepath.Join(tmpDir, f.Name()), "to be removed, but still exists")
|
||||
}
|
||||
if _, ok := expectedIgnoredDirs[filepath.Join(tmpDir, f.Name())]; ok {
|
||||
ignored++
|
||||
}
|
||||
}
|
||||
testutil.Equals(t, len(expectedIgnoredDirs), ignored)
|
||||
}
|
||||
|
|
|
@ -37,10 +37,6 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
|
|||
return errors.Wrapf(err, "list block dirs in %q", dir)
|
||||
}
|
||||
|
||||
wrapErr := func(err error, d string) error {
|
||||
return errors.Wrapf(err, "block dir: %q", d)
|
||||
}
|
||||
|
||||
tmpFiles := make([]string, 0, len(dirs))
|
||||
defer func() {
|
||||
for _, tmp := range tmpFiles {
|
||||
|
@ -53,7 +49,8 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
|
|||
for _, d := range dirs {
|
||||
meta, err := readBogusMetaFile(d)
|
||||
if err != nil {
|
||||
return wrapErr(err, d)
|
||||
level.Error(logger).Log("msg", "failed to read meta.json for a block during repair process; skipping", "dir", d, "err", err)
|
||||
continue
|
||||
}
|
||||
if meta.Version == metaVersion1 {
|
||||
level.Info(logger).Log(
|
||||
|
@ -73,44 +70,44 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
|
|||
|
||||
repl, err := os.Create(filepath.Join(d, "index.repaired"))
|
||||
if err != nil {
|
||||
return wrapErr(err, d)
|
||||
return errors.Wrapf(err, "create index.repaired for block dir: %v", d)
|
||||
}
|
||||
tmpFiles = append(tmpFiles, repl.Name())
|
||||
|
||||
broken, err := os.Open(filepath.Join(d, indexFilename))
|
||||
if err != nil {
|
||||
return wrapErr(err, d)
|
||||
return errors.Wrapf(err, "open broken index for block dir: %v", d)
|
||||
}
|
||||
if _, err := io.Copy(repl, broken); err != nil {
|
||||
return wrapErr(err, d)
|
||||
return errors.Wrapf(err, "copy content of index to index.repaired for block dir: %v", d)
|
||||
}
|
||||
|
||||
var merr tsdb_errors.MultiError
|
||||
|
||||
// Set the 5th byte to 2 to indicate the correct file format version.
|
||||
if _, err := repl.WriteAt([]byte{2}, 4); err != nil {
|
||||
merr.Add(wrapErr(err, d))
|
||||
merr.Add(wrapErr(repl.Close(), d))
|
||||
return merr.Err()
|
||||
merr.Add(errors.Wrap(err, "rewrite of index.repaired"))
|
||||
merr.Add(errors.Wrap(repl.Close(), "close"))
|
||||
return errors.Wrapf(merr.Err(), "block dir: %v", d)
|
||||
}
|
||||
if err := repl.Sync(); err != nil {
|
||||
merr.Add(wrapErr(err, d))
|
||||
merr.Add(wrapErr(repl.Close(), d))
|
||||
return merr.Err()
|
||||
merr.Add(errors.Wrap(err, "sync of index.repaired"))
|
||||
merr.Add(errors.Wrap(repl.Close(), "close"))
|
||||
return errors.Wrapf(merr.Err(), "block dir: %v", d)
|
||||
}
|
||||
if err := repl.Close(); err != nil {
|
||||
return wrapErr(err, d)
|
||||
return errors.Wrapf(repl.Close(), "close repaired index for block dir: %v", d)
|
||||
}
|
||||
if err := broken.Close(); err != nil {
|
||||
return wrapErr(err, d)
|
||||
return errors.Wrapf(repl.Close(), "close broken index for block dir: %v", d)
|
||||
}
|
||||
if err := fileutil.Replace(repl.Name(), broken.Name()); err != nil {
|
||||
return wrapErr(err, d)
|
||||
return errors.Wrapf(repl.Close(), "replaced broken index with index.repaired for block dir: %v", d)
|
||||
}
|
||||
// Reset version of meta.json to 1.
|
||||
meta.Version = metaVersion1
|
||||
if _, err := writeMetaFile(logger, d, meta); err != nil {
|
||||
return wrapErr(err, d)
|
||||
return errors.Wrapf(repl.Close(), "write meta for block dir: %v", d)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
@ -26,6 +27,12 @@ import (
|
|||
)
|
||||
|
||||
func TestRepairBadIndexVersion(t *testing.T) {
|
||||
tmpDir, err := ioutil.TempDir("", "test")
|
||||
testutil.Ok(t, err)
|
||||
t.Cleanup(func() {
|
||||
testutil.Ok(t, os.RemoveAll(tmpDir))
|
||||
})
|
||||
|
||||
// The broken index used in this test was written by the following script
|
||||
// at a broken revision.
|
||||
//
|
||||
|
@ -59,22 +66,21 @@ func TestRepairBadIndexVersion(t *testing.T) {
|
|||
// panic(err)
|
||||
// }
|
||||
// }
|
||||
dbDir := filepath.Join("testdata", "repair_index_version", "01BZJ9WJQPWHGNC2W4J9TA62KC")
|
||||
tmpDir := filepath.Join("testdata", "repair_index_version", "copy")
|
||||
tmpDbDir := filepath.Join(tmpDir, "3MCNSQ8S31EHGJYWK5E1GPJWJZ")
|
||||
tmpDbDir := filepath.Join(tmpDir, "01BZJ9WJQPWHGNC2W4J9TA62KC")
|
||||
|
||||
// Create a copy DB to run test against.
|
||||
testutil.Ok(t, fileutil.CopyDirs(filepath.Join("testdata", "repair_index_version", "01BZJ9WJQPWHGNC2W4J9TA62KC"), tmpDbDir))
|
||||
|
||||
// Check the current db.
|
||||
// In its current state, lookups should fail with the fixed code.
|
||||
_, _, err := readMetaFile(dbDir)
|
||||
_, _, err = readMetaFile(tmpDbDir)
|
||||
testutil.NotOk(t, err)
|
||||
|
||||
// Touch chunks dir in block.
|
||||
testutil.Ok(t, os.MkdirAll(filepath.Join(dbDir, "chunks"), 0777))
|
||||
defer func() {
|
||||
testutil.Ok(t, os.RemoveAll(filepath.Join(dbDir, "chunks")))
|
||||
}()
|
||||
// Touch chunks dir in block to imitate them.
|
||||
testutil.Ok(t, os.MkdirAll(filepath.Join(tmpDbDir, "chunks"), 0777))
|
||||
|
||||
r, err := index.NewFileReader(filepath.Join(dbDir, indexFilename))
|
||||
// Read current index to check integrity.
|
||||
r, err := index.NewFileReader(filepath.Join(tmpDbDir, indexFilename))
|
||||
testutil.Ok(t, err)
|
||||
p, err := r.Postings("b", "1")
|
||||
testutil.Ok(t, err)
|
||||
|
@ -87,13 +93,6 @@ func TestRepairBadIndexVersion(t *testing.T) {
|
|||
testutil.Ok(t, p.Err())
|
||||
testutil.Ok(t, r.Close())
|
||||
|
||||
// Create a copy DB to run test against.
|
||||
if err = fileutil.CopyDirs(dbDir, tmpDbDir); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer func() {
|
||||
testutil.Ok(t, os.RemoveAll(tmpDir))
|
||||
}()
|
||||
// On DB opening all blocks in the base dir should be repaired.
|
||||
db, err := Open(tmpDir, nil, nil, nil)
|
||||
testutil.Ok(t, err)
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"version": 2,
|
||||
"ulid": "01BZJ9WJR6Z192734YNMD62F6M",
|
||||
"ulid": "01BZJ9WJQPWHGNC2W4J9TA62KC",
|
||||
"minTime": 1511366400000,
|
||||
"maxTime": 1511368200000,
|
||||
"stats": {
|
||||
|
@ -11,7 +11,7 @@
|
|||
"compaction": {
|
||||
"level": 1,
|
||||
"sources": [
|
||||
"01BZJ9WJR6Z192734YNMD62F6M"
|
||||
"01BZJ9WJQPWHGNC2W4J9TA62KC"
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue