repair wal when the record cannot be decoded (#453)

* repair wal when the record cannot be decoded

Currently repair is run only when the error happens in the reader.

A corruption can occur after the record is read and when it is decoded.
This change wraps the error at decoding as a CorruptionErr as this error
is expected to trigger a repair.

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
This commit is contained in:
Krasi Georgiev 2018-11-30 13:37:04 +02:00 committed by GitHub
parent 24520727a4
commit 0493efb7c5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 119 additions and 4 deletions

24
head.go
View file

@ -349,7 +349,11 @@ func (h *Head) loadWAL(r *wal.Reader) error {
case RecordSeries:
series, err = dec.Series(rec, series)
if err != nil {
return errors.Wrap(err, "decode series")
return &wal.CorruptionErr{
Err: errors.Wrap(err, "decode series"),
Segment: r.Segment(),
Offset: r.Offset(),
}
}
for _, s := range series {
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
@ -362,7 +366,11 @@ func (h *Head) loadWAL(r *wal.Reader) error {
samples, err = dec.Samples(rec, samples)
s := samples
if err != nil {
return errors.Wrap(err, "decode samples")
return &wal.CorruptionErr{
Err: errors.Wrap(err, "decode samples"),
Segment: r.Segment(),
Offset: r.Offset(),
}
}
// We split up the samples into chunks of 5000 samples or less.
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
@ -395,7 +403,11 @@ func (h *Head) loadWAL(r *wal.Reader) error {
case RecordTombstones:
tstones, err = dec.Tombstones(rec, tstones)
if err != nil {
return errors.Wrap(err, "decode tombstones")
return &wal.CorruptionErr{
Err: errors.Wrap(err, "decode tombstones"),
Segment: r.Segment(),
Offset: r.Offset(),
}
}
for _, s := range tstones {
for _, itv := range s.intervals {
@ -406,7 +418,11 @@ func (h *Head) loadWAL(r *wal.Reader) error {
}
}
default:
return errors.Errorf("invalid record type %v", dec.Type(rec))
return &wal.CorruptionErr{
Err: errors.Errorf("invalid record type %v", dec.Type(rec)),
Segment: r.Segment(),
Offset: r.Offset(),
}
}
}
if r.Err() != nil {

View file

@ -861,3 +861,82 @@ func TestHead_LogRollback(t *testing.T) {
testutil.Assert(t, ok, "expected series record but got %+v", recs[0])
testutil.Equals(t, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series)
}
func TestWalRepair(t *testing.T) {
var enc RecordEncoder
for name, test := range map[string]struct {
corrFunc func(rec []byte) []byte // Func that applies the corruption to a record.
rec []byte
totalRecs int
expRecs int
}{
"invalid_record": {
func(rec []byte) []byte {
rec[0] = byte(RecordInvalid)
return rec
},
enc.Series([]RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}),
9,
5,
},
"decode_series": {
func(rec []byte) []byte {
return rec[:3]
},
enc.Series([]RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}),
9,
5,
},
"decode_samples": {
func(rec []byte) []byte {
return rec[:3]
},
enc.Samples([]RefSample{{Ref: 0, T: 99, V: 1}}, []byte{}),
9,
5,
},
"decode_tombstone": {
func(rec []byte) []byte {
return rec[:3]
},
enc.Tombstones([]Stone{{ref: 1, intervals: Intervals{}}}, []byte{}),
9,
5,
},
} {
t.Run(name, func(t *testing.T) {
dir, err := ioutil.TempDir("", "wal_head_repair")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
w, err := wal.New(nil, nil, dir)
testutil.Ok(t, err)
for i := 1; i <= test.totalRecs; i++ {
// At this point insert a corrupted record.
if i-1 == test.expRecs {
testutil.Ok(t, w.Log(test.corrFunc(test.rec)))
continue
}
testutil.Ok(t, w.Log(test.rec))
}
h, err := NewHead(nil, nil, w, 1)
testutil.Ok(t, err)
testutil.Ok(t, h.Init())
sr, err := wal.NewSegmentsReader(dir)
testutil.Ok(t, err)
defer sr.Close()
r := wal.NewReader(sr)
var actRec int
for r.Next() {
actRec++
}
testutil.Ok(t, r.Err())
testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records")
})
}
}

View file

@ -341,6 +341,10 @@ func (w *WAL) Repair(origErr error) error {
r := NewReader(bufio.NewReader(f))
for r.Next() {
// Add records only up to the where the error was.
if r.Offset() >= cerr.Offset {
break
}
if err := w.Log(r.Record()); err != nil {
return errors.Wrap(err, "insert record")
}
@ -869,6 +873,22 @@ func (r *Reader) Record() []byte {
return r.rec
}
// Segment returns the current segment being read.
func (r *Reader) Segment() int {
if b, ok := r.rdr.(*segmentBufReader); ok {
return b.segs[b.cur].Index()
}
return -1
}
// Offset returns the current position of the segment being read.
func (r *Reader) Offset() int64 {
if b, ok := r.rdr.(*segmentBufReader); ok {
return int64(b.off)
}
return r.total
}
func min(i, j int) int {
if i < j {
return i