refactor NewSegmentsRangeReader to take multi WAL ranges (#449)

* refactor NewSegmentsRangeReader to take multi WAL ranges

In case of an error when checkpointing the WAL, the error doesn't show
the exact WAL index that is corrupted. This is because it uses
MultiReader to read multiple WAL files.
This refactoring allows NewSegmentsRangeReader to take more than a
single WAL range, and it reads all of the ranges by iterating over each one.

This changes the logs from
create checkpoint: read segments: corruption after 4841144384 bytes:...
to
create checkpoint: read segments: corruption in segment
data/wal/00017351 at 123142208: ...

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
This commit is contained in:
Krasi Georgiev 2018-11-30 16:46:16 +02:00 committed by GitHub
parent 0493efb7c5
commit 48efdf8b81
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 54 additions and 55 deletions

5
CHANGELOG.md Normal file
View file

@ -0,0 +1,5 @@
## master / unreleased
- `LastCheckpoint` used to return just the segment name and now it returns the full relative path.
- `NewSegmentsRangeReader` can now read over multiple WAL ranges by using the new `SegmentRange` struct.
- `CorruptionErr` now also exposes the Segment `Dir` which is added when displaying any errors.

View file

@ -18,6 +18,7 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"math"
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
@ -59,7 +60,7 @@ func LastCheckpoint(dir string) (string, int, error) {
if err != nil { if err != nil {
continue continue
} }
return fi.Name(), idx, nil return filepath.Join(dir, fi.Name()), idx, nil
} }
return "", 0, ErrNotFound return "", 0, ErrNotFound
} }
@ -99,13 +100,11 @@ const checkpointPrefix = "checkpoint."
// it with the original WAL. // it with the original WAL.
func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) { func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
stats := &CheckpointStats{} stats := &CheckpointStats{}
var sgmReader io.ReadCloser
var sr io.Reader
// We close everything explicitly because Windows needs files to be
// closed before being deleted. But we also have defer so that we close
// files if there is an error somewhere.
var closers []io.Closer
{ {
var sgmRange []wal.SegmentRange
dir, idx, err := LastCheckpoint(w.Dir()) dir, idx, err := LastCheckpoint(w.Dir())
if err != nil && err != ErrNotFound { if err != nil && err != ErrNotFound {
return nil, errors.Wrap(err, "find last checkpoint") return nil, errors.Wrap(err, "find last checkpoint")
@ -118,27 +117,15 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
// Ignore WAL files below the checkpoint. They shouldn't exist to begin with. // Ignore WAL files below the checkpoint. They shouldn't exist to begin with.
from = last from = last
r, err := wal.NewSegmentsReader(filepath.Join(w.Dir(), dir)) sgmRange = append(sgmRange, wal.SegmentRange{Dir: dir, Last: math.MaxInt32})
if err != nil {
return nil, errors.Wrap(err, "open last checkpoint")
}
defer r.Close()
closers = append(closers, r)
sr = r
} }
segsr, err := wal.NewSegmentsRangeReader(w.Dir(), from, to) sgmRange = append(sgmRange, wal.SegmentRange{Dir: w.Dir(), First: from, Last: to})
sgmReader, err = wal.NewSegmentsRangeReader(sgmRange...)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "create segment reader") return nil, errors.Wrap(err, "create segment reader")
} }
defer segsr.Close() defer sgmReader.Close()
closers = append(closers, segsr)
if sr != nil {
sr = io.MultiReader(sr, segsr)
} else {
sr = segsr
}
} }
cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", to)) cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", to))
@ -152,7 +139,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
return nil, errors.Wrap(err, "open checkpoint") return nil, errors.Wrap(err, "open checkpoint")
} }
r := wal.NewReader(sr) r := wal.NewReader(sgmReader)
var ( var (
series []RefSeries series []RefSeries
@ -262,8 +249,6 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
if err := fileutil.Replace(cpdirtmp, cpdir); err != nil { if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
return nil, errors.Wrap(err, "rename checkpoint directory") return nil, errors.Wrap(err, "rename checkpoint directory")
} }
if err := closeAll(closers...); err != nil {
return stats, errors.Wrap(err, "close opened files")
}
return stats, nil return stats, nil
} }

View file

@ -37,25 +37,25 @@ func TestLastCheckpoint(t *testing.T) {
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.0000"), 0777)) testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.0000"), 0777))
s, k, err = LastCheckpoint(dir) s, k, err = LastCheckpoint(dir)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, "checkpoint.0000", s) testutil.Equals(t, filepath.Join(dir, "checkpoint.0000"), s)
testutil.Equals(t, 0, k) testutil.Equals(t, 0, k)
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.xyz"), 0777)) testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.xyz"), 0777))
s, k, err = LastCheckpoint(dir) s, k, err = LastCheckpoint(dir)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, "checkpoint.0000", s) testutil.Equals(t, filepath.Join(dir, "checkpoint.0000"), s)
testutil.Equals(t, 0, k) testutil.Equals(t, 0, k)
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1"), 0777)) testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1"), 0777))
s, k, err = LastCheckpoint(dir) s, k, err = LastCheckpoint(dir)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, "checkpoint.1", s) testutil.Equals(t, filepath.Join(dir, "checkpoint.1"), s)
testutil.Equals(t, 1, k) testutil.Equals(t, 1, k)
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1000"), 0777)) testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1000"), 0777))
s, k, err = LastCheckpoint(dir) s, k, err = LastCheckpoint(dir)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, "checkpoint.1000", s) testutil.Equals(t, filepath.Join(dir, "checkpoint.1000"), s)
testutil.Equals(t, 1000, k) testutil.Equals(t, 1000, k)
} }

