mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 06:17:27 -08:00
Migrate write ahead log
On startup, rewrite the old write ahead log into the new format once. Signed-off-by: Fabian Reinartz <freinartz@google.com>
This commit is contained in:
parent
3e76f0163e
commit
1a5573b4ce
4
db.go
4
db.go
|
@ -192,6 +192,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
|||
if err := repairBadIndexVersion(l, dir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Migrate old WAL.
|
||||
if err := MigrateWAL(l, filepath.Join(dir, "wal")); err != nil {
|
||||
return nil, errors.Wrap(err, "migrate WAL")
|
||||
}
|
||||
|
||||
db = &DB{
|
||||
dir: dir,
|
||||
|
|
2
head.go
2
head.go
|
@ -392,6 +392,8 @@ func (h *Head) Init() error {
|
|||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err)
|
||||
|
||||
if err := h.wal.Repair(err); err != nil {
|
||||
return errors.Wrap(err, "repair corrupted WAL")
|
||||
}
|
||||
|
|
88
wal.go
88
wal.go
|
@ -33,6 +33,7 @@ import (
|
|||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/tsdb/fileutil"
|
||||
"github.com/prometheus/tsdb/labels"
|
||||
"github.com/prometheus/tsdb/wal"
|
||||
)
|
||||
|
||||
// WALEntryType indicates what data a WAL entry contains.
|
||||
|
@ -82,6 +83,8 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {
|
|||
|
||||
// WAL is a write ahead log that can log new series labels and samples.
|
||||
// It must be completely read before new entries are logged.
|
||||
//
|
||||
// DEPRECATED: use wal pkg combined with the record coders instead.
|
||||
type WAL interface {
|
||||
Reader() WALReader
|
||||
LogSeries([]RefSeries) error
|
||||
|
@ -173,6 +176,8 @@ func newCRC32() hash.Hash32 {
|
|||
}
|
||||
|
||||
// SegmentWAL is a write ahead log for series data.
|
||||
//
|
||||
// DEPRECATED: use wal pkg combined with the record coders instead.
|
||||
type SegmentWAL struct {
|
||||
mtx sync.Mutex
|
||||
metrics *walMetrics
|
||||
|
@ -1206,3 +1211,86 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// MigrateWAL rewrites the deprecated write ahead log into the new format.
|
||||
func MigrateWAL(logger log.Logger, dir string) error {
|
||||
// Detect whether we still have the old WAL.
|
||||
fns, err := sequenceFiles(dir)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return errors.Wrap(err, "list sequence files")
|
||||
}
|
||||
if len(fns) == 0 {
|
||||
return nil // No WAL at all yet.
|
||||
}
|
||||
// Check header of first segment.
|
||||
f, err := os.Open(fns[0])
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "check first existing segment")
|
||||
}
|
||||
var hdr [4]byte
|
||||
if n, err := f.Read(hdr[:]); err != nil {
|
||||
return errors.Wrap(err, "read header from first segment")
|
||||
} else if n != 4 {
|
||||
return errors.New("could not read full header from segment")
|
||||
}
|
||||
if binary.BigEndian.Uint32(hdr[:]) != WALMagic {
|
||||
return nil // Not the old WAL anymore.
|
||||
}
|
||||
|
||||
level.Info(logger).Log("msg", "migrating WAL format")
|
||||
|
||||
tmpdir := dir + ".tmp"
|
||||
if err := os.RemoveAll(tmpdir); err != nil {
|
||||
return errors.Wrap(err, "cleanup replacement dir")
|
||||
}
|
||||
repl, err := wal.New(logger, nil, tmpdir)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "open new WAL")
|
||||
}
|
||||
w, err := OpenSegmentWAL(dir, logger, time.Minute, nil)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "open old WAL")
|
||||
}
|
||||
rdr := w.Reader()
|
||||
|
||||
var (
|
||||
enc RecordEncoder
|
||||
b []byte
|
||||
)
|
||||
decErr := rdr.Read(
|
||||
func(s []RefSeries) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = repl.Log(enc.Series(s, b[:0]))
|
||||
},
|
||||
func(s []RefSample) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = repl.Log(enc.Samples(s, b[:0]))
|
||||
},
|
||||
func(s []Stone) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = repl.Log(enc.Tombstones(s, b[:0]))
|
||||
},
|
||||
)
|
||||
if decErr != nil {
|
||||
return errors.Wrap(err, "decode old entries")
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "write new entries")
|
||||
}
|
||||
if err := w.Close(); err != nil {
|
||||
return errors.Wrap(err, "close old WAL")
|
||||
}
|
||||
if err := repl.Close(); err != nil {
|
||||
return errors.Wrap(err, "close new WAL")
|
||||
}
|
||||
if err := fileutil.Rename(tmpdir, dir); err != nil {
|
||||
return errors.Wrap(err, "replace old WAL")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
99
wal_test.go
99
wal_test.go
|
@ -19,6 +19,7 @@ import (
|
|||
"io/ioutil"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -26,6 +27,7 @@ import (
|
|||
"github.com/prometheus/tsdb/fileutil"
|
||||
"github.com/prometheus/tsdb/labels"
|
||||
"github.com/prometheus/tsdb/testutil"
|
||||
"github.com/prometheus/tsdb/wal"
|
||||
)
|
||||
|
||||
func TestSegmentWAL_cut(t *testing.T) {
|
||||
|
@ -431,3 +433,100 @@ func TestWALRestoreCorrupted(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMigrateWAL_Fuzz(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "walmigrate")
|
||||
testutil.Ok(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
wdir := path.Join(dir, "wal")
|
||||
|
||||
// Should pass if no WAL exists yet.
|
||||
testutil.Ok(t, MigrateWAL(nil, wdir))
|
||||
|
||||
oldWAL, err := OpenSegmentWAL(wdir, nil, time.Minute, nil)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
// Write some data.
|
||||
testutil.Ok(t, oldWAL.LogSeries([]RefSeries{
|
||||
{Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")},
|
||||
{Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")},
|
||||
}))
|
||||
testutil.Ok(t, oldWAL.LogSamples([]RefSample{
|
||||
{Ref: 1, T: 100, V: 200},
|
||||
{Ref: 2, T: 300, V: 400},
|
||||
}))
|
||||
testutil.Ok(t, oldWAL.LogSeries([]RefSeries{
|
||||
{Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")},
|
||||
}))
|
||||
testutil.Ok(t, oldWAL.LogSamples([]RefSample{
|
||||
{Ref: 3, T: 100, V: 200},
|
||||
{Ref: 4, T: 300, V: 400},
|
||||
}))
|
||||
testutil.Ok(t, oldWAL.LogDeletes([]Stone{
|
||||
{ref: 1, intervals: []Interval{{100, 200}}},
|
||||
}))
|
||||
|
||||
testutil.Ok(t, oldWAL.Close())
|
||||
|
||||
// Perform migration.
|
||||
testutil.Ok(t, MigrateWAL(nil, wdir))
|
||||
|
||||
w, err := wal.New(nil, nil, wdir)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
// We can properly write some new data after migration.
|
||||
var enc RecordEncoder
|
||||
testutil.Ok(t, w.Log(enc.Samples([]RefSample{
|
||||
{Ref: 500, T: 1, V: 1},
|
||||
}, nil)))
|
||||
|
||||
testutil.Ok(t, w.Close())
|
||||
|
||||
// Read back all data.
|
||||
sr, err := wal.NewSegmentsReader(wdir)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
r := wal.NewReader(sr)
|
||||
var res []interface{}
|
||||
var dec RecordDecoder
|
||||
|
||||
for r.Next() {
|
||||
rec := r.Record()
|
||||
|
||||
switch dec.Type(rec) {
|
||||
case RecordSeries:
|
||||
s, err := dec.Series(rec, nil)
|
||||
testutil.Ok(t, err)
|
||||
res = append(res, s)
|
||||
case RecordSamples:
|
||||
s, err := dec.Samples(rec, nil)
|
||||
testutil.Ok(t, err)
|
||||
res = append(res, s)
|
||||
case RecordTombstones:
|
||||
s, err := dec.Tombstones(rec, nil)
|
||||
testutil.Ok(t, err)
|
||||
res = append(res, s)
|
||||
default:
|
||||
t.Fatalf("unknown record type %d", dec.Type(rec))
|
||||
}
|
||||
}
|
||||
testutil.Ok(t, r.Err())
|
||||
|
||||
testutil.Equals(t, []interface{}{
|
||||
[]RefSeries{
|
||||
{Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")},
|
||||
{Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")},
|
||||
},
|
||||
[]RefSample{{Ref: 1, T: 100, V: 200}, {Ref: 2, T: 300, V: 400}},
|
||||
[]RefSeries{
|
||||
{Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")},
|
||||
},
|
||||
[]RefSample{{Ref: 3, T: 100, V: 200}, {Ref: 4, T: 300, V: 400}},
|
||||
[]Stone{{ref: 1, intervals: []Interval{{100, 200}}}},
|
||||
[]RefSample{{Ref: 500, T: 1, V: 1}},
|
||||
}, res)
|
||||
|
||||
// Migrating an already migrated WAL shouldn't do anything.
|
||||
testutil.Ok(t, MigrateWAL(nil, wdir))
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue