diff --git a/wal.go b/wal.go index b06425e076..2a49dcf41e 100644 --- a/wal.go +++ b/wal.go @@ -63,7 +63,7 @@ const ( // OpenWAL opens or creates a write ahead log in the given directory. // The WAL must be read completely before new data is written. -func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error) { +func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, error) { dir = filepath.Join(dir, walDirName) if err := os.MkdirAll(dir, 0777); err != nil { @@ -73,10 +73,13 @@ func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error if err != nil { return nil, err } + if logger == nil { + logger = log.NewNopLogger() + } w := &WAL{ dirFile: df, - logger: l, + logger: logger, flushInterval: flushInterval, donec: make(chan struct{}), stopc: make(chan struct{}), @@ -95,11 +98,7 @@ func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error // Reader returns a new reader over the the write ahead log data. // It must be completely consumed before writing to the WAL. func (w *WAL) Reader() *WALReader { - var rs []io.ReadCloser - for _, f := range w.files { - rs = append(rs, f) - } - return NewWALReader(rs...) + return NewWALReader(w.logger, w) } // Log writes a batch of new series labels and samples to the log. @@ -126,21 +125,15 @@ func (w *WAL) initSegments() error { if len(fns) == 0 { return nil } - if len(fns) > 1 { - for _, fn := range fns[:len(fns)-1] { - f, err := os.Open(fn) - if err != nil { - return err - } - w.files = append(w.files, f) + // We must open all file in read mode as we may have to truncate along + // the way and any file may become the tail. + for _, fn := range fns { + f, err := os.OpenFile(fn, os.O_RDWR, 0666) + if err != nil { + return err } + w.files = append(w.files, f) } - // The most recent WAL file is the one we have to keep appending to. - f, err := os.OpenFile(fns[len(fns)-1], os.O_RDWR, 0666) - if err != nil { - return err - } - w.files = append(w.files, f) // Consume and validate meta headers. for _, f := range w.files { @@ -275,7 +268,7 @@ func (w *WAL) Close() error { // On opening, a WAL must be fully consumed once. Afterwards // only the current segment will still be open. if tf := w.tail(); tf != nil { - return tf.Close() + return errors.Wrapf(tf.Close(), "closing WAL tail %s", tf.Name()) } return nil } @@ -413,7 +406,9 @@ func (w *WAL) encodeSamples(samples []refdSample) error { // WALReader decodes and emits write ahead log entries. type WALReader struct { - rs []io.ReadCloser + logger log.Logger + + wal *WAL cur int buf []byte crc32 hash.Hash32 @@ -424,12 +419,17 @@ type WALReader struct { } // NewWALReader returns a new WALReader over the sequence of the given ReadClosers. -func NewWALReader(rs ...io.ReadCloser) *WALReader { - return &WALReader{ - rs: rs, - buf: make([]byte, 0, 128*4096), - crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), +func NewWALReader(logger log.Logger, w *WAL) *WALReader { + if logger == nil { + logger = log.NewNopLogger() } + r := &WALReader{ + logger: logger, + wal: w, + buf: make([]byte, 0, 128*4096), + crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), + } + return r } // At returns the last decoded entry of labels or samples. @@ -446,19 +446,18 @@ func (r *WALReader) Err() error { // nextEntry retrieves the next entry. It is also used as a testing hook. func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) { - if r.cur >= len(r.rs) { + if r.cur >= len(r.wal.files) { return 0, 0, nil, io.EOF } - cr := r.rs[r.cur] + cf := r.wal.files[r.cur] - et, flag, b, err := r.entry(cr) + et, flag, b, err := r.entry(cf) // If we reached the end of the reader, advance to the next one // and close. // Do not close on the last one as it will still be appended to. - // XXX(fabxc): leaky abstraction. - if err == io.EOF && r.cur < len(r.rs)-1 { + if err == io.EOF && r.cur < len(r.wal.files)-1 { // Current reader completed, close and move to the next one. - if err := cr.Close(); err != nil { + if err := cf.Close(); err != nil { return 0, 0, nil, err } r.cur++ @@ -473,14 +472,46 @@ func (r *WALReader) Next() bool { r.labels = r.labels[:0] r.samples = r.samples[:0] - et, flag, b, err := r.nextEntry() + if r.cur >= len(r.wal.files) { + return false + } + cf := r.wal.files[r.cur] + + // Save position after last valid entry if we have to truncate the WAL. + lastOffset, err := cf.Seek(0, os.SEEK_CUR) if err != nil { - if err != io.EOF { + r.err = err + return false + } + + et, flag, b, err := r.entry(cf) + // If we reached the end of the reader, advance to the next one + // and close. + // Do not close on the last one as it will still be appended to. + if err == io.EOF { + if r.cur == len(r.wal.files)-1 { + return false + } + // Current reader completed, close and move to the next one. + if err := cf.Close(); err != nil { r.err = err + return false + } + r.cur++ + return r.Next() + } + if err != nil { + r.err = err + + if _, ok := err.(walCorruptionErr); ok { + r.err = r.truncate(lastOffset) } return false } + // In decoding below we never return a walCorruptionErr for now. + // Those should generally be catched by entry decoding before. + switch et { case WALEntrySamples: if err := r.decodeSamples(flag, b); err != nil { @@ -490,19 +521,52 @@ func (r *WALReader) Next() bool { if err := r.decodeSeries(flag, b); err != nil { r.err = err } - default: - r.err = errors.Errorf("unknown WAL entry type %d", et) } return r.err == nil } +func (r *WALReader) current() *os.File { + return r.wal.files[r.cur] +} + +// truncate the WAL after the last valid entry. +func (r *WALReader) truncate(lastOffset int64) error { + r.logger.Log("msg", "WAL corruption detected; truncating", + "err", r.err, "file", r.current().Name(), "pos", lastOffset) + + // Close and delete all files after the current one. + for _, f := range r.wal.files[r.cur+1:] { + if err := f.Close(); err != nil { + return err + } + if err := os.Remove(f.Name()); err != nil { + return err + } + } + r.wal.files = r.wal.files[:r.cur+1] + + // Seek the current file to the last valid offset where we continue writing from. + _, err := r.current().Seek(lastOffset, os.SEEK_SET) + return err +} + +// walCorruptionErr is a type wrapper for errors that indicate WAL corruption +// and trigger a truncation. +type walCorruptionErr error + +func walCorruptionErrf(s string, args ...interface{}) error { + return walCorruptionErr(errors.Errorf(s, args...)) +} + func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { r.crc32.Reset() tr := io.TeeReader(cr, r.crc32) b := make([]byte, 6) - if _, err := tr.Read(b); err != nil { + if n, err := tr.Read(b); err != nil { return 0, 0, nil, err + } else if n != 6 { + return 0, 0, nil, walCorruptionErrf("invalid entry header size %d", n) } var ( @@ -514,21 +578,28 @@ func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { if etype == 0 { return 0, 0, nil, io.EOF } + if etype != WALEntrySeries && etype != WALEntrySamples { + return 0, 0, nil, walCorruptionErrf("invalid entry type %d", etype) + } if length > len(r.buf) { r.buf = make([]byte, length) } buf := r.buf[:length] - if _, err := tr.Read(buf); err != nil { + if n, err := tr.Read(buf); err != nil { return 0, 0, nil, err + } else if n != length { + return 0, 0, nil, walCorruptionErrf("invalid entry body size %d", n) } - _, err := cr.Read(b[:4]) - if err != nil { + + if n, err := cr.Read(b[:4]); err != nil { return 0, 0, nil, err + } else if n != 4 { + return 0, 0, nil, walCorruptionErrf("invalid checksum length %d", n) } if exp, has := binary.BigEndian.Uint32(b[:4]), r.crc32.Sum32(); has != exp { - return 0, 0, nil, errors.Errorf("unexpected CRC32 checksum %x, want %x", has, exp) + return 0, 0, nil, walCorruptionErrf("unexpected CRC32 checksum %x, want %x", has, exp) } return etype, flag, buf, nil diff --git a/wal_test.go b/wal_test.go index 8442dbddc7..6a4205c1c4 100644 --- a/wal_test.go +++ b/wal_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/fabxc/tsdb/labels" + "github.com/go-kit/kit/log" "github.com/coreos/etcd/pkg/fileutil" "github.com/stretchr/testify/require" @@ -109,7 +110,7 @@ func TestWAL_cut(t *testing.T) { // We cannot actually check for correct pre-allocation as it is // optional per filesystem and handled transparently. - et, flag, b, err := NewWALReader(f).nextEntry() + et, flag, b, err := NewWALReader(nil, nil).entry(f) require.NoError(t, err) require.Equal(t, WALEntrySeries, et) require.Equal(t, flag, byte(walSeriesSimple)) @@ -204,3 +205,139 @@ func TestWAL_Log_Restore(t *testing.T) { require.NoError(t, w.Close()) } } + +// Test reading from a WAL that has been corrupted through various means. +func TestWALRestoreCorrupted(t *testing.T) { + cases := []struct { + name string + f func(*testing.T, *WAL) + }{ + { + name: "truncate_checksum", + f: func(t *testing.T, w *WAL) { + f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) + require.NoError(t, err) + defer f.Close() + + off, err := f.Seek(0, os.SEEK_END) + require.NoError(t, err) + + require.NoError(t, f.Truncate(off-1)) + }, + }, + { + name: "truncate_body", + f: func(t *testing.T, w *WAL) { + f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) + require.NoError(t, err) + defer f.Close() + + off, err := f.Seek(0, os.SEEK_END) + require.NoError(t, err) + + require.NoError(t, f.Truncate(off-8)) + }, + }, + { + name: "body_content", + f: func(t *testing.T, w *WAL) { + f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) + require.NoError(t, err) + defer f.Close() + + off, err := f.Seek(0, os.SEEK_END) + require.NoError(t, err) + + // Write junk before checksum starts. + _, err = f.WriteAt([]byte{1, 2, 3, 4}, off-8) + require.NoError(t, err) + }, + }, + { + name: "checksum", + f: func(t *testing.T, w *WAL) { + f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) + require.NoError(t, err) + defer f.Close() + + off, err := f.Seek(0, os.SEEK_END) + require.NoError(t, err) + + // Write junk into checksum + _, err = f.WriteAt([]byte{1, 2, 3, 4}, off-4) + require.NoError(t, err) + }, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + // Generate testing data. It does not make semantical sense but + // for the purpose of this test. + dir, err := ioutil.TempDir("", "test_corrupted_checksum") + require.NoError(t, err) + defer os.RemoveAll(dir) + + w, err := OpenWAL(dir, nil, 0) + require.NoError(t, err) + + require.NoError(t, w.Log(nil, []refdSample{{t: 1, v: 2}})) + require.NoError(t, w.Log(nil, []refdSample{{t: 2, v: 3}})) + + require.NoError(t, w.cut()) + + require.NoError(t, w.Log(nil, []refdSample{{t: 3, v: 4}})) + require.NoError(t, w.Log(nil, []refdSample{{t: 5, v: 6}})) + + require.NoError(t, w.Close()) + + // Corrupt the second entry in the first file. + // After re-opening we must be able to read the first entry + // and the rest, including the second file, must be truncated for clean further + // writes. + c.f(t, w) + + logger := log.NewLogfmtLogger(os.Stderr) + + w2, err := OpenWAL(dir, logger, 0) + require.NoError(t, err) + + r := w2.Reader() + + require.True(t, r.Next()) + l, s := r.At() + require.Equal(t, 0, len(l)) + require.Equal(t, []refdSample{{t: 1, v: 2}}, s) + + // Truncation should happen transparently and now cause an error. + require.False(t, r.Next()) + require.Nil(t, r.Err()) + + require.NoError(t, w2.Log(nil, []refdSample{{t: 99, v: 100}})) + require.NoError(t, w2.Close()) + + files, err := fileutil.ReadDir(dir) + require.NoError(t, err) + require.Equal(t, 1, len(files)) + + // We should see the first valid entry and the new one, everything after + // is truncated. + w3, err := OpenWAL(dir, logger, 0) + require.NoError(t, err) + + r = w3.Reader() + + require.True(t, r.Next()) + l, s = r.At() + require.Equal(t, 0, len(l)) + require.Equal(t, []refdSample{{t: 1, v: 2}}, s) + + require.True(t, r.Next()) + l, s = r.At() + require.Equal(t, 0, len(l)) + require.Equal(t, []refdSample{{t: 99, v: 100}}, s) + + require.False(t, r.Next()) + require.Nil(t, r.Err()) + }) + } +}