diff --git a/tsdb/compact.go b/tsdb/compact.go index 2e75eeeb9..74f54fdb9 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -524,7 +524,7 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error { // It cleans up all files of the old blocks after completing successfully. func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) { dir := filepath.Join(dest, meta.ULID.String()) - tmp := dir + ".tmp" + tmp := dir + tmpForCreationBlockDirSuffix var closers []io.Closer defer func(t time.Time) { var merr tsdb_errors.MultiError diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 164149c06..a1a50f5a6 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -440,7 +440,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) { }() testutil.NotOk(t, compactor.write(tmpdir, &BlockMeta{}, erringBReader{})) - _, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + ".tmp") + _, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + tmpForCreationBlockDirSuffix) testutil.Assert(t, os.IsNotExist(err), "directory is not cleaned up") } diff --git a/tsdb/db.go b/tsdb/db.go index 86704501d..300348435 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -49,6 +49,14 @@ import ( const ( // Default duration of a block in milliseconds. DefaultBlockDuration = int64(2 * time.Hour / time.Millisecond) + + // Block dir suffixes to make deletion and creation operations atomic. + // We decided to do suffixes instead of creating meta.json as last (or delete as first) one, + // because in error case you still can recover meta.json from the block content within local TSDB dir. + // TODO(bwplotka): TSDB can end up with various .tmp files (e.g meta.json.tmp, WAL or segment tmp file. Think + // about removing those too on start to save space. Currently only blocks tmp dirs are removed. 
+ tmpForDeletionBlockDirSuffix = ".tmp-for-deletion" + tmpForCreationBlockDirSuffix = ".tmp-for-creation" ) var ( @@ -566,12 +574,16 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs // Fixup bad format written by Prometheus 2.1. if err := repairBadIndexVersion(l, dir); err != nil { - return nil, err + return nil, errors.Wrap(err, "repair bad index version") } // Migrate old WAL if one exists. if err := MigrateWAL(l, filepath.Join(dir, "wal")); err != nil { return nil, errors.Wrap(err, "migrate WAL") } + // Remove garbage, tmp blocks. + if err := removeBestEffortTmpDirs(l, dir); err != nil { + return nil, errors.Wrap(err, "remove tmp dirs") + } db = &DB{ dir: dir, @@ -660,6 +672,23 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs return db, nil } +func removeBestEffortTmpDirs(l log.Logger, dir string) error { + files, err := ioutil.ReadDir(dir) + if err != nil { + return err + } + for _, fi := range files { + if isTmpBlockDir(fi) { + if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil { + level.Error(l).Log("msg", "failed to delete tmp block dir", "dir", filepath.Join(dir, fi.Name()), "err", err) + continue + } + level.Info(l).Log("msg", "Found and deleted tmp block dir", "dir", filepath.Join(dir, fi.Name())) + } + } + return nil +} + // StartTime implements the Storage interface. func (db *DB) StartTime() (int64, error) { db.mtx.RLock() @@ -990,7 +1019,7 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po for _, bDir := range bDirs { meta, _, err := readMetaFile(bDir) if err != nil { - level.Error(l).Log("msg", "failed to read meta.json for a block", "dir", bDir, "err", err) + level.Error(l).Log("msg", "failed to read meta.json for a block during reload; skipping", "dir", bDir, "err", err) continue } @@ -1016,7 +1045,7 @@ func DefaultBlocksToDelete(db *DB) BlocksToDeleteFunc { } } -// deletableBlocks returns all blocks past retention policy. 
+// deletableBlocks returns all currently loaded blocks past retention policy or already compacted into a new block. func deletableBlocks(db *DB, blocks []*Block) map[ulid.ULID]struct{} { deletable := make(map[ulid.ULID]struct{}) @@ -1105,10 +1134,17 @@ func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error { level.Warn(db.logger).Log("msg", "Closing block failed", "err", err, "block", ulid) } } - if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil { + + // Replace atomically to avoid partial block when process is crashing during this moment. + tmpToDelete := filepath.Join(db.dir, fmt.Sprintf("%s%s", ulid, tmpForDeletionBlockDirSuffix)) + if err := fileutil.Replace(filepath.Join(db.dir, ulid.String()), tmpToDelete); err != nil { + return errors.Wrapf(err, "replace of obsolete block for deletion %s", ulid) + } + if err := os.RemoveAll(tmpToDelete); err != nil { return errors.Wrapf(err, "delete obsolete block %s", ulid) } } + return nil } @@ -1481,6 +1517,22 @@ func isBlockDir(fi os.FileInfo) bool { return err == nil } +// isTmpBlockDir returns dir that consists of block dir ULID and tmp extension. 
+func isTmpBlockDir(fi os.FileInfo) bool { + if !fi.IsDir() { + return false + } + + fn := fi.Name() + ext := filepath.Ext(fn) + if ext == tmpForDeletionBlockDirSuffix || ext == tmpForCreationBlockDirSuffix { + if _, err := ulid.ParseStrict(fn[:len(fn)-len(ext)]); err == nil { + return true + } + } + return false +} + func blockDirs(dir string) ([]string, error) { files, err := ioutil.ReadDir(dir) if err != nil { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 8d175f256..e400974aa 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -2708,6 +2708,88 @@ func deleteNonBlocks(dbDir string) error { return errors.Errorf("root folder:%v still hase non block directory:%v", dbDir, dir.Name()) } } - return nil } + +func TestOpen_VariousBlockStates(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "test") + testutil.Ok(t, err) + t.Cleanup(func() { + testutil.Ok(t, os.RemoveAll(tmpDir)) + }) + + var ( + expectedLoadedDirs = map[string]struct{}{} + expectedRemovedDirs = map[string]struct{}{} + expectedIgnoredDirs = map[string]struct{}{} + ) + + { + // Ok blocks; should be loaded. + expectedLoadedDirs[createBlock(t, tmpDir, genSeries(10, 2, 0, 10))] = struct{}{} + expectedLoadedDirs[createBlock(t, tmpDir, genSeries(10, 2, 10, 20))] = struct{}{} + } + { + // Block to repair; should be repaired & loaded. + dbDir := filepath.Join("testdata", "repair_index_version", "01BZJ9WJQPWHGNC2W4J9TA62KC") + outDir := filepath.Join(tmpDir, "01BZJ9WJQPWHGNC2W4J9TA62KC") + expectedLoadedDirs[outDir] = struct{}{} + + // Touch chunks dir in block. + testutil.Ok(t, os.MkdirAll(filepath.Join(dbDir, "chunks"), 0777)) + defer func() { + testutil.Ok(t, os.RemoveAll(filepath.Join(dbDir, "chunks"))) + }() + testutil.Ok(t, os.Mkdir(outDir, os.ModePerm)) + testutil.Ok(t, fileutil.CopyDirs(dbDir, outDir)) + } + { + // Missing meta.json; should be ignored and only logged. + // TODO(bwplotka): Probably add metric. 
+ dir := createBlock(t, tmpDir, genSeries(10, 2, 20, 30)) + expectedIgnoredDirs[dir] = struct{}{} + testutil.Ok(t, os.Remove(filepath.Join(dir, metaFilename))) + } + { + // Tmp blocks during creation & deletion; those should be removed on start. + dir := createBlock(t, tmpDir, genSeries(10, 2, 30, 40)) + testutil.Ok(t, fileutil.Replace(dir, dir+tmpForCreationBlockDirSuffix)) + expectedRemovedDirs[dir+tmpForCreationBlockDirSuffix] = struct{}{} + + // Tmp blocks during creation & deletion; those should be removed on start. + dir = createBlock(t, tmpDir, genSeries(10, 2, 40, 50)) + testutil.Ok(t, fileutil.Replace(dir, dir+tmpForDeletionBlockDirSuffix)) + expectedRemovedDirs[dir+tmpForDeletionBlockDirSuffix] = struct{}{} + } + + opts := DefaultOptions() + opts.RetentionDuration = 0 + db, err := Open(tmpDir, log.NewLogfmtLogger(os.Stderr), nil, opts) + testutil.Ok(t, err) + + loadedBlocks := db.Blocks() + + var loaded int + for _, l := range loadedBlocks { + if _, ok := expectedLoadedDirs[filepath.Join(tmpDir, l.meta.ULID.String())]; !ok { + t.Fatal("unexpected block", l.meta.ULID, "was loaded") + } + loaded++ + } + testutil.Equals(t, len(expectedLoadedDirs), loaded) + testutil.Ok(t, db.Close()) + + files, err := ioutil.ReadDir(tmpDir) + testutil.Ok(t, err) + + var ignored int + for _, f := range files { + if _, ok := expectedRemovedDirs[filepath.Join(tmpDir, f.Name())]; ok { + t.Fatal("expected", filepath.Join(tmpDir, f.Name()), "to be removed, but still exists") + } + if _, ok := expectedIgnoredDirs[filepath.Join(tmpDir, f.Name())]; ok { + ignored++ + } + } + testutil.Equals(t, len(expectedIgnoredDirs), ignored) +} diff --git a/tsdb/repair.go b/tsdb/repair.go index efc81967e..02114cd4f 100644 --- a/tsdb/repair.go +++ b/tsdb/repair.go @@ -37,10 +37,6 @@ func repairBadIndexVersion(logger log.Logger, dir string) error { return errors.Wrapf(err, "list block dirs in %q", dir) } - wrapErr := func(err error, d string) error { - return errors.Wrapf(err, "block dir: %q", d) - 
} - tmpFiles := make([]string, 0, len(dirs)) defer func() { for _, tmp := range tmpFiles { @@ -53,7 +49,8 @@ func repairBadIndexVersion(logger log.Logger, dir string) error { for _, d := range dirs { meta, err := readBogusMetaFile(d) if err != nil { - return wrapErr(err, d) + level.Error(logger).Log("msg", "failed to read meta.json for a block during repair process; skipping", "dir", d, "err", err) + continue } if meta.Version == metaVersion1 { level.Info(logger).Log( @@ -73,44 +70,44 @@ func repairBadIndexVersion(logger log.Logger, dir string) error { repl, err := os.Create(filepath.Join(d, "index.repaired")) if err != nil { - return wrapErr(err, d) + return errors.Wrapf(err, "create index.repaired for block dir: %v", d) } tmpFiles = append(tmpFiles, repl.Name()) broken, err := os.Open(filepath.Join(d, indexFilename)) if err != nil { - return wrapErr(err, d) + return errors.Wrapf(err, "open broken index for block dir: %v", d) } if _, err := io.Copy(repl, broken); err != nil { - return wrapErr(err, d) + return errors.Wrapf(err, "copy content of index to index.repaired for block dir: %v", d) } var merr tsdb_errors.MultiError // Set the 5th byte to 2 to indicate the correct file format version. 
if _, err := repl.WriteAt([]byte{2}, 4); err != nil { - merr.Add(wrapErr(err, d)) - merr.Add(wrapErr(repl.Close(), d)) - return merr.Err() + merr.Add(errors.Wrap(err, "rewrite of index.repaired")) + merr.Add(errors.Wrap(repl.Close(), "close")) + return errors.Wrapf(merr.Err(), "block dir: %v", d) } if err := repl.Sync(); err != nil { - merr.Add(wrapErr(err, d)) - merr.Add(wrapErr(repl.Close(), d)) - return merr.Err() + merr.Add(errors.Wrap(err, "sync of index.repaired")) + merr.Add(errors.Wrap(repl.Close(), "close")) + return errors.Wrapf(merr.Err(), "block dir: %v", d) } if err := repl.Close(); err != nil { - return wrapErr(err, d) + return errors.Wrapf(err, "close repaired index for block dir: %v", d) } if err := broken.Close(); err != nil { - return wrapErr(err, d) + return errors.Wrapf(err, "close broken index for block dir: %v", d) } if err := fileutil.Replace(repl.Name(), broken.Name()); err != nil { - return wrapErr(err, d) + return errors.Wrapf(err, "replaced broken index with index.repaired for block dir: %v", d) } // Reset version of meta.json to 1. meta.Version = metaVersion1 if _, err := writeMetaFile(logger, d, meta); err != nil { - return wrapErr(err, d) + return errors.Wrapf(err, "write meta for block dir: %v", d) } } return nil diff --git a/tsdb/repair_test.go b/tsdb/repair_test.go index 596e357eb..d789d2a5b 100644 --- a/tsdb/repair_test.go +++ b/tsdb/repair_test.go @@ -14,6 +14,7 @@ package tsdb import ( + "io/ioutil" "os" "path/filepath" "testing" @@ -26,6 +27,12 @@ import ( ) func TestRepairBadIndexVersion(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "test") + testutil.Ok(t, err) + t.Cleanup(func() { + testutil.Ok(t, os.RemoveAll(tmpDir)) + }) + // The broken index used in this test was written by the following script // at a broken revision. 
// @@ -59,22 +66,21 @@ func TestRepairBadIndexVersion(t *testing.T) { // panic(err) // } // } - dbDir := filepath.Join("testdata", "repair_index_version", "01BZJ9WJQPWHGNC2W4J9TA62KC") - tmpDir := filepath.Join("testdata", "repair_index_version", "copy") - tmpDbDir := filepath.Join(tmpDir, "3MCNSQ8S31EHGJYWK5E1GPJWJZ") + tmpDbDir := filepath.Join(tmpDir, "01BZJ9WJQPWHGNC2W4J9TA62KC") + + // Create a copy DB to run test against. + testutil.Ok(t, fileutil.CopyDirs(filepath.Join("testdata", "repair_index_version", "01BZJ9WJQPWHGNC2W4J9TA62KC"), tmpDbDir)) // Check the current db. // In its current state, lookups should fail with the fixed code. - _, _, err := readMetaFile(dbDir) + _, _, err = readMetaFile(tmpDbDir) testutil.NotOk(t, err) - // Touch chunks dir in block. - testutil.Ok(t, os.MkdirAll(filepath.Join(dbDir, "chunks"), 0777)) - defer func() { - testutil.Ok(t, os.RemoveAll(filepath.Join(dbDir, "chunks"))) - }() + // Touch chunks dir in block to imitate them. + testutil.Ok(t, os.MkdirAll(filepath.Join(tmpDbDir, "chunks"), 0777)) - r, err := index.NewFileReader(filepath.Join(dbDir, indexFilename)) + // Read current index to check integrity. + r, err := index.NewFileReader(filepath.Join(tmpDbDir, indexFilename)) testutil.Ok(t, err) p, err := r.Postings("b", "1") testutil.Ok(t, err) @@ -87,13 +93,6 @@ func TestRepairBadIndexVersion(t *testing.T) { testutil.Ok(t, p.Err()) testutil.Ok(t, r.Close()) - // Create a copy DB to run test against. - if err = fileutil.CopyDirs(dbDir, tmpDbDir); err != nil { - t.Fatal(err) - } - defer func() { - testutil.Ok(t, os.RemoveAll(tmpDir)) - }() // On DB opening all blocks in the base dir should be repaired. 
db, err := Open(tmpDir, nil, nil, nil) testutil.Ok(t, err) diff --git a/tsdb/testdata/repair_index_version/01BZJ9WJQPWHGNC2W4J9TA62KC/meta.json b/tsdb/testdata/repair_index_version/01BZJ9WJQPWHGNC2W4J9TA62KC/meta.json index 4c477916c..c358a4398 100644 --- a/tsdb/testdata/repair_index_version/01BZJ9WJQPWHGNC2W4J9TA62KC/meta.json +++ b/tsdb/testdata/repair_index_version/01BZJ9WJQPWHGNC2W4J9TA62KC/meta.json @@ -1,6 +1,6 @@ { "version": 2, - "ulid": "01BZJ9WJR6Z192734YNMD62F6M", + "ulid": "01BZJ9WJQPWHGNC2W4J9TA62KC", "minTime": 1511366400000, "maxTime": 1511368200000, "stats": { @@ -11,7 +11,7 @@ "compaction": { "level": 1, "sources": [ - "01BZJ9WJR6Z192734YNMD62F6M" + "01BZJ9WJQPWHGNC2W4J9TA62KC" ] } }