From 449a2d0db7e3efd32aa748ca31ecc1044496ccd2 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 17 May 2018 09:00:32 -0400 Subject: [PATCH] wal: add segment type and repair procedure Allow to repair the WAL based on the error returned by a reader during a full scan over all records. Signed-off-by: Fabian Reinartz --- wal/wal.go | 364 +++++++++++++++++++++++++++++++++++++----------- wal/wal_test.go | 98 ++++++++++++- 2 files changed, 378 insertions(+), 84 deletions(-) diff --git a/wal/wal.go b/wal/wal.go index ed51d1ad90..ed979998c5 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -20,6 +20,7 @@ import ( "fmt" "hash/crc32" "io" + "math" "os" "path/filepath" "strconv" @@ -60,19 +61,101 @@ func (p *page) full() bool { return pageSize-p.alloc < recordHeaderSize } +// Segment represents a segment file. +type Segment struct { + *os.File + dir string + i int +} + +// Index returns the index of the segment. +func (s *Segment) Index() int { + return s.i +} + +// Dir returns the directory of the segment. +func (s *Segment) Dir() string { + return s.dir +} + +// CorruptionErr is an error that's returned when corruption is encountered. +type CorruptionErr struct { + Segment int + Offset int + Err error +} + +func (e *CorruptionErr) Error() string { + if e.Segment < 0 { + return fmt.Sprintf("corruption after %d bytes: %s", e.Offset, e.Err) + } + return fmt.Sprintf("corruption in segment %d at %d: %s", e.Segment, e.Offset, e.Err) +} + +// OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends. +func OpenWriteSegment(dir string, k int) (*Segment, error) { + // Only .active segments are allowed to be opened for write. + f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + return nil, err + } + stat, err := f.Stat() + if err != nil { + f.Close() + return nil, err + } + // If the last page is torn, fill it with zeros. + // In case it was torn after all records were written successfully, this + // will just pad the page and everything will be fine. + // If it was torn mid-record, a full read (which the caller should do anyway + // to ensure integrity) will detect it as a corruption by the end. + if d := stat.Size() % pageSize; d != 0 { + if _, err := f.Write(make([]byte, pageSize-d)); err != nil { + f.Close() + return nil, errors.Wrap(err, "zero-pad torn page") + } + } + return &Segment{File: f, i: k, dir: dir}, nil +} + +// CreateSegment creates a new segment k in dir. +func CreateSegment(dir string, k int) (*Segment, error) { + f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + return nil, err + } + return &Segment{File: f, i: k, dir: dir}, nil +} + +// OpenReadSegment opens the segment k in dir for reading. +func OpenReadSegment(fn string) (*Segment, error) { + k, err := strconv.Atoi(filepath.Base(fn)) + if err != nil { + return nil, errors.New("not a valid filename") + } + f, err := os.Open(fn) + if err != nil { + return nil, err + } + return &Segment{File: f, i: k, dir: filepath.Dir(fn)}, nil +} + // WAL is a write ahead log that stores records in segment files. +// It must be read from start to end once before logging new data. +// If an errore occurs during read, the repair procedure must be called +// before it's safe to do further writes. +// // Segments are written to in pages of 32KB, with records possibly split // across page boundaries. // Records are never split across segments to allow full segments to be -// safely truncated. -// Segments are terminated by one full zero page to allow tailing readers -// to detect segment boundaries. +// safely truncated. It also ensures that torn writes never corrupt records +// beyond the most recent segment. type WAL struct { dir string logger log.Logger segmentSize int mtx sync.RWMutex - segment *os.File // active segment + segment *Segment // active segment donePages int // pages written to the segment page *page // active page stopc chan chan struct{} @@ -85,10 +168,12 @@ type WAL struct { // New returns a new WAL over the given directory. func New(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) { - return newWAL(logger, reg, dir, defaultSegmentSize) + return NewSize(logger, reg, dir, defaultSegmentSize) } -func newWAL(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int) (*WAL, error) { +// NewSize returns a new WAL over the given directory. +// New segments are created with the specified size. +func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int) (*WAL, error) { if segmentSize%pageSize != 0 { return nil, errors.New("invalid segment size") } @@ -124,16 +209,23 @@ func newWAL(logger log.Logger, reg prometheus.Registerer, dir string, segmentSiz _, j, err := w.Segments() if err != nil { - return nil, err + return nil, errors.Wrap(err, "get segment range") } // Fresh dir, no segments yet. if j == -1 { - w.segment, err = os.OpenFile(SegmentName(w.dir, 0), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + if w.segment, err = CreateSegment(w.dir, 0); err != nil { + return nil, err + } } else { - w.segment, err = os.OpenFile(SegmentName(w.dir, j), os.O_WRONLY|os.O_APPEND, 0666) - } - if err != nil { - return nil, err + if w.segment, err = OpenWriteSegment(w.dir, j); err != nil { + return nil, err + } + // Correctly initialize donePages. + stat, err := w.segment.Stat() + if err != nil { + return nil, err + } + w.donePages = int(stat.Size() / pageSize) } go w.run() @@ -146,23 +238,99 @@ func (w *WAL) Dir() string { } func (w *WAL) run() { +Loop: for { - // Processing all pending functions has precedence over shutdown. - select { - case f := <-w.actorc: - f() - default: - } select { case f := <-w.actorc: f() case donec := <-w.stopc: - close(donec) + defer close(donec) + break Loop + } + } + // Drain and process any remaining functions. + for { + select { + case f := <-w.actorc: + f() + default: return } } } +// Repair attempts to repair the WAL based on the error. +// It discards all data behind the corruption +func (w *WAL) Repair(err error) error { + // We could probably have a mode that only discards torn records right around + // the corruption to preserve as data much as possible. + // But that's not generally applicable if the records have any kind of causality. + // Maybe as an extra mode in the future if mid-WAL corruptions become + // a frequent concern. + cerr, ok := err.(*CorruptionErr) + if !ok { + return errors.New("cannot handle error") + } + if cerr.Segment < 0 { + return errors.New("corruption error does not specify position") + } + + level.Warn(w.logger).Log("msg", "starting corruption repair", + "segment", cerr.Segment, "offset", cerr.Offset) + + // All segments behind the corruption can no longer be used. + segs, err := listSegments(w.dir) + if err != nil { + return errors.Wrap(err, "list segments") + } + level.Warn(w.logger).Log("msg", "deleting all segments behind corruption") + + for _, s := range segs { + if s.n <= cerr.Segment { + continue + } + if err := os.Remove(filepath.Join(w.dir, s.s)); err != nil { + return errors.Wrap(err, "delete segment") + } + } + // Regardless of the corruption offset, no record reaches into the previous segment. + // So we can safely repair the WAL by removing the segment and re-inserting all + // its records up to the corruption. + level.Warn(w.logger).Log("msg", "rewrite corrupted segment") + + fn := SegmentName(w.dir, cerr.Segment) + tmpfn := fn + ".repair" + + if err := fileutil.Rename(fn, tmpfn); err != nil { + return err + } + // Create a clean segment and make it the active one. + s, err := CreateSegment(w.dir, cerr.Segment) + if err != nil { + return err + } + w.segment = s + + f, err := os.Open(tmpfn) + if err != nil { + return errors.Wrap(err, "open segment") + } + defer f.Close() + r := NewReader(bufio.NewReader(f)) + + for r.Next() { + if err := w.Log(r.Record()); err != nil { + return errors.Wrap(err, "insert record") + } + } + // We expect an error here, so nothing to handle. + + if err := os.Remove(tmpfn); err != nil { + return errors.Wrap(err, "delete corrupted segment") + } + return nil +} + // SegmentName builds a segment name for the directory. func SegmentName(dir string, i int) string { return filepath.Join(dir, fmt.Sprintf("%06d", i)) @@ -170,15 +338,13 @@ func SegmentName(dir string, i int) string { // nextSegment creates the next segment and closes the previous one. func (w *WAL) nextSegment() error { - if err := w.flushPage(true); err != nil { - return err + // Only flush the current page if it actually holds data. + if w.page.alloc > 0 { + if err := w.flushPage(true); err != nil { + return err + } } - k, err := strconv.Atoi(filepath.Base(w.segment.Name())) - if err != nil { - return errors.Errorf("current segment %q not numerical", w.segment.Name()) - } - // TODO(fabxc): write initialization page with meta info? - next, err := os.OpenFile(SegmentName(w.dir, k+1), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + next, err := CreateSegment(w.dir, w.segment.Index()+1) if err != nil { return errors.Wrap(err, "create new segment file") } @@ -186,8 +352,7 @@ func (w *WAL) nextSegment() error { w.segment = next w.donePages = 0 - // Don't block further writes by handling the last segment. - // TODO(fabxc): write a termination page as a marker to detect torn segments? + // Don't block further writes by fsyncing the last segment. w.actorc <- func() { if err := w.fsync(prev); err != nil { level.Error(w.logger).Log("msg", "sync previous segment", "err", err) @@ -366,9 +531,9 @@ func (w *WAL) Truncate(i int) error { return nil } -func (w *WAL) fsync(f *os.File) error { +func (w *WAL) fsync(f *Segment) error { start := time.Now() - err := fileutil.Fsync(f) + err := fileutil.Fsync(f.File) w.fsyncDuration.Observe(time.Since(start).Seconds()) return err } @@ -426,65 +591,65 @@ func listSegments(dir string) (refs []segmentRef, err error) { return refs, nil } -type multiReadCloser struct { - io.Reader - files []*os.File -} - // NewSegmentsReader returns a new reader over all segments in the directory. func NewSegmentsReader(dir string) (io.ReadCloser, error) { - refs, err := listSegments(dir) - if err != nil { - return nil, err - } - var rdrs []io.Reader - var files []*os.File - - for _, r := range refs { - f, err := os.Open(filepath.Join(dir, r.s)) - if err != nil { - return nil, err - } - rdrs = append(rdrs, f) - files = append(files, f) - } - return &multiReadCloser{ - Reader: io.MultiReader(rdrs...), - files: files, - }, nil + return NewSegmentsRangeReader(dir, 0, math.MaxInt64) } // NewSegmentsRangeReader returns a new reader over the given WAL segment range. +// If m or n are -1, the range is open on the respective end. func NewSegmentsRangeReader(dir string, m, n int) (io.ReadCloser, error) { refs, err := listSegments(dir) if err != nil { return nil, err } - var rdrs []io.Reader - var files []*os.File + var segs []*Segment for _, r := range refs { - if r.n < m { + if m >= 0 && r.n < m { continue } - if r.n > n { + if n >= 0 && r.n > n { break } - f, err := os.Open(filepath.Join(dir, r.s)) + s, err := OpenReadSegment(filepath.Join(dir, r.s)) if err != nil { return nil, err } - rdrs = append(rdrs, f) - files = append(files, f) + segs = append(segs, s) } - return &multiReadCloser{ - Reader: io.MultiReader(rdrs...), - files: files, - }, nil + return newSegmentBufReader(segs...), nil } -func (r *multiReadCloser) Close() (err error) { - for _, s := range r.files { +// Reader reads WAL records from an io.Reader. +type Reader struct { + rdr io.Reader + err error + rec []byte + total int // total bytes processed. +} + +// segmentBufReader is a buffered reader that reads in multiples of pages. +// The main purpose is that we are able to track segment and offset for +// corruption reporting. +type segmentBufReader struct { + buf *bufio.Reader + segs []*Segment + cur int + off int + more bool +} + +func newSegmentBufReader(segs ...*Segment) *segmentBufReader { + return &segmentBufReader{ + buf: bufio.NewReaderSize(nil, 16*pageSize), + segs: segs, + cur: -1, + } +} + +func (r *segmentBufReader) Close() (err error) { + for _, s := range r.segs { if e := s.Close(); e != nil { err = e } @@ -492,24 +657,42 @@ func (r *multiReadCloser) Close() (err error) { return err } -// Reader reads WAL records from an io.Reader. -type Reader struct { - rdr *bufio.Reader - err error - rec []byte - total int // total bytes processed. +func (r *segmentBufReader) Read(b []byte) (n int, err error) { + if !r.more { + if r.cur+1 >= len(r.segs) { + return 0, io.EOF + } + r.cur++ + r.off = 0 + r.more = true + r.buf.Reset(r.segs[r.cur]) + } + n, err = r.buf.Read(b) + r.off += n + if err != io.EOF { + return n, err + } + // Just return what we read so far, but don't signal EOF. + // Only unset more so we don't invalidate the current segment and + // offset before the next read. + r.more = false + // If no more segments are left, it's the end for the reader. + if len(r.segs) == 0 { + return n, io.EOF + } + return n, nil } // NewReader returns a new reader. func NewReader(r io.Reader) *Reader { - return &Reader{rdr: bufio.NewReader(r)} + return &Reader{rdr: r} } // Next advances the reader to the next records and returns true if it exists. // It must not be called once after it returned false. func (r *Reader) Next() bool { err := r.next() - if err == io.EOF { + if errors.Cause(err) == io.EOF { return false } r.err = err @@ -523,9 +706,8 @@ func (r *Reader) next() (err error) { i := 0 for { - hdr[0], err = r.rdr.ReadByte() - if err != nil { - return err + if _, err = io.ReadFull(r.rdr, hdr[:1]); err != nil { + return errors.Wrap(err, "read first header byte") } r.total++ typ := recType(hdr[0]) @@ -541,7 +723,7 @@ func (r *Reader) next() (err error) { } n, err := io.ReadFull(r.rdr, buf[:k]) if err != nil { - return err + return errors.Wrap(err, "read remaining zeros") } r.total += n @@ -554,7 +736,7 @@ func (r *Reader) next() (err error) { } n, err := io.ReadFull(r.rdr, hdr[1:]) if err != nil { - return err + return errors.Wrap(err, "read remaining header") } r.total += n @@ -608,9 +790,25 @@ func (r *Reader) next() (err error) { } } -// Err returns the last encountered error. +// Err returns the last encountered error wrapped in a corruption error. +// If the reader does not allow to infer a segment index and offset, a total +// offset in the reader stream will be provided. func (r *Reader) Err() error { - return r.err + if r.err == nil { + return nil + } + if b, ok := r.rdr.(*segmentBufReader); ok { + return &CorruptionErr{ + Err: r.err, + Segment: b.segs[b.cur].Index(), + Offset: b.off, + } + } + return &CorruptionErr{ + Err: r.err, + Segment: -1, + Offset: r.total, + } } // Record returns the current record. The returned byte slice is only diff --git a/wal/wal_test.go b/wal/wal_test.go index 913283d1c8..d1b724c7ed 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -161,7 +161,7 @@ func TestWAL_FuzzWriteRead(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - w, err := newWAL(nil, nil, dir, 128*pageSize) + w, err := NewSize(nil, nil, dir, 128*pageSize) testutil.Ok(t, err) var input [][]byte @@ -214,6 +214,102 @@ func TestWAL_FuzzWriteRead(t *testing.T) { testutil.Ok(t, rdr.Err()) } +func TestWAL_Repair(t *testing.T) { + for name, cf := range map[string]func(f *os.File){ + "bad_fragment_sequence": func(f *os.File) { + _, err := f.Seek(pageSize, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte{byte(recLast)}) + testutil.Ok(t, err) + }, + "bad_fragment_flag": func(f *os.File) { + _, err := f.Seek(pageSize, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte{123}) + testutil.Ok(t, err) + }, + "bad_checksum": func(f *os.File) { + _, err := f.Seek(pageSize+4, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte{0}) + testutil.Ok(t, err) + }, + "bad_length": func(f *os.File) { + _, err := f.Seek(pageSize+2, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte{0}) + testutil.Ok(t, err) + }, + "bad_content": func(f *os.File) { + _, err := f.Seek(pageSize+100, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte("beef")) + testutil.Ok(t, err) + }, + } { + t.Run(name, func(t *testing.T) { + dir, err := ioutil.TempDir("", "wal_repair") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + // We create 3 segments with 3 records each and then corrupt the 2nd record + // of the 2nd segment. + // As a result we want a repaired WAL with the first 4 records intact. + w, err := NewSize(nil, nil, dir, 3*pageSize) + testutil.Ok(t, err) + + var records [][]byte + + for i := 1; i <= 9; i++ { + b := make([]byte, pageSize-recordHeaderSize) + b[0] = byte(i) + records = append(records, b) + testutil.Ok(t, w.Log(b)) + } + testutil.Ok(t, w.Close()) + + f, err := os.OpenFile(SegmentName(dir, 1), os.O_RDWR, 0666) + testutil.Ok(t, err) + + // Apply corruption function. + cf(f) + + testutil.Ok(t, f.Close()) + + w, err = New(nil, nil, dir) + testutil.Ok(t, err) + + sr, err := NewSegmentsReader(dir) + testutil.Ok(t, err) + r := NewReader(sr) + + for r.Next() { + } + testutil.NotOk(t, r.Err()) + + testutil.Ok(t, w.Repair(r.Err())) + + sr, err = NewSegmentsReader(dir) + testutil.Ok(t, err) + r = NewReader(sr) + + var result [][]byte + for r.Next() { + var b []byte + result = append(result, append(b, r.Record()...)) + } + testutil.Ok(t, r.Err()) + testutil.Equals(t, 4, len(result)) + + for i, r := range result { + if !bytes.Equal(records[i], r) { + t.Fatalf("record %d diverges: want %x, got %x", i, records[i][:10], r[:10]) + } + } + }) + } +} + func BenchmarkWAL_LogBatched(b *testing.B) { dir, err := ioutil.TempDir("", "bench_logbatch") testutil.Ok(b, err)