diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 000000000..11f704b20 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,5 @@ +## master / unreleased + + - `LastCheckpoint` used to return just the segment name and now it returns the full relative path. + - `NewSegmentsRangeReader` can now read over miltiple wal ranges by using the new `SegmentRange` struct. + - `CorruptionErr` now also exposes the Segment `Dir` which is added when displaying any errors. diff --git a/checkpoint.go b/checkpoint.go index aa8170520..95bd41697 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -18,6 +18,7 @@ import ( "fmt" "io" "io/ioutil" + "math" "os" "path/filepath" "strconv" @@ -59,7 +60,7 @@ func LastCheckpoint(dir string) (string, int, error) { if err != nil { continue } - return fi.Name(), idx, nil + return filepath.Join(dir, fi.Name()), idx, nil } return "", 0, ErrNotFound } @@ -99,13 +100,11 @@ const checkpointPrefix = "checkpoint." // it with the original WAL. func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) { stats := &CheckpointStats{} + var sgmReader io.ReadCloser - var sr io.Reader - // We close everything explicitly because Windows needs files to be - // closed before being deleted. But we also have defer so that we close - // files if there is an error somewhere. - var closers []io.Closer { + + var sgmRange []wal.SegmentRange dir, idx, err := LastCheckpoint(w.Dir()) if err != nil && err != ErrNotFound { return nil, errors.Wrap(err, "find last checkpoint") @@ -118,27 +117,15 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) // Ignore WAL files below the checkpoint. They shouldn't exist to begin with. from = last - r, err := wal.NewSegmentsReader(filepath.Join(w.Dir(), dir)) - if err != nil { - return nil, errors.Wrap(err, "open last checkpoint") - } - defer r.Close() - closers = append(closers, r) - sr = r + sgmRange = append(sgmRange, wal.SegmentRange{Dir: dir, Last: math.MaxInt32}) } - segsr, err := wal.NewSegmentsRangeReader(w.Dir(), from, to) + sgmRange = append(sgmRange, wal.SegmentRange{Dir: w.Dir(), First: from, Last: to}) + sgmReader, err = wal.NewSegmentsRangeReader(sgmRange...) if err != nil { return nil, errors.Wrap(err, "create segment reader") } - defer segsr.Close() - closers = append(closers, segsr) - - if sr != nil { - sr = io.MultiReader(sr, segsr) - } else { - sr = segsr - } + defer sgmReader.Close() } cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", to)) @@ -152,7 +139,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) return nil, errors.Wrap(err, "open checkpoint") } - r := wal.NewReader(sr) + r := wal.NewReader(sgmReader) var ( series []RefSeries @@ -262,8 +249,6 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) if err := fileutil.Replace(cpdirtmp, cpdir); err != nil { return nil, errors.Wrap(err, "rename checkpoint directory") } - if err := closeAll(closers...); err != nil { - return stats, errors.Wrap(err, "close opened files") - } + return stats, nil } diff --git a/checkpoint_test.go b/checkpoint_test.go index 60f99fd74..76c486a7d 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -37,25 +37,25 @@ func TestLastCheckpoint(t *testing.T) { testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.0000"), 0777)) s, k, err = LastCheckpoint(dir) testutil.Ok(t, err) - testutil.Equals(t, "checkpoint.0000", s) + testutil.Equals(t, filepath.Join(dir, "checkpoint.0000"), s) testutil.Equals(t, 0, k) testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.xyz"), 0777)) s, k, err = LastCheckpoint(dir) testutil.Ok(t, err) - testutil.Equals(t, "checkpoint.0000", s) + testutil.Equals(t, filepath.Join(dir, "checkpoint.0000"), s) testutil.Equals(t, 0, k) testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1"), 0777)) s, k, err = LastCheckpoint(dir) testutil.Ok(t, err) - testutil.Equals(t, "checkpoint.1", s) + testutil.Equals(t, filepath.Join(dir, "checkpoint.1"), s) testutil.Equals(t, 1, k) testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1000"), 0777)) s, k, err = LastCheckpoint(dir) testutil.Ok(t, err) - testutil.Equals(t, "checkpoint.1000", s) + testutil.Equals(t, filepath.Join(dir, "checkpoint.1000"), s) testutil.Equals(t, 1000, k) } diff --git a/head.go b/head.go index aa896d174..1988ac372 100644 --- a/head.go +++ b/head.go @@ -15,7 +15,6 @@ package tsdb import ( "math" - "path/filepath" "runtime" "sort" "strings" @@ -457,7 +456,7 @@ func (h *Head) Init() error { return errors.Wrap(err, "find last checkpoint") } if err == nil { - sr, err := wal.NewSegmentsReader(filepath.Join(h.wal.Dir(), dir)) + sr, err := wal.NewSegmentsReader(dir) if err != nil { return errors.Wrap(err, "open checkpoint") } @@ -472,7 +471,7 @@ func (h *Head) Init() error { } // Backfill segments from the last checkpoint onwards - sr, err := wal.NewSegmentsRangeReader(h.wal.Dir(), startFrom, -1) + sr, err := wal.NewSegmentsRangeReader(wal.SegmentRange{Dir: h.wal.Dir(), First: startFrom, Last: -1}) if err != nil { return errors.Wrap(err, "open WAL segments") } diff --git a/wal/wal.go b/wal/wal.go index 8f245d50b..2ed2018c7 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -20,7 +20,6 @@ import ( "fmt" "hash/crc32" "io" - "math" "os" "path/filepath" "sort" @@ -83,6 +82,7 @@ func (s *Segment) Dir() string { // CorruptionErr is an error that's returned when corruption is encountered. type CorruptionErr struct { + Dir string Segment int Offset int64 Err error @@ -92,7 +92,7 @@ func (e *CorruptionErr) Error() string { if e.Segment < 0 { return fmt.Sprintf("corruption after %d bytes: %s", e.Offset, e.Err) } - return fmt.Sprintf("corruption in segment %d at %d: %s", e.Segment, e.Offset, e.Err) + return fmt.Sprintf("corruption in segment %s at %d: %s", SegmentName(e.Dir, e.Segment), e.Offset, e.Err) } // OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends. @@ -635,32 +635,41 @@ func listSegments(dir string) (refs []segmentRef, err error) { return refs, nil } -// NewSegmentsReader returns a new reader over all segments in the directory. -func NewSegmentsReader(dir string) (io.ReadCloser, error) { - return NewSegmentsRangeReader(dir, 0, math.MaxInt32) +// SegmentRange groups segments by the directory and the first and last index it includes. +type SegmentRange struct { + Dir string + First, Last int } -// NewSegmentsRangeReader returns a new reader over the given WAL segment range. +// NewSegmentsReader returns a new reader over all segments in the directory. +func NewSegmentsReader(dir string) (io.ReadCloser, error) { + return NewSegmentsRangeReader(SegmentRange{dir, -1, -1}) +} + +// NewSegmentsRangeReader returns a new reader over the given WAL segment ranges. // If first or last are -1, the range is open on the respective end. -func NewSegmentsRangeReader(dir string, first, last int) (io.ReadCloser, error) { - refs, err := listSegments(dir) - if err != nil { - return nil, err - } +func NewSegmentsRangeReader(sr ...SegmentRange) (io.ReadCloser, error) { var segs []*Segment - for _, r := range refs { - if first >= 0 && r.index < first { - continue - } - if last >= 0 && r.index > last { - break - } - s, err := OpenReadSegment(filepath.Join(dir, r.name)) + for _, sgmRange := range sr { + refs, err := listSegments(sgmRange.Dir) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "list segment in dir:%v", sgmRange.Dir) + } + + for _, r := range refs { + if sgmRange.First >= 0 && r.index < sgmRange.First { + continue + } + if sgmRange.Last >= 0 && r.index > sgmRange.Last { + break + } + s, err := OpenReadSegment(filepath.Join(sgmRange.Dir, r.name)) + if err != nil { + return nil, errors.Wrapf(err, "open segment:%v in dir:%v", r.name, sgmRange.Dir) + } + segs = append(segs, s) } - segs = append(segs, s) } return newSegmentBufReader(segs...), nil } @@ -856,6 +865,7 @@ func (r *Reader) Err() error { if b, ok := r.rdr.(*segmentBufReader); ok { return &CorruptionErr{ Err: r.err, + Dir: b.segs[b.cur].Dir(), Segment: b.segs[b.cur].Index(), Offset: int64(b.off), } diff --git a/wal/wal_test.go b/wal/wal_test.go index 922126dcf..10600352d 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -196,7 +196,7 @@ func TestWAL_FuzzWriteRead(t *testing.T) { m, n, err := w.Segments() testutil.Ok(t, err) - rc, err := NewSegmentsRangeReader(dir, m, n) + rc, err := NewSegmentsRangeReader(SegmentRange{Dir: dir, First: m, Last: n}) testutil.Ok(t, err) defer rc.Close()