diff --git a/db.go b/db.go index 1b508e066..e6a0a74b4 100644 --- a/db.go +++ b/db.go @@ -192,6 +192,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if err := repairBadIndexVersion(l, dir); err != nil { return nil, err } + // Migrate old WAL. + if err := MigrateWAL(l, filepath.Join(dir, "wal")); err != nil { + return nil, errors.Wrap(err, "migrate WAL") + } db = &DB{ dir: dir, diff --git a/head.go b/head.go index 172d36e87..4f0c2c958 100644 --- a/head.go +++ b/head.go @@ -412,6 +412,8 @@ func (h *Head) Init() error { if err == nil { return nil } + level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err) + if err := h.wal.Repair(err); err != nil { return errors.Wrap(err, "repair corrupted WAL") } diff --git a/repair_test.go b/repair_test.go index f4c9d2087..c80976002 100644 --- a/repair_test.go +++ b/repair_test.go @@ -76,7 +76,7 @@ func TestRepairBadIndexVersion(t *testing.T) { } // On DB opening all blocks in the base dir should be repaired. - db, _ := Open("testdata/repair_index_version", nil, nil, nil) + db, err := Open("testdata/repair_index_version", nil, nil, nil) if err != nil { t.Fatal(err) } diff --git a/wal.go b/wal.go index c1b9f6b06..26857a9dc 100644 --- a/wal.go +++ b/wal.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/wal" ) // WALEntryType indicates what data a WAL entry contains. @@ -82,6 +83,8 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics { // WAL is a write ahead log that can log new series labels and samples. // It must be completely read before new entries are logged. +// +// DEPRECATED: use wal pkg combined with the record codex instead. type WAL interface { Reader() WALReader LogSeries([]RefSeries) error @@ -173,6 +176,8 @@ func newCRC32() hash.Hash32 { } // SegmentWAL is a write ahead log for series data. +// +// DEPRECATED: use wal pkg combined with the record coders instead. type SegmentWAL struct { mtx sync.Mutex metrics *walMetrics @@ -1206,3 +1211,100 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error { } return nil } + +// MigrateWAL rewrites the deprecated write ahead log into the new format. +func MigrateWAL(logger log.Logger, dir string) (err error) { + if logger == nil { + logger = log.NewNopLogger() + } + // Detect whether we still have the old WAL. + fns, err := sequenceFiles(dir) + if err != nil && !os.IsNotExist(err) { + return errors.Wrap(err, "list sequence files") + } + if len(fns) == 0 { + return nil // No WAL at all yet. + } + // Check header of first segment to see whether we are still dealing with an + // old WAL. + f, err := os.Open(fns[0]) + if err != nil { + return errors.Wrap(err, "check first existing segment") + } + defer f.Close() + + var hdr [4]byte + if _, err := f.Read(hdr[:]); err != nil && err != io.EOF { + return errors.Wrap(err, "read header from first segment") + } + // If we cannot read the magic header for segments of the old WAL, abort. + // Either it's migrated already or there's a corruption issue with which + // we cannot deal here anyway. Subsequent attempts to open the WAL will error in that case. + if binary.BigEndian.Uint32(hdr[:]) != WALMagic { + return nil + } + + level.Info(logger).Log("msg", "migrating WAL format") + + tmpdir := dir + ".tmp" + if err := os.RemoveAll(tmpdir); err != nil { + return errors.Wrap(err, "cleanup replacement dir") + } + repl, err := wal.New(logger, nil, tmpdir) + if err != nil { + return errors.Wrap(err, "open new WAL") + } + // It should've already been closed as part of the previous finalization. + // Do it once again in case of prior errors. + defer func() { + if err != nil { + repl.Close() + } + }() + + w, err := OpenSegmentWAL(dir, logger, time.Minute, nil) + if err != nil { + return errors.Wrap(err, "open old WAL") + } + defer w.Close() + + rdr := w.Reader() + + var ( + enc RecordEncoder + b []byte + ) + decErr := rdr.Read( + func(s []RefSeries) { + if err != nil { + return + } + err = repl.Log(enc.Series(s, b[:0])) + }, + func(s []RefSample) { + if err != nil { + return + } + err = repl.Log(enc.Samples(s, b[:0])) + }, + func(s []Stone) { + if err != nil { + return + } + err = repl.Log(enc.Tombstones(s, b[:0])) + }, + ) + if decErr != nil { + return errors.Wrap(err, "decode old entries") + } + if err != nil { + return errors.Wrap(err, "write new entries") + } + if err := repl.Close(); err != nil { + return errors.Wrap(err, "close new WAL") + } + if err := fileutil.Rename(tmpdir, dir); err != nil { + return errors.Wrap(err, "replace old WAL") + } + return nil +} diff --git a/wal_test.go b/wal_test.go index 6d559f5d9..b16680a99 100644 --- a/wal_test.go +++ b/wal_test.go @@ -19,6 +19,7 @@ import ( "io/ioutil" "math/rand" "os" + "path" "testing" "time" @@ -26,6 +27,7 @@ import ( "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/wal" ) func TestSegmentWAL_cut(t *testing.T) { @@ -431,3 +433,117 @@ func TestWALRestoreCorrupted(t *testing.T) { }) } } + +func TestMigrateWAL_Empty(t *testing.T) { + // The migration proecedure must properly deal with a zero-length segment, + // which is valid in the new format. + dir, err := ioutil.TempDir("", "walmigrate") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wdir := path.Join(dir, "wal") + + // Initialize empty WAL. + w, err := wal.New(nil, nil, wdir) + testutil.Ok(t, err) + testutil.Ok(t, w.Close()) + + testutil.Ok(t, MigrateWAL(nil, wdir)) +} + +func TestMigrateWAL_Fuzz(t *testing.T) { + dir, err := ioutil.TempDir("", "walmigrate") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wdir := path.Join(dir, "wal") + + // Should pass if no WAL exists yet. + testutil.Ok(t, MigrateWAL(nil, wdir)) + + oldWAL, err := OpenSegmentWAL(wdir, nil, time.Minute, nil) + testutil.Ok(t, err) + + // Write some data. + testutil.Ok(t, oldWAL.LogSeries([]RefSeries{ + {Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")}, + {Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")}, + })) + testutil.Ok(t, oldWAL.LogSamples([]RefSample{ + {Ref: 1, T: 100, V: 200}, + {Ref: 2, T: 300, V: 400}, + })) + testutil.Ok(t, oldWAL.LogSeries([]RefSeries{ + {Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")}, + })) + testutil.Ok(t, oldWAL.LogSamples([]RefSample{ + {Ref: 3, T: 100, V: 200}, + {Ref: 4, T: 300, V: 400}, + })) + testutil.Ok(t, oldWAL.LogDeletes([]Stone{ + {ref: 1, intervals: []Interval{{100, 200}}}, + })) + + testutil.Ok(t, oldWAL.Close()) + + // Perform migration. + testutil.Ok(t, MigrateWAL(nil, wdir)) + + w, err := wal.New(nil, nil, wdir) + testutil.Ok(t, err) + + // We can properly write some new data after migration. + var enc RecordEncoder + testutil.Ok(t, w.Log(enc.Samples([]RefSample{ + {Ref: 500, T: 1, V: 1}, + }, nil))) + + testutil.Ok(t, w.Close()) + + // Read back all data. + sr, err := wal.NewSegmentsReader(wdir) + testutil.Ok(t, err) + + r := wal.NewReader(sr) + var res []interface{} + var dec RecordDecoder + + for r.Next() { + rec := r.Record() + + switch dec.Type(rec) { + case RecordSeries: + s, err := dec.Series(rec, nil) + testutil.Ok(t, err) + res = append(res, s) + case RecordSamples: + s, err := dec.Samples(rec, nil) + testutil.Ok(t, err) + res = append(res, s) + case RecordTombstones: + s, err := dec.Tombstones(rec, nil) + testutil.Ok(t, err) + res = append(res, s) + default: + t.Fatalf("unknown record type %d", dec.Type(rec)) + } + } + testutil.Ok(t, r.Err()) + + testutil.Equals(t, []interface{}{ + []RefSeries{ + {Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")}, + {Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")}, + }, + []RefSample{{Ref: 1, T: 100, V: 200}, {Ref: 2, T: 300, V: 400}}, + []RefSeries{ + {Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")}, + }, + []RefSample{{Ref: 3, T: 100, V: 200}, {Ref: 4, T: 300, V: 400}}, + []Stone{{ref: 1, intervals: []Interval{{100, 200}}}}, + []RefSample{{Ref: 500, T: 1, V: 1}}, + }, res) + + // Migrating an already migrated WAL shouldn't do anything. + testutil.Ok(t, MigrateWAL(nil, wdir)) +}