Abstract WAL into interface

This commit is contained in:
Fabian Reinartz 2017-05-13 17:09:26 +02:00
parent 535532ca02
commit 4862b261d0
3 changed files with 52 additions and 40 deletions

View file

@ -51,7 +51,7 @@ var (
type HeadBlock struct { type HeadBlock struct {
mtx sync.RWMutex mtx sync.RWMutex
dir string dir string
wal *WAL wal WAL
activeWriters uint64 activeWriters uint64
closed bool closed bool
@ -101,7 +101,7 @@ func CreateHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*Head
// OpenHeadBlock opens the head block in dir. // OpenHeadBlock opens the head block in dir.
func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) { func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) {
wal, err := OpenWAL(dir, log.With(l, "component", "wal"), 5*time.Second) wal, err := OpenSegmentWAL(dir, log.With(l, "component", "wal"), 5*time.Second)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -122,7 +122,6 @@ func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) {
r := wal.Reader() r := wal.Reader()
Outer:
for r.Next() { for r.Next() {
series, samples := r.At() series, samples := r.At()
@ -132,8 +131,7 @@ Outer:
} }
for _, s := range samples { for _, s := range samples {
if int(s.Ref) >= len(h.series) { if int(s.Ref) >= len(h.series) {
l.Log("msg", "unknown series reference, abort WAL restore", "got", s.Ref, "max", len(h.series)-1) return nil, errors.Errorf("unknown series reference %d (max %d); abort WAL restore", s.Ref, len(h.series))
break Outer
} }
h.series[s.Ref].append(s.T, s.V) h.series[s.Ref].append(s.T, s.V)

54
wal.go
View file

@ -49,9 +49,8 @@ const (
WALEntrySamples WALEntryType = 3 WALEntrySamples WALEntryType = 3
) )
// WAL is a write ahead log for series data. It can only be written to. // SegmentWAL is a write ahead log for series data.
// Use walReader to read back from a write ahead log. type SegmentWAL struct {
type WAL struct {
mtx sync.Mutex mtx sync.Mutex
dirFile *os.File dirFile *os.File
@ -69,6 +68,21 @@ type WAL struct {
donec chan struct{} donec chan struct{}
} }
// WAL is a write ahead log that can log new series labels and samples.
// It must be completely read before new entries are logged.
type WAL interface {
Reader() WALReader
Log([]labels.Labels, []RefSample) error
Close() error
}
// WALReader reads entries from a WAL.
type WALReader interface {
At() ([]labels.Labels, []RefSample)
Next() bool
Err() error
}
// RefSample is a timestamp/value pair associated with a reference to a series. // RefSample is a timestamp/value pair associated with a reference to a series.
type RefSample struct { type RefSample struct {
Ref uint64 Ref uint64
@ -90,9 +104,9 @@ func init() {
castagnoliTable = crc32.MakeTable(crc32.Castagnoli) castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
} }
// OpenWAL opens or creates a write ahead log in the given directory. // OpenSegmentWAL opens or creates a write ahead log in the given directory.
// The WAL must be read completely before new data is written. // The WAL must be read completely before new data is written.
func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, error) { func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) (*SegmentWAL, error) {
dir = filepath.Join(dir, walDirName) dir = filepath.Join(dir, walDirName)
if err := os.MkdirAll(dir, 0777); err != nil { if err := os.MkdirAll(dir, 0777); err != nil {
@ -106,7 +120,7 @@ func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL,
logger = log.NewNopLogger() logger = log.NewNopLogger()
} }
w := &WAL{ w := &SegmentWAL{
dirFile: df, dirFile: df,
logger: logger, logger: logger,
flushInterval: flushInterval, flushInterval: flushInterval,
@ -126,12 +140,12 @@ func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL,
// Reader returns a new reader over the the write ahead log data. // Reader returns a new reader over the the write ahead log data.
// It must be completely consumed before writing to the WAL. // It must be completely consumed before writing to the WAL.
func (w *WAL) Reader() WALReader { func (w *SegmentWAL) Reader() WALReader {
return newWALReader(w, w.logger) return newWALReader(w, w.logger)
} }
// Log writes a batch of new series labels and samples to the log. // Log writes a batch of new series labels and samples to the log.
func (w *WAL) Log(series []labels.Labels, samples []RefSample) error { func (w *SegmentWAL) Log(series []labels.Labels, samples []RefSample) error {
if err := w.encodeSeries(series); err != nil { if err := w.encodeSeries(series); err != nil {
return err return err
} }
@ -146,7 +160,7 @@ func (w *WAL) Log(series []labels.Labels, samples []RefSample) error {
// initSegments finds all existing segment files and opens them in the // initSegments finds all existing segment files and opens them in the
// appropriate file modes. // appropriate file modes.
func (w *WAL) initSegments() error { func (w *SegmentWAL) initSegments() error {
fns, err := sequenceFiles(w.dirFile.Name(), "") fns, err := sequenceFiles(w.dirFile.Name(), "")
if err != nil { if err != nil {
return err return err
@ -187,7 +201,7 @@ func (w *WAL) initSegments() error {
// cut finishes the currently active segments and opens the next one. // cut finishes the currently active segments and opens the next one.
// The encoder is reset to point to the new segment. // The encoder is reset to point to the new segment.
func (w *WAL) cut() error { func (w *SegmentWAL) cut() error {
// Sync current tail to disk and close. // Sync current tail to disk and close.
if tf := w.tail(); tf != nil { if tf := w.tail(); tf != nil {
if err := w.sync(); err != nil { if err := w.sync(); err != nil {
@ -236,7 +250,7 @@ func (w *WAL) cut() error {
return nil return nil
} }
func (w *WAL) tail() *os.File { func (w *SegmentWAL) tail() *os.File {
if len(w.files) == 0 { if len(w.files) == 0 {
return nil return nil
} }
@ -244,14 +258,14 @@ func (w *WAL) tail() *os.File {
} }
// Sync flushes the changes to disk. // Sync flushes the changes to disk.
func (w *WAL) Sync() error { func (w *SegmentWAL) Sync() error {
w.mtx.Lock() w.mtx.Lock()
defer w.mtx.Unlock() defer w.mtx.Unlock()
return w.sync() return w.sync()
} }
func (w *WAL) sync() error { func (w *SegmentWAL) sync() error {
if w.cur == nil { if w.cur == nil {
return nil return nil
} }
@ -261,7 +275,7 @@ func (w *WAL) sync() error {
return fileutil.Fdatasync(w.tail()) return fileutil.Fdatasync(w.tail())
} }
func (w *WAL) run(interval time.Duration) { func (w *SegmentWAL) run(interval time.Duration) {
var tick <-chan time.Time var tick <-chan time.Time
if interval > 0 { if interval > 0 {
@ -284,7 +298,7 @@ func (w *WAL) run(interval time.Duration) {
} }
// Close syncs all data and closes the underlying resources. // Close syncs all data and closes the underlying resources.
func (w *WAL) Close() error { func (w *SegmentWAL) Close() error {
close(w.stopc) close(w.stopc)
<-w.donec <-w.donec
@ -312,7 +326,7 @@ const (
walPageBytes = 16 * minSectorSize walPageBytes = 16 * minSectorSize
) )
func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error { func (w *SegmentWAL) entry(et WALEntryType, flag byte, buf []byte) error {
w.mtx.Lock() w.mtx.Lock()
defer w.mtx.Unlock() defer w.mtx.Unlock()
@ -376,7 +390,7 @@ func putWALBuffer(b []byte) {
walBuffers.Put(b) walBuffers.Put(b)
} }
func (w *WAL) encodeSeries(series []labels.Labels) error { func (w *SegmentWAL) encodeSeries(series []labels.Labels) error {
if len(series) == 0 { if len(series) == 0 {
return nil return nil
} }
@ -402,7 +416,7 @@ func (w *WAL) encodeSeries(series []labels.Labels) error {
return w.entry(WALEntrySeries, walSeriesSimple, buf) return w.entry(WALEntrySeries, walSeriesSimple, buf)
} }
func (w *WAL) encodeSamples(samples []RefSample) error { func (w *SegmentWAL) encodeSamples(samples []RefSample) error {
if len(samples) == 0 { if len(samples) == 0 {
return nil return nil
} }
@ -439,7 +453,7 @@ func (w *WAL) encodeSamples(samples []RefSample) error {
type walReader struct { type walReader struct {
logger log.Logger logger log.Logger
wal *WAL wal *SegmentWAL
cur int cur int
buf []byte buf []byte
crc32 hash.Hash32 crc32 hash.Hash32
@ -449,7 +463,7 @@ type walReader struct {
samples []RefSample samples []RefSample
} }
func newWALReader(w *WAL, l log.Logger) *walReader { func newWALReader(w *SegmentWAL, l log.Logger) *walReader {
if l == nil { if l == nil {
l = log.NewNopLogger() l = log.NewNopLogger()
} }

View file

@ -27,7 +27,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestWAL_initSegments(t *testing.T) { func TestSegmentWAL_initSegments(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test_wal_open") tmpdir, err := ioutil.TempDir("", "test_wal_open")
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(tmpdir) defer os.RemoveAll(tmpdir)
@ -35,7 +35,7 @@ func TestWAL_initSegments(t *testing.T) {
df, err := fileutil.OpenDir(tmpdir) df, err := fileutil.OpenDir(tmpdir)
require.NoError(t, err) require.NoError(t, err)
w := &WAL{dirFile: df} w := &SegmentWAL{dirFile: df}
// Create segment files with an appropriate header. // Create segment files with an appropriate header.
for i := 1; i <= 5; i++ { for i := 1; i <= 5; i++ {
@ -80,7 +80,7 @@ func TestWAL_initSegments(t *testing.T) {
_, err = f.WriteAt([]byte{0}, 4) _, err = f.WriteAt([]byte{0}, 4)
require.NoError(t, err) require.NoError(t, err)
w = &WAL{dirFile: df} w = &SegmentWAL{dirFile: df}
require.Error(t, w.initSegments(), "init corrupted segments") require.Error(t, w.initSegments(), "init corrupted segments")
for _, f := range w.files { for _, f := range w.files {
@ -88,13 +88,13 @@ func TestWAL_initSegments(t *testing.T) {
} }
} }
func TestWAL_cut(t *testing.T) { func TestSegmentWAL_cut(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test_wal_cut") tmpdir, err := ioutil.TempDir("", "test_wal_cut")
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(tmpdir) defer os.RemoveAll(tmpdir)
// This calls cut() implicitly the first time without a previous tail. // This calls cut() implicitly the first time without a previous tail.
w, err := OpenWAL(tmpdir, nil, 0) w, err := OpenSegmentWAL(tmpdir, nil, 0)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, w.entry(WALEntrySeries, 1, []byte("Hello World!!"))) require.NoError(t, w.entry(WALEntrySeries, 1, []byte("Hello World!!")))
@ -131,7 +131,7 @@ func TestWAL_cut(t *testing.T) {
} }
// Symmetrical test of reading and writing to the WAL via its main interface. // Symmetrical test of reading and writing to the WAL via its main interface.
func TestWAL_Log_Restore(t *testing.T) { func TestSegmentWAL_Log_Restore(t *testing.T) {
const ( const (
numMetrics = 5000 numMetrics = 5000
iterations = 5 iterations = 5
@ -155,7 +155,7 @@ func TestWAL_Log_Restore(t *testing.T) {
// Open WAL a bunch of times, validate all previous data can be read, // Open WAL a bunch of times, validate all previous data can be read,
// write more data to it, close it. // write more data to it, close it.
for k := 0; k < numMetrics; k += numMetrics / iterations { for k := 0; k < numMetrics; k += numMetrics / iterations {
w, err := OpenWAL(dir, nil, 0) w, err := OpenSegmentWAL(dir, nil, 0)
require.NoError(t, err) require.NoError(t, err)
// Set smaller segment size so we can actually write several files. // Set smaller segment size so we can actually write several files.
@ -222,11 +222,11 @@ func TestWAL_Log_Restore(t *testing.T) {
func TestWALRestoreCorrupted(t *testing.T) { func TestWALRestoreCorrupted(t *testing.T) {
cases := []struct { cases := []struct {
name string name string
f func(*testing.T, *WAL) f func(*testing.T, *SegmentWAL)
}{ }{
{ {
name: "truncate_checksum", name: "truncate_checksum",
f: func(t *testing.T, w *WAL) { f: func(t *testing.T, w *SegmentWAL) {
f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666)
require.NoError(t, err) require.NoError(t, err)
defer f.Close() defer f.Close()
@ -239,7 +239,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
}, },
{ {
name: "truncate_body", name: "truncate_body",
f: func(t *testing.T, w *WAL) { f: func(t *testing.T, w *SegmentWAL) {
f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666)
require.NoError(t, err) require.NoError(t, err)
defer f.Close() defer f.Close()
@ -252,7 +252,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
}, },
{ {
name: "body_content", name: "body_content",
f: func(t *testing.T, w *WAL) { f: func(t *testing.T, w *SegmentWAL) {
f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666)
require.NoError(t, err) require.NoError(t, err)
defer f.Close() defer f.Close()
@ -267,7 +267,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
}, },
{ {
name: "checksum", name: "checksum",
f: func(t *testing.T, w *WAL) { f: func(t *testing.T, w *SegmentWAL) {
f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666)
require.NoError(t, err) require.NoError(t, err)
defer f.Close() defer f.Close()
@ -289,7 +289,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
w, err := OpenWAL(dir, nil, 0) w, err := OpenSegmentWAL(dir, nil, 0)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, w.Log(nil, []RefSample{{T: 1, V: 2}})) require.NoError(t, w.Log(nil, []RefSample{{T: 1, V: 2}}))
@ -310,7 +310,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr) logger := log.NewLogfmtLogger(os.Stderr)
w2, err := OpenWAL(dir, logger, 0) w2, err := OpenSegmentWAL(dir, logger, 0)
require.NoError(t, err) require.NoError(t, err)
r := w2.Reader() r := w2.Reader()
@ -333,7 +333,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
// We should see the first valid entry and the new one, everything after // We should see the first valid entry and the new one, everything after
// is truncated. // is truncated.
w3, err := OpenWAL(dir, logger, 0) w3, err := OpenSegmentWAL(dir, logger, 0)
require.NoError(t, err) require.NoError(t, err)
r = w3.Reader() r = w3.Reader()