From 073e93c76865ff39abccccd392d2111c56a3dc6d Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Mon, 5 Oct 2020 10:09:59 +0100 Subject: [PATCH] Gracefully handle unknown WAL record types. (#8004) As we're looking to expand what's in the WAL, having old Prometheus servers ignore the new record types rather than treating them as corruption allows for better upgrade/downgrade paths. Adjust some tests accordingly, so they're still testing what they're meant to test. Signed-off-by: Brian Brazil --- tsdb/head.go | 7 +------ tsdb/head_test.go | 19 +++++++------------ tsdb/record/record.go | 10 +++++----- tsdb/record/record_test.go | 4 ++-- tsdb/wal/checkpoint.go | 3 ++- tsdb/wal/checkpoint_test.go | 19 +++++++++++++++---- tsdb/wal/watcher.go | 7 +------ tsdb/wal/watcher_test.go | 2 ++ 8 files changed, 35 insertions(+), 36 deletions(-) diff --git a/tsdb/head.go b/tsdb/head.go index ba5cf1a61..759d983b1 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -510,12 +510,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks } decoded <- tstones default: - decodeErr = &wal.CorruptionErr{ - Err: errors.Errorf("invalid record type %v", dec.Type(rec)), - Segment: r.Segment(), - Offset: r.Offset(), - } - return + // Noop. } } }() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 1803f267f..75e20add9 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -319,6 +319,13 @@ func TestHead_WALMultiRef(t *testing.T) { }}, series) } +func TestHead_UnknownWALRecord(t *testing.T) { + head, w := newTestHead(t, 1000, false) + w.Log([]byte{255, 42}) + testutil.Ok(t, head.Init(0)) + testutil.Ok(t, head.Close()) +} + func TestHead_Truncate(t *testing.T) { h, _ := newTestHead(t, 1000, false) defer func() { @@ -1208,18 +1215,6 @@ func TestWalRepair_DecodingError(t *testing.T) { totalRecs int expRecs int }{ - "invalid_record": { - func(rec []byte) []byte { - // Do not modify the base record because it is Logged multiple times. 
- res := make([]byte, len(rec)) - copy(res, rec) - res[0] = byte(record.Invalid) - return res - }, - enc.Series([]record.RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}), - 9, - 5, - }, "decode_series": { func(rec []byte) []byte { return rec[:3] diff --git a/tsdb/record/record.go b/tsdb/record/record.go index d63198f97..e7df28766 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -28,8 +28,8 @@ import ( type Type uint8 const ( - // Invalid is returned for unrecognised WAL record types. - Invalid Type = 255 + // Unknown is returned for unrecognised WAL record types. + Unknown Type = 255 // Series is used to match WAL records of type Series. Series Type = 1 // Samples is used to match WAL records of type Samples. @@ -62,16 +62,16 @@ type Decoder struct { } // Type returns the type of the record. -// Returns RecordInvalid if no valid record type is found. +// Returns Unknown if no valid record type is found. func (d *Decoder) Type(rec []byte) Type { if len(rec) < 1 { - return Invalid + return Unknown } switch t := Type(rec[0]); t { case Series, Samples, Tombstones: return t } - return Invalid + return Unknown } // Series appends series in rec to the given slice. 
diff --git a/tsdb/record/record_test.go b/tsdb/record/record_test.go index de2ce30cf..7d56ae208 100644 --- a/tsdb/record/record_test.go +++ b/tsdb/record/record_test.go @@ -135,8 +135,8 @@ func TestRecord_Type(t *testing.T) { testutil.Equals(t, Tombstones, recordType) recordType = dec.Type(nil) - testutil.Equals(t, Invalid, recordType) + testutil.Equals(t, Unknown, recordType) recordType = dec.Type([]byte{0}) - testutil.Equals(t, Invalid, recordType) + testutil.Equals(t, Unknown, recordType) } diff --git a/tsdb/wal/checkpoint.go b/tsdb/wal/checkpoint.go index 0b5a4747b..33e2e58ce 100644 --- a/tsdb/wal/checkpoint.go +++ b/tsdb/wal/checkpoint.go @@ -221,7 +221,8 @@ func Checkpoint(logger log.Logger, w *WAL, from, to int, keep func(id uint64) bo stats.DroppedTombstones += len(tstones) - len(repl) default: - return nil, errors.New("invalid record type") + // Unknown record type, probably from a future Prometheus version. + continue } if len(buf[start:]) == 0 { continue // All contents discarded. diff --git a/tsdb/wal/checkpoint_test.go b/tsdb/wal/checkpoint_test.go index d9a223889..d32713bd9 100644 --- a/tsdb/wal/checkpoint_test.go +++ b/tsdb/wal/checkpoint_test.go @@ -140,6 +140,8 @@ func TestCheckpoint(t *testing.T) { {Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")}, }, nil)) testutil.Ok(t, err) + // Log an unknown record, that might have come from a future Prometheus version. + testutil.Ok(t, w.Log([]byte{255})) testutil.Ok(t, w.Close()) // Start a WAL and write records to it as usual. @@ -225,7 +227,7 @@ func TestCheckpoint(t *testing.T) { } func TestCheckpointNoTmpFolderAfterError(t *testing.T) { - // Create a new wal with an invalid records. + // Create a new wal with invalid data. 
dir, err := ioutil.TempDir("", "test_checkpoint") testutil.Ok(t, err) defer func() { @@ -233,10 +235,19 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) { }() w, err := NewSize(nil, nil, dir, 64*1024, false) testutil.Ok(t, err) - testutil.Ok(t, w.Log([]byte{99})) - w.Close() + var enc record.Encoder + testutil.Ok(t, w.Log(enc.Series([]record.RefSeries{ + {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "2")}}, nil))) + testutil.Ok(t, w.Close()) - // Run the checkpoint and since the wal contains an invalid records this should return an error. + // Corrupt data. + f, err := os.OpenFile(filepath.Join(w.Dir(), "00000000"), os.O_WRONLY, 0666) + testutil.Ok(t, err) + _, err = f.WriteAt([]byte{42}, 1) + testutil.Ok(t, err) + testutil.Ok(t, f.Close()) + + // Run the checkpoint and since the wal contains corrupt data this should return an error. _, err = Checkpoint(log.NewNopLogger(), w, 0, 1, nil, 0) testutil.NotOk(t, err) diff --git a/tsdb/wal/watcher.go b/tsdb/wal/watcher.go index 27a2739b7..1fb78005f 100644 --- a/tsdb/wal/watcher.go +++ b/tsdb/wal/watcher.go @@ -507,13 +507,10 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { } case record.Tombstones: - // noop - case record.Invalid: - return errors.New("invalid record") default: + // Could be corruption, or reading from a WAL from a newer Prometheus. 
w.recordDecodeFailsMetric.Inc() - return errors.New("unknown TSDB record type") } } return errors.Wrapf(r.Err(), "segment %d: %v", segmentNum, r.Err()) @@ -526,8 +523,6 @@ func (w *Watcher) SetStartTime(t time.Time) { func recordType(rt record.Type) string { switch rt { - case record.Invalid: - return "invalid" case record.Series: return "series" case record.Samples: diff --git a/tsdb/wal/watcher_test.go b/tsdb/wal/watcher_test.go index b2b26ba4c..6893ad72a 100644 --- a/tsdb/wal/watcher_test.go +++ b/tsdb/wal/watcher_test.go @@ -278,6 +278,8 @@ func TestReadToEndWithCheckpoint(t *testing.T) { }, }, nil) testutil.Ok(t, w.Log(series)) + // Add in an unknown record type, which should be ignored. + testutil.Ok(t, w.Log([]byte{255})) for j := 0; j < samplesCount; j++ { inner := rand.Intn(ref + 1)