Add full encode/decode WAL cycle test

Author: Fabian Reinartz
Date:   2017-02-14 21:54:59 -08:00
parent  2c97428a79
commit  9c7a88223e

5 changed files with 211 additions and 123 deletions

head.go (25 changed lines)

@@ -98,25 +98,26 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
 		meta: *meta,
 	}
-	// Replay contents of the write ahead log.
-	if err = wal.ReadAll(&walHandler{
-		series: func(lset labels.Labels) error {
-			h.create(lset.Hash(), lset)
-			h.meta.Stats.NumSeries++
-			return nil
-		},
-		sample: func(s refdSample) error {
-			h.series[s.ref].append(s.t, s.v)
-			if !h.inBounds(s.t) {
-				return ErrOutOfBounds
-			}
-			h.meta.Stats.NumSamples++
-			return nil
-		},
-	}); err != nil {
-		return nil, err
+	r := wal.Reader()
+
+	for r.Next() {
+		series, samples := r.At()
+
+		for _, lset := range series {
+			h.create(lset.Hash(), lset)
+			h.meta.Stats.NumSeries++
+		}
+		for _, s := range samples {
+			h.series[s.ref].append(s.t, s.v)
+			if !h.inBounds(s.t) {
+				return nil, errors.Wrap(ErrOutOfBounds, "consume WAL")
+			}
+			h.meta.Stats.NumSamples++
+		}
+	}
+	if err := r.Err(); err != nil {
+		return nil, errors.Wrap(err, "consume WAL")
 	}
 	h.updateMapping()
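
For readers skimming the diff, the new replay flow reduces to the loop below; a minimal sketch using only the interfaces introduced in this commit. The names replayWAL, handleSeries, and handleSample are hypothetical stand-ins for openHeadBlock's own bookkeeping.

	// replayWAL drains a WAL via the iterator-style reader (sketch only).
	// handleSeries and handleSample are hypothetical callbacks; openHeadBlock
	// creates in-memory series and appends samples in their place.
	func replayWAL(w *WAL, handleSeries func(labels.Labels), handleSample func(refdSample)) error {
		r := w.Reader()

		for r.Next() {
			series, samples := r.At()

			for _, lset := range series {
				handleSeries(lset)
			}
			for _, s := range samples {
				handleSample(s)
			}
		}
		// Err reports any decoding or I/O failure hit during iteration;
		// errors.Wrap returns nil when the error is nil.
		return errors.Wrap(r.Err(), "consume WAL")
	}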

head_test.go

@@ -1,7 +1,6 @@
 package tsdb

 import (
-	"io"
 	"io/ioutil"
 	"os"
 	"sort"
@@ -43,11 +42,7 @@ func TestPositionMapper(t *testing.T) {
 }

 func BenchmarkCreateSeries(b *testing.B) {
-	f, err := os.Open("cmd/tsdb/testdata.1m")
-	require.NoError(b, err)
-	defer f.Close()
-
-	lbls, err := readPrometheusLabels(f, 1e6)
+	lbls, err := readPrometheusLabels("cmd/tsdb/testdata.1m", 1e6)
 	require.NoError(b, err)

 	b.Run("", func(b *testing.B) {
@@ -67,8 +62,14 @@ func BenchmarkCreateSeries(b *testing.B) {
 	})
 }

-func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
-	b, err := ioutil.ReadAll(r)
+func readPrometheusLabels(fn string, n int) ([]labels.Labels, error) {
+	f, err := os.Open(fn)
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+
+	b, err := ioutil.ReadAll(f)
 	if err != nil {
 		return nil, err
 	}

wal.go (237 changed lines)

@@ -83,46 +83,20 @@ func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error) {
 	if err := w.initSegments(); err != nil {
 		return nil, err
 	}
-	// If there are no existing segments yet, create the initial one.
-	if len(w.files) == 0 {
-		if err := w.cut(); err != nil {
-			return nil, err
-		}
-	}

 	go w.run(flushInterval)

 	return w, nil
 }

-type walHandler struct {
-	sample func(refdSample) error
-	series func(labels.Labels) error
-}
-
-// ReadAll consumes all entries in the WAL and triggers the registered handlers.
-func (w *WAL) ReadAll(h *walHandler) error {
-	for i, f := range w.files {
-		dec := newWALDecoder(f, h)
-
-		for {
-			if err := dec.next(); err != nil {
-				if err == io.EOF {
-					// If file end was reached, move on to the next segment.
-					break
-				}
-				return err
-			}
-		}
-		// Close completed file after we are done reading it.
-		if i < len(w.files)-1 {
-			if err := f.Close(); err != nil {
-				return err
-			}
-		}
-	}
-	return nil
+// Reader returns a new reader over the write ahead log data.
+// It must be completely consumed before writing to the WAL.
+func (w *WAL) Reader() *WALReader {
+	var rs []io.ReadCloser
+
+	for _, f := range w.files {
+		rs = append(rs, f)
+	}
+	return &WALReader{rs: rs}
 }

 // Log writes a batch of new series labels and samples to the log.
@@ -194,6 +168,13 @@ func (w *WAL) cut() error {
 		if err := w.sync(); err != nil {
 			return err
 		}
+		off, err := tf.Seek(0, os.SEEK_CUR)
+		if err != nil {
+			return err
+		}
+		if err := tf.Truncate(off); err != nil {
+			return err
+		}
 		if err := tf.Close(); err != nil {
 			return err
 		}
@@ -245,6 +226,9 @@ func (w *WAL) Sync() error {
 }

 func (w *WAL) sync() error {
+	if w.cur == nil {
+		return nil
+	}
 	if err := w.cur.Flush(); err != nil {
 		return err
 	}
@@ -286,7 +270,10 @@ func (w *WAL) Close() error {
 	}

 	// On opening, a WAL must be fully consumed once. Afterwards
 	// only the current segment will still be open.
-	return w.tail().Close()
+	if tf := w.tail(); tf != nil {
+		return tf.Close()
+	}
+	return nil
 }

 const (
@@ -302,16 +289,20 @@ func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error {
 	w.mtx.Lock()
 	defer w.mtx.Unlock()

-	sz := int64(6 + 4 + len(buf))
-
-	if w.curN+sz > w.segmentSize {
+	// Cut to the next segment if the entry exceeds the file size, unless it
+	// would also exceed the size of a new segment.
+	var (
+		sz    = int64(6 + 4 + len(buf))
+		newsz = w.curN + sz
+	)
+	if w.cur == nil || w.curN > w.segmentSize || newsz > w.segmentSize && sz <= w.segmentSize {
 		if err := w.cut(); err != nil {
 			return err
 		}
 	}

 	h := crc32.NewIEEE()
-	wr := io.MultiWriter(h, w.cur, os.Stdout)
+	wr := io.MultiWriter(h, w.cur)

 	b := make([]byte, 6)
 	b[0] = byte(et)
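
The write path above and the read path below (WALReader.entry) agree on a fixed per-entry framing: a 6-byte header holding the entry type, a flag byte, and a big-endian uint32 payload length, followed by the payload and a trailing big-endian uint32 CRC32 (IEEE polynomial) computed over header plus payload. The sketch below spells that framing out as a standalone encoder; encodeEntry is a hypothetical name, the commit's actual write path is WAL.entry, which also handles segment cutting.

	// encodeEntry writes one WAL entry in the framing used by this commit (sketch only).
	func encodeEntry(w io.Writer, et WALEntryType, flag byte, payload []byte) error {
		h := crc32.NewIEEE()
		// Everything written through wr is also fed into the checksum.
		wr := io.MultiWriter(h, w)

		b := make([]byte, 6)
		b[0] = byte(et)
		b[1] = flag
		binary.BigEndian.PutUint32(b[2:], uint32(len(payload)))

		if _, err := wr.Write(b); err != nil {
			return err
		}
		if _, err := wr.Write(payload); err != nil {
			return err
		}
		// The trailing checksum covers header and payload but not itself.
		binary.BigEndian.PutUint32(b[:4], h.Sum32())
		_, err := w.Write(b[:4])
		return err
	}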
@@ -414,38 +405,122 @@ func (w *WAL) encodeSamples(samples []refdSample) error {
 	return w.entry(WALEntrySamples, walSamplesSimple, buf)
 }

-type walDecoder struct {
-	r       io.Reader
-	handler *walHandler
+// WALReader decodes and emits write ahead log entries.
+type WALReader struct {
+	rs  []io.ReadCloser
+	cur int
 	buf []byte
+
+	err     error
+	labels  []labels.Labels
+	samples []refdSample
 }

-// newWALDecoder returns a new decoder for the default WAL format. The meta
-// headers of a segment must already have been consumed.
-func newWALDecoder(r io.Reader, h *walHandler) *walDecoder {
-	return &walDecoder{
-		r:       r,
-		handler: h,
-		buf:     make([]byte, 0, 1024*1024),
+// NewWALReader returns a new WALReader over the sequence of the given ReadClosers.
+func NewWALReader(rs ...io.ReadCloser) *WALReader {
+	return &WALReader{
+		rs:  rs,
+		buf: make([]byte, 0, 1024*1024),
 	}
 }

-func (d *walDecoder) next() error {
-	t, flag, b, err := d.entry()
+// At returns the last decoded entry of labels or samples.
+func (r *WALReader) At() ([]labels.Labels, []refdSample) {
+	return r.labels, r.samples
+}
+
+// Err returns the last error the reader encountered.
+func (r *WALReader) Err() error {
+	return r.err
+}
+
+// nextEntry retrieves the next entry. It is also used as a testing hook.
+func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) {
+	if r.cur >= len(r.rs) {
+		return 0, 0, nil, io.EOF
+	}
+	cr := r.rs[r.cur]
+
+	et, flag, b, err := r.entry(cr)
+	if err == io.EOF {
+		// Current reader completed, close and move to the next one.
+		if err := cr.Close(); err != nil {
+			return 0, 0, nil, err
+		}
+		r.cur++
+		return r.nextEntry()
+	}
+	return et, flag, b, err
+}
+
+// Next decodes the next entry pair and returns true
+// if it was successful.
+func (r *WALReader) Next() bool {
+	r.labels = r.labels[:0]
+	r.samples = r.samples[:0]
+
+	et, flag, b, err := r.nextEntry()
 	if err != nil {
-		return err
+		if err != io.EOF {
+			r.err = err
+		}
+		return false
 	}

-	switch t {
+	switch et {
 	case WALEntrySamples:
-		return d.decodeSamples(flag, b)
+		if err := r.decodeSamples(flag, b); err != nil {
+			r.err = err
+		}
 	case WALEntrySeries:
-		return d.decodeSamples(flag, b)
+		if err := r.decodeSeries(flag, b); err != nil {
+			r.err = err
+		}
+	default:
+		r.err = errors.Errorf("unknown WAL entry type %d", et)
 	}
-	return errors.Errorf("unknown WAL entry type %q", t)
+	return r.err == nil
 }

-func (d *walDecoder) decodeSeries(flag byte, b []byte) error {
+func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
+	cw := crc32.NewIEEE()
+	tr := io.TeeReader(cr, cw)
+
+	b := make([]byte, 6)
+	if _, err := tr.Read(b); err != nil {
+		return 0, 0, nil, err
+	}
+
+	var (
+		etype  = WALEntryType(b[0])
+		flag   = b[1]
+		length = int(binary.BigEndian.Uint32(b[2:]))
+	)
+	// Exit if we reached pre-allocated space.
+	if etype == 0 {
+		return 0, 0, nil, io.EOF
+	}
+
+	if length > len(r.buf) {
+		r.buf = make([]byte, length)
+	}
+	buf := r.buf[:length]
+
+	if _, err := tr.Read(buf); err != nil {
+		return 0, 0, nil, err
+	}
+	_, err := cr.Read(b[:4])
+	if err != nil {
+		return 0, 0, nil, err
+	}
+	if exp, has := binary.BigEndian.Uint32(b[:4]), cw.Sum32(); has != exp {
+		return 0, 0, nil, errors.Errorf("unexpected CRC32 checksum %x, want %x", has, exp)
+	}
+	return etype, flag, buf, nil
+}
+
+func (r *WALReader) decodeSeries(flag byte, b []byte) error {
 	for len(b) > 0 {
 		l, n := binary.Uvarint(b)
 		if n < 1 {
@@ -470,14 +545,12 @@ func (d *walDecoder) decodeSeries(flag byte, b []byte) error {
 			b = b[n+int(vl):]
 		}

-		if err := d.handler.series(lset); err != nil {
-			return err
-		}
+		r.labels = append(r.labels, lset)
 	}
 	return nil
 }

-func (d *walDecoder) decodeSamples(flag byte, b []byte) error {
+func (r *WALReader) decodeSamples(flag byte, b []byte) error {
 	if len(b) < 16 {
 		return errors.Wrap(errInvalidSize, "header length")
 	}
@@ -511,47 +584,7 @@ func (d *walDecoder) decodeSamples(flag byte, b []byte) error {
 		smpl.v = float64(math.Float64frombits(binary.BigEndian.Uint64(b)))
 		b = b[8:]

-		if err := d.handler.sample(smpl); err != nil {
-			return err
-		}
+		r.samples = append(r.samples, smpl)
 	}
 	return nil
 }
-
-func (d *walDecoder) entry() (WALEntryType, byte, []byte, error) {
-	cw := crc32.NewIEEE()
-	tr := io.TeeReader(d.r, cw)
-
-	b := make([]byte, 6)
-	if _, err := tr.Read(b); err != nil {
-		return 0, 0, nil, err
-	}
-
-	var (
-		etype  = WALEntryType(b[0])
-		flag   = b[1]
-		length = int(binary.BigEndian.Uint32(b[2:]))
-	)
-	// Exit if we reached pre-allocated space.
-	if etype == 0 {
-		return 0, 0, nil, io.EOF
-	}
-
-	if length > len(d.buf) {
-		d.buf = make([]byte, length)
-	}
-	buf := d.buf[:length]
-
-	if _, err := tr.Read(buf); err != nil {
-		return 0, 0, nil, err
-	}
-	_, err := d.r.Read(b[:4])
-	if err != nil {
-		return 0, 0, nil, err
-	}
-	if exp, has := binary.BigEndian.Uint32(b[:4]), cw.Sum32(); has != exp {
-		return 0, 0, nil, errors.Errorf("unexpected CRC32 checksum %x, want %x", has, exp)
-	}
-	return etype, flag, buf, nil
-}
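
Since NewWALReader is exported, the decoder can also be driven over an arbitrary sequence of ReadClosers rather than through WAL.Reader. A minimal sketch follows, assuming each reader is already positioned past any per-segment metadata (which WAL.Reader guarantees for the files it hands over); countWALEntries is a hypothetical helper name.

	// countWALEntries tallies decoded series and samples from the given
	// readers (sketch only). Each reader is closed by the WALReader once
	// it has been fully consumed.
	func countWALEntries(rcs ...io.ReadCloser) (nSeries, nSamples int, err error) {
		r := NewWALReader(rcs...)

		for r.Next() {
			series, samples := r.At()
			nSeries += len(series)
			nSamples += len(samples)
		}
		return nSeries, nSamples, r.Err()
	}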

wal_test.go

@@ -4,6 +4,7 @@ import (
 	"encoding/binary"
 	"fmt"
 	"io/ioutil"
+	"math/rand"
 	"os"
 	"testing"
@@ -106,10 +107,62 @@ func TestWAL_cut(t *testing.T) {
 		// We cannot actually check for correct pre-allocation as it is
 		// optional per filesystem and handled transparently.
-		et, flag, b, err := newWALDecoder(f, nil).entry()
+		et, flag, b, err := NewWALReader(f).nextEntry()
 		require.NoError(t, err)
 		require.Equal(t, WALEntrySeries, et)
 		require.Equal(t, flag, byte(walSeriesSimple))
 		require.Equal(t, []byte("Hello World!!"), b)
 	}
 }
+
+// Symmetrical test of reading from and writing to the WAL via its main interface.
+func TestWAL_Log_Restore(t *testing.T) {
+	// Generate testing data. It does not make semantic sense,
+	// but that does not matter for the purpose of this test.
+	series, err := readPrometheusLabels("testdata/20k.series", 10000)
+	require.NoError(t, err)
+
+	var samples []refdSample
+	for i := 0; i < 200000; i++ {
+		samples = append(samples, refdSample{
+			ref: uint64(i % 10000),
+			t:   int64(i * 2),
+			v:   rand.Float64(),
+		})
+	}
+
+	dir, err := ioutil.TempDir("", "test_wal_log_restore")
+	require.NoError(t, err)
+	defer os.RemoveAll(dir)
+
+	w, err := OpenWAL(dir, nil, 0)
+	require.NoError(t, err)
+
+	// Set smaller segment size so we can actually write several files.
+	w.segmentSize = 300 * 1000
+
+	for i := 0; i < len(series); i += 100 {
+		require.NoError(t, w.Log(series[i:i+100], samples[i*10:(i+100)*10]))
+	}
+	require.NoError(t, w.Close())
+
+	w, err = OpenWAL(dir, nil, 0)
+	require.NoError(t, err)
+
+	r := w.Reader()
+
+	var i, j int
+
+	for r.Next() {
+		lsets, smpls := r.At()
+
+		if l := len(lsets); l > 0 {
+			require.Equal(t, series[i:i+l], lsets)
+			i += l
+		}
+		if l := len(smpls); l > 0 {
+			require.Equal(t, samples[j:j+l], smpls)
+			j += l
+		}
+	}
+	require.NoError(t, r.Err())
+}
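
Assuming a standard Go toolchain and that the referenced test fixtures (testdata/20k.series for this test, cmd/tsdb/testdata.1m for the benchmark) are present in the checkout, the new round-trip test can be run on its own from the package directory with:

	go test -v -run TestWAL_Log_Restore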