diff --git a/checkpoint.go b/checkpoint.go index d988d3561..f45f3791f 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -109,6 +109,10 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo stats := &CheckpointStats{} var sr io.Reader + // We close everything explicitly because Windows needs files to be + // closed before being deleted. But we also have defer so that we close + // files if there is an error somewhere. + var closers []io.Closer { lastFn, k, err := LastCheckpoint(w.Dir()) if err != nil && err != ErrNotFound { @@ -126,6 +130,7 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo return nil, errors.Wrap(err, "open last checkpoint") } defer last.Close() + closers = append(closers, last) sr = last } @@ -134,6 +139,7 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo return nil, errors.Wrap(err, "create segment reader") } defer segsr.Close() + closers = append(closers, segsr) if sr != nil { sr = io.MultiReader(sr, segsr) @@ -263,6 +269,9 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo if err := fileutil.Replace(cpdirtmp, cpdir); err != nil { return nil, errors.Wrap(err, "rename checkpoint directory") } + if err := closeAll(closers...); err != nil { + return stats, errors.Wrap(err, "close opened files") + } if err := w.Truncate(n + 1); err != nil { // If truncating fails, we'll just try again at the next checkpoint. // Leftover segments will just be ignored in the future if there's a checkpoint diff --git a/compact.go b/compact.go index 6df33a4c8..3f5fa367c 100644 --- a/compact.go +++ b/compact.go @@ -452,6 +452,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err != nil { return errors.Wrap(err, "open chunk writer") } + defer chunkw.Close() // Record written chunk sizes on level 1 compactions. if meta.Compaction.Level == 1 { chunkw = &instrumentedChunkWriter{ @@ -466,6 +467,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err != nil { return errors.Wrap(err, "open index writer") } + defer indexw.Close() if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil { return errors.Wrap(err, "write compaction") @@ -475,6 +477,10 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return errors.Wrap(err, "write merged meta") } + // We are explicitly closing them here to check for error even + // though these are covered under defer. This is because in Windows, + // you cannot delete these unless they are closed and the defer is to + // make sure they are closed if the function exits due to an error above. if err = chunkw.Close(); err != nil { return errors.Wrap(err, "close chunk writer") } diff --git a/db_test.go b/db_test.go index 9c175118e..988fdb7ba 100644 --- a/db_test.go +++ b/db_test.go @@ -859,6 +859,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6 } block := createEmptyBlock(c.t, filepath.Join(dest, meta.ULID.String()), meta) + testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere. c.blocks = append(c.blocks, block) // Now check that all expected blocks are actually persisted on disk. diff --git a/fileutil/fileutil.go b/fileutil/fileutil.go index 2158bfd26..1154e7307 100644 --- a/fileutil/fileutil.go +++ b/fileutil/fileutil.go @@ -48,7 +48,7 @@ func Rename(from, to string) error { // It is not atomic. func Replace(from, to string) error { if err := os.RemoveAll(to); err != nil { - return nil + return err } if err := os.Rename(from, to); err != nil { return err diff --git a/repair_test.go b/repair_test.go index c80976002..cbe138c0a 100644 --- a/repair_test.go +++ b/repair_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" @@ -74,6 +75,7 @@ func TestRepairBadIndexVersion(t *testing.T) { if p.Err() != nil { t.Fatal(err) } + testutil.Ok(t, r.Close()) // On DB opening all blocks in the base dir should be repaired. db, err := Open("testdata/repair_index_version", nil, nil, nil) diff --git a/wal.go b/wal.go index ff978766c..59206d8d0 100644 --- a/wal.go +++ b/wal.go @@ -723,6 +723,13 @@ func (w *SegmentWAL) run(interval time.Duration) { // Close syncs all data and closes the underlying resources. func (w *SegmentWAL) Close() error { + // Make sure you can call Close() multiple times. + select { + case <-w.stopc: + return nil // Already closed. + default: + } + close(w.stopc) <-w.donec @@ -735,10 +742,12 @@ func (w *SegmentWAL) Close() error { // On opening, a WAL must be fully consumed once. Afterwards // only the current segment will still be open. if hf := w.head(); hf != nil { - return errors.Wrapf(hf.Close(), "closing WAL head %s", hf.Name()) + if err := hf.Close(); err != nil { + return errors.Wrapf(err, "closing WAL head %s", hf.Name()) + } } - return w.dirFile.Close() + return errors.Wrapf(w.dirFile.Close(), "closing WAL dir %s", w.dirFile.Name()) } const ( @@ -1260,6 +1269,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) { if err != nil { return errors.Wrap(err, "open new WAL") } + // It should've already been closed as part of the previous finalization. // Do it once again in case of prior errors. defer func() { @@ -1306,6 +1316,12 @@ func MigrateWAL(logger log.Logger, dir string) (err error) { if err != nil { return errors.Wrap(err, "write new entries") } + // We explicitly close even when there is a defer for Windows to be + // able to delete it. The defer is in place to close it in-case there + // are errors above. + if err := w.Close(); err != nil { + return errors.Wrap(err, "close old WAL") + } if err := repl.Close(); err != nil { return errors.Wrap(err, "close new WAL") } diff --git a/wal/wal.go b/wal/wal.go index 1aea24d86..aa52738fa 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -285,6 +285,15 @@ func (w *WAL) Repair(origErr error) error { if s.n <= cerr.Segment { continue } + if w.segment.i == s.n { + // The active segment needs to be removed, + // close it first (Windows!). Can be closed safely + // as we set the current segment to repaired file + // below. + if err := w.segment.Close(); err != nil { + return errors.Wrap(err, "close active segment") + } + } if err := os.Remove(filepath.Join(w.dir, s.s)); err != nil { return errors.Wrap(err, "delete segment") } @@ -312,6 +321,7 @@ func (w *WAL) Repair(origErr error) error { return errors.Wrap(err, "open segment") } defer f.Close() + r := NewReader(bufio.NewReader(f)) for r.Next() { @@ -319,8 +329,14 @@ func (w *WAL) Repair(origErr error) error { return errors.Wrap(err, "insert record") } } - // We expect an error here, so nothing to handle. + // We expect an error here from r.Err(), so nothing to handle. + // We explicitly close even when there is a defer for Windows to be + // able to delete it. The defer is in place to close it in-case there + // are errors above. + if err := f.Close(); err != nil { + return errors.Wrap(err, "close corrupted file") + } if err := os.Remove(tmpfn); err != nil { return errors.Wrap(err, "delete corrupted segment") } diff --git a/wal/wal_test.go b/wal/wal_test.go index 26ee8663d..72f46253f 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -287,8 +287,14 @@ func TestWAL_Repair(t *testing.T) { for r.Next() { } testutil.NotOk(t, r.Err()) + testutil.Ok(t, sr.Close()) testutil.Ok(t, w.Repair(r.Err())) - testutil.Ok(t, w.Repair(errors.Wrap(r.Err(), "err"))) // See https://github.com/prometheus/prometheus/issues/4603 + + // See https://github.com/prometheus/prometheus/issues/4603 + // We need to close w.segment because it needs to be deleted. + // But this is to mainly artificially test Repair() again. + testutil.Ok(t, w.segment.Close()) + testutil.Ok(t, w.Repair(errors.Wrap(r.Err(), "err"))) sr, err = NewSegmentsReader(dir) testutil.Ok(t, err)