View file

@ -15,7 +15,6 @@ package tsdb
import ( import (
"math" "math"
"path/filepath"
"runtime" "runtime"
"sort" "sort"
"strings" "strings"
@ -457,7 +456,7 @@ func (h *Head) Init() error {
return errors.Wrap(err, "find last checkpoint") return errors.Wrap(err, "find last checkpoint")
} }
if err == nil { if err == nil {
sr, err := wal.NewSegmentsReader(filepath.Join(h.wal.Dir(), dir)) sr, err := wal.NewSegmentsReader(dir)
if err != nil { if err != nil {
return errors.Wrap(err, "open checkpoint") return errors.Wrap(err, "open checkpoint")
} }
@ -472,7 +471,7 @@ func (h *Head) Init() error {
} }
// Backfill segments from the last checkpoint onwards // Backfill segments from the last checkpoint onwards
sr, err := wal.NewSegmentsRangeReader(h.wal.Dir(), startFrom, -1) sr, err := wal.NewSegmentsRangeReader(wal.SegmentRange{Dir: h.wal.Dir(), First: startFrom, Last: -1})
if err != nil { if err != nil {
return errors.Wrap(err, "open WAL segments") return errors.Wrap(err, "open WAL segments")
} }

View file

@ -20,7 +20,6 @@ import (
"fmt" "fmt"
"hash/crc32" "hash/crc32"
"io" "io"
"math"
"os" "os"
"path/filepath" "path/filepath"
"sort" "sort"
@ -83,6 +82,7 @@ func (s *Segment) Dir() string {
// CorruptionErr is an error that's returned when corruption is encountered. // CorruptionErr is an error that's returned when corruption is encountered.
type CorruptionErr struct { type CorruptionErr struct {
Dir string
Segment int Segment int
Offset int64 Offset int64
Err error Err error
@ -92,7 +92,7 @@ func (e *CorruptionErr) Error() string {
if e.Segment < 0 { if e.Segment < 0 {
return fmt.Sprintf("corruption after %d bytes: %s", e.Offset, e.Err) return fmt.Sprintf("corruption after %d bytes: %s", e.Offset, e.Err)
} }
return fmt.Sprintf("corruption in segment %d at %d: %s", e.Segment, e.Offset, e.Err) return fmt.Sprintf("corruption in segment %s at %d: %s", SegmentName(e.Dir, e.Segment), e.Offset, e.Err)
} }
// OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends. // OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends.
@ -635,32 +635,41 @@ func listSegments(dir string) (refs []segmentRef, err error) {
return refs, nil return refs, nil
} }
// NewSegmentsReader returns a new reader over all segments in the directory. // SegmentRange groups segments by the directory and the first and last index it includes.
func NewSegmentsReader(dir string) (io.ReadCloser, error) { type SegmentRange struct {
return NewSegmentsRangeReader(dir, 0, math.MaxInt32) Dir string
First, Last int
} }
// NewSegmentsRangeReader returns a new reader over the given WAL segment range. // NewSegmentsReader returns a new reader over all segments in the directory.
func NewSegmentsReader(dir string) (io.ReadCloser, error) {
return NewSegmentsRangeReader(SegmentRange{dir, -1, -1})
}
// NewSegmentsRangeReader returns a new reader over the given WAL segment ranges.
// If first or last are -1, the range is open on the respective end. // If first or last are -1, the range is open on the respective end.
func NewSegmentsRangeReader(dir string, first, last int) (io.ReadCloser, error) { func NewSegmentsRangeReader(sr ...SegmentRange) (io.ReadCloser, error) {
refs, err := listSegments(dir)
if err != nil {
return nil, err
}
var segs []*Segment var segs []*Segment
for _, r := range refs { for _, sgmRange := range sr {
if first >= 0 && r.index < first { refs, err := listSegments(sgmRange.Dir)
continue
}
if last >= 0 && r.index > last {
break
}
s, err := OpenReadSegment(filepath.Join(dir, r.name))
if err != nil { if err != nil {
return nil, err return nil, errors.Wrapf(err, "list segment in dir:%v", sgmRange.Dir)
}
for _, r := range refs {
if sgmRange.First >= 0 && r.index < sgmRange.First {
continue
}
if sgmRange.Last >= 0 && r.index > sgmRange.Last {
break
}
s, err := OpenReadSegment(filepath.Join(sgmRange.Dir, r.name))
if err != nil {
return nil, errors.Wrapf(err, "open segment:%v in dir:%v", r.name, sgmRange.Dir)
}
segs = append(segs, s)
} }
segs = append(segs, s)
} }
return newSegmentBufReader(segs...), nil return newSegmentBufReader(segs...), nil
} }
@ -856,6 +865,7 @@ func (r *Reader) Err() error {
if b, ok := r.rdr.(*segmentBufReader); ok { if b, ok := r.rdr.(*segmentBufReader); ok {
return &CorruptionErr{ return &CorruptionErr{
Err: r.err, Err: r.err,
Dir: b.segs[b.cur].Dir(),
Segment: b.segs[b.cur].Index(), Segment: b.segs[b.cur].Index(),
Offset: int64(b.off), Offset: int64(b.off),
} }

View file

@ -196,7 +196,7 @@ func TestWAL_FuzzWriteRead(t *testing.T) {
m, n, err := w.Segments() m, n, err := w.Segments()
testutil.Ok(t, err) testutil.Ok(t, err)
rc, err := NewSegmentsRangeReader(dir, m, n) rc, err := NewSegmentsRangeReader(SegmentRange{Dir: dir, First: m, Last: n})
testutil.Ok(t, err) testutil.Ok(t, err)
defer rc.Close() defer rc.Close()