Fix filehandling for windows (#392)

* Fix filehandling for windows

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

* Fix more windows filehandling issues

Windows: Close files before deleting Checkpoints.

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

Windows: Close writers in case of errors so they can be deleted

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

Windows: Close block so that it can be deleted.

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

Windows: Close file to delete it

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

Windows: Close dir so that it can be deleted.

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

Windows: close files so that they can be deleted.

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

* Review feedback

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
This commit is contained in:
Goutham Veeramachaneni 2018-09-21 11:01:22 +05:30 committed by GitHub
parent 5ae6c60d39
commit 9c8ca47399
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 61 additions and 5 deletions

View file

@ -109,6 +109,10 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
stats := &CheckpointStats{} stats := &CheckpointStats{}
var sr io.Reader var sr io.Reader
// We close everything explicitly because Windows needs files to be
// closed before being deleted. But we also have defer so that we close
// files if there is an error somewhere.
var closers []io.Closer
{ {
lastFn, k, err := LastCheckpoint(w.Dir()) lastFn, k, err := LastCheckpoint(w.Dir())
if err != nil && err != ErrNotFound { if err != nil && err != ErrNotFound {
@ -126,6 +130,7 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
return nil, errors.Wrap(err, "open last checkpoint") return nil, errors.Wrap(err, "open last checkpoint")
} }
defer last.Close() defer last.Close()
closers = append(closers, last)
sr = last sr = last
} }
@ -134,6 +139,7 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
return nil, errors.Wrap(err, "create segment reader") return nil, errors.Wrap(err, "create segment reader")
} }
defer segsr.Close() defer segsr.Close()
closers = append(closers, segsr)
if sr != nil { if sr != nil {
sr = io.MultiReader(sr, segsr) sr = io.MultiReader(sr, segsr)
@ -263,6 +269,9 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
if err := fileutil.Replace(cpdirtmp, cpdir); err != nil { if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
return nil, errors.Wrap(err, "rename checkpoint directory") return nil, errors.Wrap(err, "rename checkpoint directory")
} }
if err := closeAll(closers...); err != nil {
return stats, errors.Wrap(err, "close opened files")
}
if err := w.Truncate(n + 1); err != nil { if err := w.Truncate(n + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint. // If truncating fails, we'll just try again at the next checkpoint.
// Leftover segments will just be ignored in the future if there's a checkpoint // Leftover segments will just be ignored in the future if there's a checkpoint

View file

@ -452,6 +452,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
if err != nil { if err != nil {
return errors.Wrap(err, "open chunk writer") return errors.Wrap(err, "open chunk writer")
} }
defer chunkw.Close()
// Record written chunk sizes on level 1 compactions. // Record written chunk sizes on level 1 compactions.
if meta.Compaction.Level == 1 { if meta.Compaction.Level == 1 {
chunkw = &instrumentedChunkWriter{ chunkw = &instrumentedChunkWriter{
@ -466,6 +467,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
if err != nil { if err != nil {
return errors.Wrap(err, "open index writer") return errors.Wrap(err, "open index writer")
} }
defer indexw.Close()
if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil { if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
return errors.Wrap(err, "write compaction") return errors.Wrap(err, "write compaction")
@ -475,6 +477,10 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
return errors.Wrap(err, "write merged meta") return errors.Wrap(err, "write merged meta")
} }
// We are explicitly closing them here to check for error even
// though these are covered under defer. This is because in Windows,
// you cannot delete these unless they are closed and the defer is to
// make sure they are closed if the function exits due to an error above.
if err = chunkw.Close(); err != nil { if err = chunkw.Close(); err != nil {
return errors.Wrap(err, "close chunk writer") return errors.Wrap(err, "close chunk writer")
} }

View file

@ -859,6 +859,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6
} }
block := createEmptyBlock(c.t, filepath.Join(dest, meta.ULID.String()), meta) block := createEmptyBlock(c.t, filepath.Join(dest, meta.ULID.String()), meta)
testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere.
c.blocks = append(c.blocks, block) c.blocks = append(c.blocks, block)
// Now check that all expected blocks are actually persisted on disk. // Now check that all expected blocks are actually persisted on disk.

View file

@ -48,7 +48,7 @@ func Rename(from, to string) error {
// It is not atomic. // It is not atomic.
func Replace(from, to string) error { func Replace(from, to string) error {
if err := os.RemoveAll(to); err != nil { if err := os.RemoveAll(to); err != nil {
return nil return err
} }
if err := os.Rename(from, to); err != nil { if err := os.Rename(from, to); err != nil {
return err return err

View file

@ -6,6 +6,7 @@ import (
"testing" "testing"
"github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
@ -74,6 +75,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
if p.Err() != nil { if p.Err() != nil {
t.Fatal(err) t.Fatal(err)
} }
testutil.Ok(t, r.Close())
// On DB opening all blocks in the base dir should be repaired. // On DB opening all blocks in the base dir should be repaired.
db, err := Open("testdata/repair_index_version", nil, nil, nil) db, err := Open("testdata/repair_index_version", nil, nil, nil)

20
wal.go
View file

@ -723,6 +723,13 @@ func (w *SegmentWAL) run(interval time.Duration) {
// Close syncs all data and closes the underlying resources. // Close syncs all data and closes the underlying resources.
func (w *SegmentWAL) Close() error { func (w *SegmentWAL) Close() error {
// Make sure you can call Close() multiple times.
select {
case <-w.stopc:
return nil // Already closed.
default:
}
close(w.stopc) close(w.stopc)
<-w.donec <-w.donec
@ -735,10 +742,12 @@ func (w *SegmentWAL) Close() error {
// On opening, a WAL must be fully consumed once. Afterwards // On opening, a WAL must be fully consumed once. Afterwards
// only the current segment will still be open. // only the current segment will still be open.
if hf := w.head(); hf != nil { if hf := w.head(); hf != nil {
return errors.Wrapf(hf.Close(), "closing WAL head %s", hf.Name()) if err := hf.Close(); err != nil {
return errors.Wrapf(err, "closing WAL head %s", hf.Name())
}
} }
return w.dirFile.Close() return errors.Wrapf(w.dirFile.Close(), "closing WAL dir %s", w.dirFile.Name())
} }
const ( const (
@ -1260,6 +1269,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) {
if err != nil { if err != nil {
return errors.Wrap(err, "open new WAL") return errors.Wrap(err, "open new WAL")
} }
// It should've already been closed as part of the previous finalization. // It should've already been closed as part of the previous finalization.
// Do it once again in case of prior errors. // Do it once again in case of prior errors.
defer func() { defer func() {
@ -1306,6 +1316,12 @@ func MigrateWAL(logger log.Logger, dir string) (err error) {
if err != nil { if err != nil {
return errors.Wrap(err, "write new entries") return errors.Wrap(err, "write new entries")
} }
// We explicitly close even when there is a defer for Windows to be
// able to delete it. The defer is in place to close it in-case there
// are errors above.
if err := w.Close(); err != nil {
return errors.Wrap(err, "close old WAL")
}
if err := repl.Close(); err != nil { if err := repl.Close(); err != nil {
return errors.Wrap(err, "close new WAL") return errors.Wrap(err, "close new WAL")
} }

View file

@ -285,6 +285,15 @@ func (w *WAL) Repair(origErr error) error {
if s.n <= cerr.Segment { if s.n <= cerr.Segment {
continue continue
} }
if w.segment.i == s.n {
// The active segment needs to be removed,
// close it first (Windows!). Can be closed safely
// as we set the current segment to repaired file
// below.
if err := w.segment.Close(); err != nil {
return errors.Wrap(err, "close active segment")
}
}
if err := os.Remove(filepath.Join(w.dir, s.s)); err != nil { if err := os.Remove(filepath.Join(w.dir, s.s)); err != nil {
return errors.Wrap(err, "delete segment") return errors.Wrap(err, "delete segment")
} }
@ -312,6 +321,7 @@ func (w *WAL) Repair(origErr error) error {
return errors.Wrap(err, "open segment") return errors.Wrap(err, "open segment")
} }
defer f.Close() defer f.Close()
r := NewReader(bufio.NewReader(f)) r := NewReader(bufio.NewReader(f))
for r.Next() { for r.Next() {
@ -319,8 +329,14 @@ func (w *WAL) Repair(origErr error) error {
return errors.Wrap(err, "insert record") return errors.Wrap(err, "insert record")
} }
} }
// We expect an error here, so nothing to handle. // We expect an error here from r.Err(), so nothing to handle.
// We explicitly close even when there is a defer for Windows to be
// able to delete it. The defer is in place to close it in-case there
// are errors above.
if err := f.Close(); err != nil {
return errors.Wrap(err, "close corrupted file")
}
if err := os.Remove(tmpfn); err != nil { if err := os.Remove(tmpfn); err != nil {
return errors.Wrap(err, "delete corrupted segment") return errors.Wrap(err, "delete corrupted segment")
} }

View file

@ -287,8 +287,14 @@ func TestWAL_Repair(t *testing.T) {
for r.Next() { for r.Next() {
} }
testutil.NotOk(t, r.Err()) testutil.NotOk(t, r.Err())
testutil.Ok(t, sr.Close())
testutil.Ok(t, w.Repair(r.Err())) testutil.Ok(t, w.Repair(r.Err()))
testutil.Ok(t, w.Repair(errors.Wrap(r.Err(), "err"))) // See https://github.com/prometheus/prometheus/issues/4603
// See https://github.com/prometheus/prometheus/issues/4603
// We need to close w.segment because it needs to be deleted.
// But this is to mainly artificially test Repair() again.
testutil.Ok(t, w.segment.Close())
testutil.Ok(t, w.Repair(errors.Wrap(r.Err(), "err")))
sr, err = NewSegmentsReader(dir) sr, err = NewSegmentsReader(dir)
testutil.Ok(t, err) testutil.Ok(t, err)