From e7edae39b2d15dea666eee372203c0063b50aabb Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 22 Dec 2016 16:14:34 +0100 Subject: [PATCH] Use page buffered writer for WAL --- wal.go | 34 ++++++++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/wal.go b/wal.go index 48a9920902..f037fe0edd 100644 --- a/wal.go +++ b/wal.go @@ -9,6 +9,7 @@ import ( "path/filepath" "github.com/coreos/etcd/pkg/fileutil" + "github.com/coreos/etcd/pkg/ioutil" "github.com/fabxc/tsdb/labels" "github.com/pkg/errors" ) @@ -55,10 +56,14 @@ func OpenWAL(dir string) (*WAL, error) { return nil, err } } + enc, err := newWALEncoder(f.File) + if err != nil { + return nil, err + } w := &WAL{ f: f, - enc: newWALEncoder(f), + enc: enc, symbols: map[string]uint32{}, } return w, nil @@ -97,6 +102,9 @@ func (w *WAL) Log(series []labels.Labels, samples []hashedSample) error { } func (w *WAL) sync() error { + if err := w.enc.flush(); err != nil { + return err + } return fileutil.Fdatasync(w.f.File) } @@ -109,16 +117,30 @@ func (w *WAL) Close() error { } type walEncoder struct { - w io.Writer + w *ioutil.PageWriter buf []byte } -func newWALEncoder(w io.Writer) *walEncoder { - return &walEncoder{ - w: w, - buf: make([]byte, 0, 1024*1024), +// walPageBytes is the alignment for flushing records to the backing Writer. +// It should be a multiple of the minimum sector size so that WAL can safely +// distinguish between torn writes and ordinary data corruption. +const minSectorSize = 512 +const walPageBytes = 8 * minSectorSize + +func newWALEncoder(f *os.File) (*walEncoder, error) { + offset, err := f.Seek(0, os.SEEK_CUR) + if err != nil { + return nil, err } + return &walEncoder{ + w: ioutil.NewPageWriter(f, walPageBytes, int(offset)), + buf: make([]byte, 0, 1024*1024), + }, nil +} + +func (e *walEncoder) flush() error { + return e.w.Flush() } func (e *walEncoder) entry(et WALEntryType, flag byte, n int) error {