diff --git a/head.go b/head.go index 6e3dd21c4c..6edbae8948 100644 --- a/head.go +++ b/head.go @@ -51,7 +51,7 @@ var ( type HeadBlock struct { mtx sync.RWMutex dir string - wal *WAL + wal WAL activeWriters uint64 closed bool @@ -101,7 +101,7 @@ func CreateHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*Head // OpenHeadBlock opens the head block in dir. func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) { - wal, err := OpenWAL(dir, log.With(l, "component", "wal"), 5*time.Second) + wal, err := OpenSegmentWAL(dir, log.With(l, "component", "wal"), 5*time.Second) if err != nil { return nil, err } @@ -122,7 +122,6 @@ func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) { r := wal.Reader() -Outer: for r.Next() { series, samples := r.At() @@ -132,8 +131,7 @@ Outer: } for _, s := range samples { if int(s.Ref) >= len(h.series) { - l.Log("msg", "unknown series reference, abort WAL restore", "got", s.Ref, "max", len(h.series)-1) - break Outer + return nil, errors.Errorf("unknown series reference %d (max %d); abort WAL restore", s.Ref, len(h.series)) } h.series[s.Ref].append(s.T, s.V) diff --git a/wal.go b/wal.go index 80f3508e8e..251944f0bf 100644 --- a/wal.go +++ b/wal.go @@ -49,9 +49,8 @@ const ( WALEntrySamples WALEntryType = 3 ) -// WAL is a write ahead log for series data. It can only be written to. -// Use walReader to read back from a write ahead log. -type WAL struct { +// SegmentWAL is a write ahead log for series data. +type SegmentWAL struct { mtx sync.Mutex dirFile *os.File @@ -69,6 +68,21 @@ type WAL struct { donec chan struct{} } +// WAL is a write ahead log that can log new series labels and samples. +// It must be completely read before new entries are logged. +type WAL interface { + Reader() WALReader + Log([]labels.Labels, []RefSample) error + Close() error +} + +// WALReader reads entries from a WAL. +type WALReader interface { + At() ([]labels.Labels, []RefSample) + Next() bool + Err() error +} + // RefSample is a timestamp/value pair associated with a reference to a series. type RefSample struct { Ref uint64 @@ -90,9 +104,9 @@ func init() { castagnoliTable = crc32.MakeTable(crc32.Castagnoli) } -// OpenWAL opens or creates a write ahead log in the given directory. +// OpenSegmentWAL opens or creates a write ahead log in the given directory. // The WAL must be read completely before new data is written. -func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, error) { +func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) (*SegmentWAL, error) { dir = filepath.Join(dir, walDirName) if err := os.MkdirAll(dir, 0777); err != nil { @@ -106,7 +120,7 @@ func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, logger = log.NewNopLogger() } - w := &WAL{ + w := &SegmentWAL{ dirFile: df, logger: logger, flushInterval: flushInterval, @@ -126,12 +140,12 @@ func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, // Reader returns a new reader over the the write ahead log data. // It must be completely consumed before writing to the WAL. -func (w *WAL) Reader() WALReader { +func (w *SegmentWAL) Reader() WALReader { return newWALReader(w, w.logger) } // Log writes a batch of new series labels and samples to the log. -func (w *WAL) Log(series []labels.Labels, samples []RefSample) error { +func (w *SegmentWAL) Log(series []labels.Labels, samples []RefSample) error { if err := w.encodeSeries(series); err != nil { return err } @@ -146,7 +160,7 @@ func (w *WAL) Log(series []labels.Labels, samples []RefSample) error { // initSegments finds all existing segment files and opens them in the // appropriate file modes. -func (w *WAL) initSegments() error { +func (w *SegmentWAL) initSegments() error { fns, err := sequenceFiles(w.dirFile.Name(), "") if err != nil { return err @@ -187,7 +201,7 @@ func (w *WAL) initSegments() error { // cut finishes the currently active segments and opens the next one. // The encoder is reset to point to the new segment. -func (w *WAL) cut() error { +func (w *SegmentWAL) cut() error { // Sync current tail to disk and close. if tf := w.tail(); tf != nil { if err := w.sync(); err != nil { @@ -236,7 +250,7 @@ func (w *WAL) cut() error { return nil } -func (w *WAL) tail() *os.File { +func (w *SegmentWAL) tail() *os.File { if len(w.files) == 0 { return nil } @@ -244,14 +258,14 @@ func (w *WAL) tail() *os.File { } // Sync flushes the changes to disk. -func (w *WAL) Sync() error { +func (w *SegmentWAL) Sync() error { w.mtx.Lock() defer w.mtx.Unlock() return w.sync() } -func (w *WAL) sync() error { +func (w *SegmentWAL) sync() error { if w.cur == nil { return nil } @@ -261,7 +275,7 @@ func (w *WAL) sync() error { return fileutil.Fdatasync(w.tail()) } -func (w *WAL) run(interval time.Duration) { +func (w *SegmentWAL) run(interval time.Duration) { var tick <-chan time.Time if interval > 0 { @@ -284,7 +298,7 @@ func (w *WAL) run(interval time.Duration) { } // Close syncs all data and closes the underlying resources. -func (w *WAL) Close() error { +func (w *SegmentWAL) Close() error { close(w.stopc) <-w.donec @@ -312,7 +326,7 @@ const ( walPageBytes = 16 * minSectorSize ) -func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error { +func (w *SegmentWAL) entry(et WALEntryType, flag byte, buf []byte) error { w.mtx.Lock() defer w.mtx.Unlock() @@ -376,7 +390,7 @@ func putWALBuffer(b []byte) { walBuffers.Put(b) } -func (w *WAL) encodeSeries(series []labels.Labels) error { +func (w *SegmentWAL) encodeSeries(series []labels.Labels) error { if len(series) == 0 { return nil } @@ -402,7 +416,7 @@ func (w *WAL) encodeSeries(series []labels.Labels) error { return w.entry(WALEntrySeries, walSeriesSimple, buf) } -func (w *WAL) encodeSamples(samples []RefSample) error { +func (w *SegmentWAL) encodeSamples(samples []RefSample) error { if len(samples) == 0 { return nil } @@ -439,7 +453,7 @@ func (w *WAL) encodeSamples(samples []RefSample) error { type walReader struct { logger log.Logger - wal *WAL + wal *SegmentWAL cur int buf []byte crc32 hash.Hash32 @@ -449,7 +463,7 @@ type walReader struct { samples []RefSample } -func newWALReader(w *WAL, l log.Logger) *walReader { +func newWALReader(w *SegmentWAL, l log.Logger) *walReader { if l == nil { l = log.NewNopLogger() } diff --git a/wal_test.go b/wal_test.go index 99822ec01f..cbd6c62477 100644 --- a/wal_test.go +++ b/wal_test.go @@ -27,7 +27,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestWAL_initSegments(t *testing.T) { +func TestSegmentWAL_initSegments(t *testing.T) { tmpdir, err := ioutil.TempDir("", "test_wal_open") require.NoError(t, err) defer os.RemoveAll(tmpdir) @@ -35,7 +35,7 @@ func TestWAL_initSegments(t *testing.T) { df, err := fileutil.OpenDir(tmpdir) require.NoError(t, err) - w := &WAL{dirFile: df} + w := &SegmentWAL{dirFile: df} // Create segment files with an appropriate header. for i := 1; i <= 5; i++ { @@ -80,7 +80,7 @@ func TestWAL_initSegments(t *testing.T) { _, err = f.WriteAt([]byte{0}, 4) require.NoError(t, err) - w = &WAL{dirFile: df} + w = &SegmentWAL{dirFile: df} require.Error(t, w.initSegments(), "init corrupted segments") for _, f := range w.files { @@ -88,13 +88,13 @@ func TestWAL_initSegments(t *testing.T) { } } -func TestWAL_cut(t *testing.T) { +func TestSegmentWAL_cut(t *testing.T) { tmpdir, err := ioutil.TempDir("", "test_wal_cut") require.NoError(t, err) defer os.RemoveAll(tmpdir) // This calls cut() implicitly the first time without a previous tail. - w, err := OpenWAL(tmpdir, nil, 0) + w, err := OpenSegmentWAL(tmpdir, nil, 0) require.NoError(t, err) require.NoError(t, w.entry(WALEntrySeries, 1, []byte("Hello World!!"))) @@ -131,7 +131,7 @@ func TestWAL_cut(t *testing.T) { } // Symmetrical test of reading and writing to the WAL via its main interface. -func TestWAL_Log_Restore(t *testing.T) { +func TestSegmentWAL_Log_Restore(t *testing.T) { const ( numMetrics = 5000 iterations = 5 @@ -155,7 +155,7 @@ func TestWAL_Log_Restore(t *testing.T) { // Open WAL a bunch of times, validate all previous data can be read, // write more data to it, close it. for k := 0; k < numMetrics; k += numMetrics / iterations { - w, err := OpenWAL(dir, nil, 0) + w, err := OpenSegmentWAL(dir, nil, 0) require.NoError(t, err) // Set smaller segment size so we can actually write several files. @@ -222,11 +222,11 @@ func TestWAL_Log_Restore(t *testing.T) { func TestWALRestoreCorrupted(t *testing.T) { cases := []struct { name string - f func(*testing.T, *WAL) + f func(*testing.T, *SegmentWAL) }{ { name: "truncate_checksum", - f: func(t *testing.T, w *WAL) { + f: func(t *testing.T, w *SegmentWAL) { f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) require.NoError(t, err) defer f.Close() @@ -239,7 +239,7 @@ func TestWALRestoreCorrupted(t *testing.T) { }, { name: "truncate_body", - f: func(t *testing.T, w *WAL) { + f: func(t *testing.T, w *SegmentWAL) { f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) require.NoError(t, err) defer f.Close() @@ -252,7 +252,7 @@ func TestWALRestoreCorrupted(t *testing.T) { }, { name: "body_content", - f: func(t *testing.T, w *WAL) { + f: func(t *testing.T, w *SegmentWAL) { f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) require.NoError(t, err) defer f.Close() @@ -267,7 +267,7 @@ func TestWALRestoreCorrupted(t *testing.T) { }, { name: "checksum", - f: func(t *testing.T, w *WAL) { + f: func(t *testing.T, w *SegmentWAL) { f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) require.NoError(t, err) defer f.Close() @@ -289,7 +289,7 @@ func TestWALRestoreCorrupted(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dir) - w, err := OpenWAL(dir, nil, 0) + w, err := OpenSegmentWAL(dir, nil, 0) require.NoError(t, err) require.NoError(t, w.Log(nil, []RefSample{{T: 1, V: 2}})) @@ -310,7 +310,7 @@ func TestWALRestoreCorrupted(t *testing.T) { logger := log.NewLogfmtLogger(os.Stderr) - w2, err := OpenWAL(dir, logger, 0) + w2, err := OpenSegmentWAL(dir, logger, 0) require.NoError(t, err) r := w2.Reader() @@ -333,7 +333,7 @@ func TestWALRestoreCorrupted(t *testing.T) { // We should see the first valid entry and the new one, everything after // is truncated. - w3, err := OpenWAL(dir, logger, 0) + w3, err := OpenSegmentWAL(dir, logger, 0) require.NoError(t, err) r = w3.Reader()