Gracefully handle unknown WAL record types. (#8004)

As we're looking to expand what's in the WAL,
having old Prometheus servers ignore the new record types
rather than treating them as corruption allows for better
upgrade/downgrade paths.

Adjust some tests accordingly, so they're still testing what they're
meant to test.

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
Brian Brazil 2020-10-05 10:09:59 +01:00 committed by GitHub
parent d253251266
commit 073e93c768
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 35 additions and 36 deletions

View file

@ -510,12 +510,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
} }
decoded <- tstones decoded <- tstones
default: default:
decodeErr = &wal.CorruptionErr{ // Noop.
Err: errors.Errorf("invalid record type %v", dec.Type(rec)),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
} }
} }
}() }()

View file

@ -319,6 +319,13 @@ func TestHead_WALMultiRef(t *testing.T) {
}}, series) }}, series)
} }
func TestHead_UnknownWALRecord(t *testing.T) {
head, w := newTestHead(t, 1000, false)
w.Log([]byte{255, 42})
testutil.Ok(t, head.Init(0))
testutil.Ok(t, head.Close())
}
func TestHead_Truncate(t *testing.T) { func TestHead_Truncate(t *testing.T) {
h, _ := newTestHead(t, 1000, false) h, _ := newTestHead(t, 1000, false)
defer func() { defer func() {
@ -1208,18 +1215,6 @@ func TestWalRepair_DecodingError(t *testing.T) {
totalRecs int totalRecs int
expRecs int expRecs int
}{ }{
"invalid_record": {
func(rec []byte) []byte {
// Do not modify the base record because it is Logged multiple times.
res := make([]byte, len(rec))
copy(res, rec)
res[0] = byte(record.Invalid)
return res
},
enc.Series([]record.RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}),
9,
5,
},
"decode_series": { "decode_series": {
func(rec []byte) []byte { func(rec []byte) []byte {
return rec[:3] return rec[:3]

View file

@ -28,8 +28,8 @@ import (
type Type uint8 type Type uint8
const ( const (
// Invalid is returned for unrecognised WAL record types. // Unknown is returned for unrecognised WAL record types.
Invalid Type = 255 Unknown Type = 255
// Series is used to match WAL records of type Series. // Series is used to match WAL records of type Series.
Series Type = 1 Series Type = 1
// Samples is used to match WAL records of type Samples. // Samples is used to match WAL records of type Samples.
@ -62,16 +62,16 @@ type Decoder struct {
} }
// Type returns the type of the record. // Type returns the type of the record.
// Returns RecordInvalid if no valid record type is found. // Returns RecordUnknown if no valid record type is found.
func (d *Decoder) Type(rec []byte) Type { func (d *Decoder) Type(rec []byte) Type {
if len(rec) < 1 { if len(rec) < 1 {
return Invalid return Unknown
} }
switch t := Type(rec[0]); t { switch t := Type(rec[0]); t {
case Series, Samples, Tombstones: case Series, Samples, Tombstones:
return t return t
} }
return Invalid return Unknown
} }
// Series appends series in rec to the given slice. // Series appends series in rec to the given slice.

View file

@ -135,8 +135,8 @@ func TestRecord_Type(t *testing.T) {
testutil.Equals(t, Tombstones, recordType) testutil.Equals(t, Tombstones, recordType)
recordType = dec.Type(nil) recordType = dec.Type(nil)
testutil.Equals(t, Invalid, recordType) testutil.Equals(t, Unknown, recordType)
recordType = dec.Type([]byte{0}) recordType = dec.Type([]byte{0})
testutil.Equals(t, Invalid, recordType) testutil.Equals(t, Unknown, recordType)
} }

View file

@ -221,7 +221,8 @@ func Checkpoint(logger log.Logger, w *WAL, from, to int, keep func(id uint64) bo
stats.DroppedTombstones += len(tstones) - len(repl) stats.DroppedTombstones += len(tstones) - len(repl)
default: default:
return nil, errors.New("invalid record type") // Unknown record type, probably from a future Prometheus version.
continue
} }
if len(buf[start:]) == 0 { if len(buf[start:]) == 0 {
continue // All contents discarded. continue // All contents discarded.

View file

@ -140,6 +140,8 @@ func TestCheckpoint(t *testing.T) {
{Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")}, {Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")},
}, nil)) }, nil))
testutil.Ok(t, err) testutil.Ok(t, err)
// Log an unknown record, that might have come from a future Prometheus version.
testutil.Ok(t, w.Log([]byte{255}))
testutil.Ok(t, w.Close()) testutil.Ok(t, w.Close())
// Start a WAL and write records to it as usual. // Start a WAL and write records to it as usual.
@ -225,7 +227,7 @@ func TestCheckpoint(t *testing.T) {
} }
func TestCheckpointNoTmpFolderAfterError(t *testing.T) { func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
// Create a new wal with an invalid records. // Create a new wal with invalid data.
dir, err := ioutil.TempDir("", "test_checkpoint") dir, err := ioutil.TempDir("", "test_checkpoint")
testutil.Ok(t, err) testutil.Ok(t, err)
defer func() { defer func() {
@ -233,10 +235,19 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
}() }()
w, err := NewSize(nil, nil, dir, 64*1024, false) w, err := NewSize(nil, nil, dir, 64*1024, false)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, w.Log([]byte{99})) var enc record.Encoder
w.Close() testutil.Ok(t, w.Log(enc.Series([]record.RefSeries{
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "2")}}, nil)))
testutil.Ok(t, w.Close())
// Run the checkpoint and since the wal contains an invalid records this should return an error. // Corrupt data.
f, err := os.OpenFile(filepath.Join(w.Dir(), "00000000"), os.O_WRONLY, 0666)
testutil.Ok(t, err)
_, err = f.WriteAt([]byte{42}, 1)
testutil.Ok(t, err)
testutil.Ok(t, f.Close())
// Run the checkpoint and since the wal contains corrupt data this should return an error.
_, err = Checkpoint(log.NewNopLogger(), w, 0, 1, nil, 0) _, err = Checkpoint(log.NewNopLogger(), w, 0, 1, nil, 0)
testutil.NotOk(t, err) testutil.NotOk(t, err)

View file

@ -507,13 +507,10 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
} }
case record.Tombstones: case record.Tombstones:
// noop
case record.Invalid:
return errors.New("invalid record")
default: default:
// Could be corruption, or reading from a WAL from a newer Prometheus.
w.recordDecodeFailsMetric.Inc() w.recordDecodeFailsMetric.Inc()
return errors.New("unknown TSDB record type")
} }
} }
return errors.Wrapf(r.Err(), "segment %d: %v", segmentNum, r.Err()) return errors.Wrapf(r.Err(), "segment %d: %v", segmentNum, r.Err())
@ -526,8 +523,6 @@ func (w *Watcher) SetStartTime(t time.Time) {
func recordType(rt record.Type) string { func recordType(rt record.Type) string {
switch rt { switch rt {
case record.Invalid:
return "invalid"
case record.Series: case record.Series:
return "series" return "series"
case record.Samples: case record.Samples:

View file

@ -278,6 +278,8 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
}, },
}, nil) }, nil)
testutil.Ok(t, w.Log(series)) testutil.Ok(t, w.Log(series))
// Add in an unknown record type, which should be ignored.
testutil.Ok(t, w.Log([]byte{255}))
for j := 0; j < samplesCount; j++ { for j := 0; j < samplesCount; j++ {
inner := rand.Intn(ref + 1) inner := rand.Intn(ref + 1)