cleanup tmp files (#570)

Signed-off-by: zhulongcheng <zhulongcheng.me@gmail.com>
This commit is contained in:
zhulongcheng 2019-04-26 16:27:36 +08:00 committed by Krasi Georgiev
parent 288f67efbf
commit 19d402d154
7 changed files with 65 additions and 42 deletions

View file

@ -29,6 +29,7 @@ import (
"github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/tsdb/errors" tsdb_errors "github.com/prometheus/tsdb/errors"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
) )
@ -230,12 +231,17 @@ func readMetaFile(dir string) (*BlockMeta, error) {
return &m, nil return &m, nil
} }
func writeMetaFile(dir string, meta *BlockMeta) error { func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) error {
meta.Version = 1 meta.Version = 1
// Make any changes to the file appear atomic. // Make any changes to the file appear atomic.
path := filepath.Join(dir, metaFilename) path := filepath.Join(dir, metaFilename)
tmp := path + ".tmp" tmp := path + ".tmp"
defer func() {
if err := os.RemoveAll(tmp); err != nil {
level.Error(logger).Log("msg", "remove tmp file", "err", err.Error())
}
}()
f, err := os.Create(tmp) f, err := os.Create(tmp)
if err != nil { if err != nil {
@ -246,7 +252,6 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
enc.SetIndent("", "\t") enc.SetIndent("", "\t")
var merr tsdb_errors.MultiError var merr tsdb_errors.MultiError
if merr.Add(enc.Encode(meta)); merr.Err() != nil { if merr.Add(enc.Encode(meta)); merr.Err() != nil {
merr.Add(f.Close()) merr.Add(f.Close())
return merr.Err() return merr.Err()
@ -259,7 +264,7 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
if err := f.Close(); err != nil { if err := f.Close(); err != nil {
return err return err
} }
return renameFile(tmp, path) return fileutil.Replace(tmp, path)
} }
// Block represents a directory of time series data covering a continuous time range. // Block represents a directory of time series data covering a continuous time range.
@ -278,6 +283,8 @@ type Block struct {
chunkr ChunkReader chunkr ChunkReader
indexr IndexReader indexr IndexReader
tombstones TombstoneReader tombstones TombstoneReader
logger log.Logger
} }
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
@ -322,7 +329,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er
// that would be the logical place for a block size to be calculated. // that would be the logical place for a block size to be calculated.
bs := blockSize(cr, ir, tsr) bs := blockSize(cr, ir, tsr)
meta.Stats.NumBytes = bs meta.Stats.NumBytes = bs
err = writeMetaFile(dir, meta) err = writeMetaFile(logger, dir, meta)
if err != nil { if err != nil {
level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err) level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err)
} }
@ -334,6 +341,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er
indexr: ir, indexr: ir,
tombstones: tr, tombstones: tr,
symbolTableSize: ir.SymbolTableSize(), symbolTableSize: ir.SymbolTableSize(),
logger: logger,
} }
return pb, nil return pb, nil
} }
@ -429,7 +437,7 @@ func (pb *Block) GetSymbolTableSize() uint64 {
func (pb *Block) setCompactionFailed() error { func (pb *Block) setCompactionFailed() error {
pb.meta.Compaction.Failed = true pb.meta.Compaction.Failed = true
return writeMetaFile(pb.dir, &pb.meta) return writeMetaFile(pb.logger, pb.dir, &pb.meta)
} }
type blockIndexReader struct { type blockIndexReader struct {
@ -553,10 +561,10 @@ Outer:
pb.tombstones = stones pb.tombstones = stones
pb.meta.Stats.NumTombstones = pb.tombstones.Total() pb.meta.Stats.NumTombstones = pb.tombstones.Total()
if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil { if err := writeTombstoneFile(pb.logger, pb.dir, pb.tombstones); err != nil {
return err return err
} }
return writeMetaFile(pb.dir, &pb.meta) return writeMetaFile(pb.logger, pb.dir, &pb.meta)
} }
// CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones). // CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones).

View file

@ -39,7 +39,7 @@ func TestBlockMetaMustNeverBeVersion2(t *testing.T) {
testutil.Ok(t, os.RemoveAll(dir)) testutil.Ok(t, os.RemoveAll(dir))
}() }()
testutil.Ok(t, writeMetaFile(dir, &BlockMeta{})) testutil.Ok(t, writeMetaFile(log.NewNopLogger(), dir, &BlockMeta{}))
meta, err := readMetaFile(dir) meta, err := readMetaFile(dir)
testutil.Ok(t, err) testutil.Ok(t, err)

