diff --git a/head.go b/head.go index 2adda313d..aa896d174 100644 --- a/head.go +++ b/head.go @@ -349,7 +349,11 @@ func (h *Head) loadWAL(r *wal.Reader) error { case RecordSeries: series, err = dec.Series(rec, series) if err != nil { - return errors.Wrap(err, "decode series") + return &wal.CorruptionErr{ + Err: errors.Wrap(err, "decode series"), + Segment: r.Segment(), + Offset: r.Offset(), + } } for _, s := range series { h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) @@ -362,7 +366,11 @@ func (h *Head) loadWAL(r *wal.Reader) error { samples, err = dec.Samples(rec, samples) s := samples if err != nil { - return errors.Wrap(err, "decode samples") + return &wal.CorruptionErr{ + Err: errors.Wrap(err, "decode samples"), + Segment: r.Segment(), + Offset: r.Offset(), + } } // We split up the samples into chunks of 5000 samples or less. // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise @@ -395,7 +403,11 @@ func (h *Head) loadWAL(r *wal.Reader) error { case RecordTombstones: tstones, err = dec.Tombstones(rec, tstones) if err != nil { - return errors.Wrap(err, "decode tombstones") + return &wal.CorruptionErr{ + Err: errors.Wrap(err, "decode tombstones"), + Segment: r.Segment(), + Offset: r.Offset(), + } } for _, s := range tstones { for _, itv := range s.intervals { @@ -406,7 +418,11 @@ func (h *Head) loadWAL(r *wal.Reader) error { } } default: - return errors.Errorf("invalid record type %v", dec.Type(rec)) + return &wal.CorruptionErr{ + Err: errors.Errorf("invalid record type %v", dec.Type(rec)), + Segment: r.Segment(), + Offset: r.Offset(), + } } } if r.Err() != nil { diff --git a/head_test.go b/head_test.go index 7a68f4fce..93eccecdf 100644 --- a/head_test.go +++ b/head_test.go @@ -861,3 +861,82 @@ func TestHead_LogRollback(t *testing.T) { testutil.Assert(t, ok, "expected series record but got %+v", recs[0]) testutil.Equals(t, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series) } + +func TestWalRepair(t *testing.T) { + var enc RecordEncoder + for name, test := range map[string]struct { + corrFunc func(rec []byte) []byte // Func that applies the corruption to a record. + rec []byte + totalRecs int + expRecs int + }{ + "invalid_record": { + func(rec []byte) []byte { + rec[0] = byte(RecordInvalid) + return rec + }, + enc.Series([]RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}), + 9, + 5, + }, + "decode_series": { + func(rec []byte) []byte { + return rec[:3] + }, + enc.Series([]RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}), + 9, + 5, + }, + "decode_samples": { + func(rec []byte) []byte { + return rec[:3] + }, + enc.Samples([]RefSample{{Ref: 0, T: 99, V: 1}}, []byte{}), + 9, + 5, + }, + "decode_tombstone": { + func(rec []byte) []byte { + return rec[:3] + }, + enc.Tombstones([]Stone{{ref: 1, intervals: Intervals{}}}, []byte{}), + 9, + 5, + }, + } { + t.Run(name, func(t *testing.T) { + dir, err := ioutil.TempDir("", "wal_head_repair") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + w, err := wal.New(nil, nil, dir) + testutil.Ok(t, err) + + for i := 1; i <= test.totalRecs; i++ { + // At this point insert a corrupted record. + if i-1 == test.expRecs { + testutil.Ok(t, w.Log(test.corrFunc(test.rec))) + continue + } + testutil.Ok(t, w.Log(test.rec)) + } + + h, err := NewHead(nil, nil, w, 1) + testutil.Ok(t, err) + testutil.Ok(t, h.Init()) + + sr, err := wal.NewSegmentsReader(dir) + testutil.Ok(t, err) + defer sr.Close() + r := wal.NewReader(sr) + + var actRec int + for r.Next() { + actRec++ + } + testutil.Ok(t, r.Err()) + testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records") + + }) + } +} diff --git a/wal/wal.go b/wal/wal.go index f1f13c738..8f245d50b 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -341,6 +341,10 @@ func (w *WAL) Repair(origErr error) error { r := NewReader(bufio.NewReader(f)) for r.Next() { + // Add records only up to the where the error was. + if r.Offset() >= cerr.Offset { + break + } if err := w.Log(r.Record()); err != nil { return errors.Wrap(err, "insert record") } @@ -869,6 +873,22 @@ func (r *Reader) Record() []byte { return r.rec } +// Segment returns the current segment being read. +func (r *Reader) Segment() int { + if b, ok := r.rdr.(*segmentBufReader); ok { + return b.segs[b.cur].Index() + } + return -1 +} + +// Offset returns the current position of the segment being read. +func (r *Reader) Offset() int64 { + if b, ok := r.rdr.(*segmentBufReader); ok { + return int64(b.off) + } + return r.total +} + func min(i, j int) int { if i < j { return i