From 48efdf8b81cb32c91da8fad9e3b43b11a857a6b7 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 30 Nov 2018 16:46:16 +0200 Subject: [PATCH] refactor NewSegmentsRangeReader to take multi WAL ranges (#449) * refactor NewSegmentsRangeReader to take multi WAL ranges In case of an error when checkpointing the WAL the error doesn't show the exact WAL index that is corrupter. this is because it uses MultiReader to read multiply WAL files. This refactoring allows the NewSegmentsRangeReader to take more than a single WAL range and it reads all of the ranges by iterating each one. this changes the logs from create checkpoint: read segments: corruption after 4841144384 bytes:... to create checkpoint: read segments: corruption in segment data/wal/00017351 at 123142208: ... Signed-off-by: Krasi Georgiev --- CHANGELOG.md | 5 +++++ checkpoint.go | 37 ++++++++++----------------------- checkpoint_test.go | 8 +++---- head.go | 5 ++--- wal/wal.go | 52 +++++++++++++++++++++++++++------------------- wal/wal_test.go | 2 +- 6 files changed, 54 insertions(+), 55 deletions(-) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 000000000..11f704b20 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,5 @@ +## master / unreleased + + - `LastCheckpoint` used to return just the segment name and now it returns the full relative path. + - `NewSegmentsRangeReader` can now read over miltiple wal ranges by using the new `SegmentRange` struct. + - `CorruptionErr` now also exposes the Segment `Dir` which is added when displaying any errors. diff --git a/checkpoint.go b/checkpoint.go index aa8170520..95bd41697 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -18,6 +18,7 @@ import ( "fmt" "io" "io/ioutil" + "math" "os" "path/filepath" "strconv" @@ -59,7 +60,7 @@ func LastCheckpoint(dir string) (string, int, error) { if err != nil { continue } - return fi.Name(), idx, nil + return filepath.Join(dir, fi.Name()), idx, nil } return "", 0, ErrNotFound } @@ -99,13 +100,11 @@ const checkpointPrefix = "checkpoint." // it with the original WAL. func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) { stats := &CheckpointStats{} + var sgmReader io.ReadCloser - var sr io.Reader - // We close everything explicitly because Windows needs files to be - // closed before being deleted. But we also have defer so that we close - // files if there is an error somewhere. - var closers []io.Closer { + + var sgmRange []wal.SegmentRange dir, idx, err := LastCheckpoint(w.Dir()) if err != nil && err != ErrNotFound { return nil, errors.Wrap(err, "find last checkpoint") @@ -118,27 +117,15 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) // Ignore WAL files below the checkpoint. They shouldn't exist to begin with. from = last - r, err := wal.NewSegmentsReader(filepath.Join(w.Dir(), dir)) - if err != nil { - return nil, errors.Wrap(err, "open last checkpoint") - } - defer r.Close() - closers = append(closers, r) - sr = r + sgmRange = append(sgmRange, wal.SegmentRange{Dir: dir, Last: math.MaxInt32}) } - segsr, err := wal.NewSegmentsRangeReader(w.Dir(), from, to) + sgmRange = append(sgmRange, wal.SegmentRange{Dir: w.Dir(), First: from, Last: to}) + sgmReader, err = wal.NewSegmentsRangeReader(sgmRange...) if err != nil { return nil, errors.Wrap(err, "create segment reader") } - defer segsr.Close() - closers = append(closers, segsr) - - if sr != nil { - sr = io.MultiReader(sr, segsr) - } else { - sr = segsr - } + defer sgmReader.Close() } cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", to)) @@ -152,7 +139,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) return nil, errors.Wrap(err, "open checkpoint") } - r := wal.NewReader(sr) + r := wal.NewReader(sgmReader) var ( series []RefSeries @@ -262,8 +249,6 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) if err := fileutil.Replace(cpdirtmp, cpdir); err != nil { return nil, errors.Wrap(err, "rename checkpoint directory") } - if err := closeAll(closers...); err != nil { - return stats, errors.Wrap(err, "close opened files") - } + return stats, nil } diff --git a/checkpoint_test.go b/checkpoint_test.go index 60f99fd74..76c486a7d 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -37,25 +37,25 @@ func TestLastCheckpoint(t *testing.T) { testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.0000"), 0777)) s, k, err = LastCheckpoint(dir) testutil.Ok(t, err) - testutil.Equals(t, "checkpoint.0000", s) + testutil.Equals(t, filepath.Join(dir, "checkpoint.0000"), s) testutil.Equals(t, 0, k) testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.xyz"), 0777)) s, k, err = LastCheckpoint(dir) testutil.Ok(t, err) - testutil.Equals(t, "checkpoint.0000", s) + testutil.Equals(t, filepath.Join(dir, "checkpoint.0000"), s) testutil.Equals(t, 0, k) testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1"), 0777)) s, k, err = LastCheckpoint(dir) testutil.Ok(t, err) - testutil.Equals(t, "checkpoint.1", s) + testutil.Equals(t, filepath.Join(dir, "checkpoint.1"), s) testutil.Equals(t, 1, k) testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1000"), 0777)) s, k, err = LastCheckpoint(dir) testutil.Ok(t, err) - testutil.Equals(t, "checkpoint.1000", s) + testutil.Equals(t, filepath.Join(dir, "checkpoint.1000"), s) testutil.Equals(t, 1000, k) } diff --git a/head.go b/head.go index aa896d174..1988ac372 100644 --- a/head.go +++ b/head.go @@ -15,7 +15,6 @@ package tsdb import ( "math" - "path/filepath" "runtime" "sort" "strings" @@ -457,7 +456,7 @@ func (h *Head) Init() error { return errors.Wrap(err, "find last checkpoint") } if err == nil { - sr, err := wal.NewSegmentsReader(filepath.Join(h.wal.Dir(), dir)) + sr, err := wal.NewSegmentsReader(dir) if err != nil { return errors.Wrap(err, "open checkpoint") } @@ -472,7 +471,7 @@ func (h *Head) Init() error { } // Backfill segments from the last checkpoint onwards - sr, err := wal.NewSegmentsRangeReader(h.wal.Dir(), startFrom, -1) + sr, err := wal.NewSegmentsRangeReader(wal.SegmentRange{Dir: h.wal.Dir(), First: startFrom, Last: -1}) if err != nil { return errors.Wrap(err, "open WAL segments") } diff --git a/wal/wal.go b/wal/wal.go index 8f245d50b..2ed2018c7 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -20,7 +20,6 @@ import ( "fmt" "hash/crc32" "io" - "math" "os" "path/filepath" "sort" @@ -83,6 +82,7 @@ func (s *Segment) Dir() string { // CorruptionErr is an error that's returned when corruption is encountered. type CorruptionErr struct { + Dir string Segment int Offset int64 Err error @@ -92,7 +92,7 @@ func (e *CorruptionErr) Error() string { if e.Segment < 0 { return fmt.Sprintf("corruption after %d bytes: %s", e.Offset, e.Err) } - return fmt.Sprintf("corruption in segment %d at %d: %s", e.Segment, e.Offset, e.Err) + return fmt.Sprintf("corruption in segment %s at %d: %s", SegmentName(e.Dir, e.Segment), e.Offset, e.Err) } // OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends. @@ -635,32 +635,41 @@ func listSegments(dir string) (refs []segmentRef, err error) { return refs, nil } -// NewSegmentsReader returns a new reader over all segments in the directory. -func NewSegmentsReader(dir string) (io.ReadCloser, error) { - return NewSegmentsRangeReader(dir, 0, math.MaxInt32) +// SegmentRange groups segments by the directory and the first and last index it includes. +type SegmentRange struct { + Dir string + First, Last int } -// NewSegmentsRangeReader returns a new reader over the given WAL segment range. +// NewSegmentsReader returns a new reader over all segments in the directory. +func NewSegmentsReader(dir string) (io.ReadCloser, error) { + return NewSegmentsRangeReader(SegmentRange{dir, -1, -1}) +} + +// NewSegmentsRangeReader returns a new reader over the given WAL segment ranges. // If first or last are -1, the range is open on the respective end. -func NewSegmentsRangeReader(dir string, first, last int) (io.ReadCloser, error) { - refs, err := listSegments(dir) - if err != nil { - return nil, err - } +func NewSegmentsRangeReader(sr ...SegmentRange) (io.ReadCloser, error) { var segs []*Segment - for _, r := range refs { - if first >= 0 && r.index < first { - continue - } - if last >= 0 && r.index > last { - break - } - s, err := OpenReadSegment(filepath.Join(dir, r.name)) + for _, sgmRange := range sr { + refs, err := listSegments(sgmRange.Dir) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "list segment in dir:%v", sgmRange.Dir) + } + + for _, r := range refs { + if sgmRange.First >= 0 && r.index < sgmRange.First { + continue + } + if sgmRange.Last >= 0 && r.index > sgmRange.Last { + break + } + s, err := OpenReadSegment(filepath.Join(sgmRange.Dir, r.name)) + if err != nil { + return nil, errors.Wrapf(err, "open segment:%v in dir:%v", r.name, sgmRange.Dir) + } + segs = append(segs, s) } - segs = append(segs, s) } return newSegmentBufReader(segs...), nil } @@ -856,6 +865,7 @@ func (r *Reader) Err() error { if b, ok := r.rdr.(*segmentBufReader); ok { return &CorruptionErr{ Err: r.err, + Dir: b.segs[b.cur].Dir(), Segment: b.segs[b.cur].Index(), Offset: int64(b.off), } diff --git a/wal/wal_test.go b/wal/wal_test.go index 922126dcf..10600352d 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -196,7 +196,7 @@ func TestWAL_FuzzWriteRead(t *testing.T) { m, n, err := w.Segments() testutil.Ok(t, err) - rc, err := NewSegmentsRangeReader(dir, m, n) + rc, err := NewSegmentsRangeReader(SegmentRange{Dir: dir, First: m, Last: n}) testutil.Ok(t, err) defer rc.Close()