View file

@ -426,7 +426,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
if meta.Stats.NumSamples == 0 { if meta.Stats.NumSamples == 0 {
for _, b := range bs { for _, b := range bs {
b.meta.Compaction.Deletable = true b.meta.Compaction.Deletable = true
if err = writeMetaFile(b.dir, &b.meta); err != nil { if err = writeMetaFile(c.logger, b.dir, &b.meta); err != nil {
level.Error(c.logger).Log( level.Error(c.logger).Log(
"msg", "Failed to write 'Deletable' to meta file after compaction", "msg", "Failed to write 'Deletable' to meta file after compaction",
"ulid", b.meta.ULID, "ulid", b.meta.ULID,
@ -609,12 +609,12 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
return nil return nil
} }
if err = writeMetaFile(tmp, meta); err != nil { if err = writeMetaFile(c.logger, tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta") return errors.Wrap(err, "write merged meta")
} }
// Create an empty tombstones file. // Create an empty tombstones file.
if err := writeTombstoneFile(tmp, newMemTombstones()); err != nil { if err := writeTombstoneFile(c.logger, tmp, newMemTombstones()); err != nil {
return errors.Wrap(err, "write new tombstones file") return errors.Wrap(err, "write new tombstones file")
} }
@ -639,7 +639,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
df = nil df = nil
// Block successfully written, make visible and remove old ones. // Block successfully written, make visible and remove old ones.
if err := renameFile(tmp, dir); err != nil { if err := fileutil.Replace(tmp, dir); err != nil {
return errors.Wrap(err, "rename block dir") return errors.Wrap(err, "rename block dir")
} }
@ -1013,24 +1013,3 @@ func (c *compactionMerger) Err() error {
func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, Intervals) { func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, Intervals) {
return c.l, c.c, c.intervals return c.l, c.c, c.intervals
} }
func renameFile(from, to string) error {
if err := os.RemoveAll(to); err != nil {
return err
}
if err := os.Rename(from, to); err != nil {
return err
}
// Directory was renamed; sync parent dir to persist rename.
pdir, err := fileutil.OpenDir(filepath.Dir(to))
if err != nil {
return err
}
if err = pdir.Sync(); err != nil {
pdir.Close()
return err
}
return pdir.Close()
}

View file

@ -23,6 +23,8 @@ import (
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level" "github.com/go-kit/kit/log/level"
"github.com/pkg/errors" "github.com/pkg/errors"
tsdb_errors "github.com/prometheus/tsdb/errors"
"github.com/prometheus/tsdb/fileutil"
) )
// repairBadIndexVersion repairs an issue in index and meta.json persistence introduced in // repairBadIndexVersion repairs an issue in index and meta.json persistence introduced in
@ -38,6 +40,16 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
wrapErr := func(err error, d string) error { wrapErr := func(err error, d string) error {
return errors.Wrapf(err, "block dir: %q", d) return errors.Wrapf(err, "block dir: %q", d)
} }
tmpFiles := make([]string, 0, len(dir))
defer func() {
for _, tmp := range tmpFiles {
if err := os.RemoveAll(tmp); err != nil {
level.Error(logger).Log("msg", "remove tmp file", "err", err.Error())
}
}
}()
for _, d := range dirs { for _, d := range dirs {
meta, err := readBogusMetaFile(d) meta, err := readBogusMetaFile(d)
if err != nil { if err != nil {
@ -63,6 +75,8 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
if err != nil { if err != nil {
return wrapErr(err, d) return wrapErr(err, d)
} }
tmpFiles = append(tmpFiles, repl.Name())
broken, err := os.Open(filepath.Join(d, indexFilename)) broken, err := os.Open(filepath.Join(d, indexFilename))
if err != nil { if err != nil {
return wrapErr(err, d) return wrapErr(err, d)
@ -70,12 +84,19 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
if _, err := io.Copy(repl, broken); err != nil { if _, err := io.Copy(repl, broken); err != nil {
return wrapErr(err, d) return wrapErr(err, d)
} }
var merr tsdb_errors.MultiError
// Set the 5th byte to 2 to indicate the correct file format version. // Set the 5th byte to 2 to indicate the correct file format version.
if _, err := repl.WriteAt([]byte{2}, 4); err != nil { if _, err := repl.WriteAt([]byte{2}, 4); err != nil {
return wrapErr(err, d) merr.Add(wrapErr(err, d))
merr.Add(wrapErr(repl.Close(), d))
return merr.Err()
} }
if err := repl.Sync(); err != nil { if err := repl.Sync(); err != nil {
return wrapErr(err, d) merr.Add(wrapErr(err, d))
merr.Add(wrapErr(repl.Close(), d))
return merr.Err()
} }
if err := repl.Close(); err != nil { if err := repl.Close(); err != nil {
return wrapErr(err, d) return wrapErr(err, d)
@ -83,12 +104,12 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
if err := broken.Close(); err != nil { if err := broken.Close(); err != nil {
return wrapErr(err, d) return wrapErr(err, d)
} }
if err := renameFile(repl.Name(), broken.Name()); err != nil { if err := fileutil.Replace(repl.Name(), broken.Name()); err != nil {
return wrapErr(err, d) return wrapErr(err, d)
} }
// Reset version of meta.json to 1. // Reset version of meta.json to 1.
meta.Version = 1 meta.Version = 1
if err := writeMetaFile(d, meta); err != nil { if err := writeMetaFile(logger, d, meta); err != nil {
return wrapErr(err, d) return wrapErr(err, d)
} }
} }

View file

@ -22,9 +22,12 @@ import (
"path/filepath" "path/filepath"
"sync" "sync"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/tsdb/encoding" "github.com/prometheus/tsdb/encoding"
tsdb_errors "github.com/prometheus/tsdb/errors" tsdb_errors "github.com/prometheus/tsdb/errors"
"github.com/prometheus/tsdb/fileutil"
) )
const tombstoneFilename = "tombstones" const tombstoneFilename = "tombstones"
@ -51,7 +54,7 @@ type TombstoneReader interface {
Close() error Close() error
} }
func writeTombstoneFile(dir string, tr TombstoneReader) error { func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) error {
path := filepath.Join(dir, tombstoneFilename) path := filepath.Join(dir, tombstoneFilename)
tmp := path + ".tmp" tmp := path + ".tmp"
hash := newCRC32() hash := newCRC32()
@ -62,7 +65,12 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error {
} }
defer func() { defer func() {
if f != nil { if f != nil {
f.Close() if err := f.Close(); err != nil {
level.Error(logger).Log("msg", "close tmp file", "err", err.Error())
}
}
if err := os.RemoveAll(tmp); err != nil {
level.Error(logger).Log("msg", "remove tmp file", "err", err.Error())
} }
}() }()
@ -111,7 +119,7 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error {
return err return err
} }
f = nil f = nil
return renameFile(tmp, path) return fileutil.Replace(tmp, path)
} }
// Stone holds the information on the posting and time-range // Stone holds the information on the posting and time-range

View file

@ -21,6 +21,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/go-kit/kit/log"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
) )
@ -46,7 +47,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
stones.addInterval(ref, dranges...) stones.addInterval(ref, dranges...)
} }
testutil.Ok(t, writeTombstoneFile(tmpdir, stones)) testutil.Ok(t, writeTombstoneFile(log.NewNopLogger(), tmpdir, stones))
restr, _, err := readTombstones(tmpdir) restr, _, err := readTombstones(tmpdir)
testutil.Ok(t, err) testutil.Ok(t, err)

8
wal.go
View file

@ -338,6 +338,12 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error {
if err != nil { if err != nil {
return errors.Wrap(err, "create compaction segment") return errors.Wrap(err, "create compaction segment")
} }
defer func() {
if err := os.RemoveAll(f.Name()); err != nil {
level.Error(w.logger).Log("msg", "remove tmp file", "err", err.Error())
}
}()
var ( var (
csf = newSegmentFile(f) csf = newSegmentFile(f)
crc32 = newCRC32() crc32 = newCRC32()
@ -389,7 +395,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error {
csf.Close() csf.Close()
candidates[0].Close() // need close before remove on platform windows candidates[0].Close() // need close before remove on platform windows
if err := renameFile(csf.Name(), candidates[0].Name()); err != nil { if err := fileutil.Replace(csf.Name(), candidates[0].Name()); err != nil {
return errors.Wrap(err, "rename compaction segment") return errors.Wrap(err, "rename compaction segment")
} }
for _, f := range candidates[1:] { for _, f := range candidates[1:] {