Add basic WAL tests

Fabian Reinartz 2017-02-14 15:54:52 -08:00
parent 84e8027a8e
commit 2c97428a79
3 changed files with 174 additions and 266 deletions

@@ -1,11 +1,17 @@
 package tsdb
 
 import (
+    "io"
     "io/ioutil"
     "os"
     "sort"
     "testing"
+    "unsafe"
+
+    "github.com/fabxc/tsdb/labels"
+    promlabels "github.com/prometheus/prometheus/pkg/labels"
+    "github.com/prometheus/prometheus/pkg/textparse"
 
     "github.com/stretchr/testify/require"
 )
@@ -60,3 +66,29 @@ func BenchmarkCreateSeries(b *testing.B) {
         }
     })
 }
+
+func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
+    b, err := ioutil.ReadAll(r)
+    if err != nil {
+        return nil, err
+    }
+    p := textparse.New(b)
+    i := 0
+
+    var mets []labels.Labels
+    hashes := map[uint64]struct{}{}
+
+    for p.Next() && i < n {
+        m := make(labels.Labels, 0, 10)
+        p.Metric((*promlabels.Labels)(unsafe.Pointer(&m)))
+
+        h := m.Hash()
+        if _, ok := hashes[h]; ok {
+            continue
+        }
+        mets = append(mets, m)
+        hashes[h] = struct{}{}
+        i++
+    }
+    return mets, p.Err()
+}
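For context, a minimal usage sketch that is not part of the commit: readPrometheusLabels parses a Prometheus text-format dump with textparse and collects up to n unique label sets, deduplicating by label-set hash. The sketch assumes the cmd/tsdb/testdata.1m fixture that the removed benchmarks further below read from; loadTestSeries is a hypothetical helper name.

package tsdb

import (
    "os"
    "testing"

    "github.com/fabxc/tsdb/labels"
    "github.com/stretchr/testify/require"
)

// loadTestSeries is a hypothetical helper: it reads up to n unique label sets
// from the text-format dump that the benchmarks in this repository use.
func loadTestSeries(t testing.TB, n int) []labels.Labels {
    f, err := os.Open("cmd/tsdb/testdata.1m")
    require.NoError(t, err)
    defer f.Close()

    series, err := readPrometheusLabels(f, n)
    require.NoError(t, err)
    return series
}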

wal.go

@@ -21,15 +21,18 @@ import (
 type WALEntryType byte
 
 const (
-    WALMagic = 0x43AF00EF
+    // WALMagic is a 4 byte number every WAL segment file starts with.
+    WALMagic = uint32(0x43AF00EF)
 
-    // Format versioning flag of a WAL segment file.
-    WALFormatDefault byte = 1
+    // WALFormatDefault is the version flag for the default outer segment file format.
+    WALFormatDefault = byte(1)
+)
 
 // Entry types in a segment file.
-    WALEntrySymbols = 1
-    WALEntrySeries  = 2
-    WALEntrySamples = 3
+const (
+    WALEntrySymbols WALEntryType = 1
+    WALEntrySeries  WALEntryType = 2
+    WALEntrySamples WALEntryType = 3
 )
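The new constants pin down the fixed 8-byte header every segment file starts with: the magic number in the first four bytes, big-endian, and the format flag in the fifth byte, with the remaining three bytes written as zero. This is also why the writer below starts a fresh segment at offset 8 (w.curN = 8). A minimal sketch of producing that header, assuming it sits in package tsdb; writeSegmentHeader is a hypothetical helper, not part of the commit.

package tsdb

import (
    "encoding/binary"
    "io"
)

// writeSegmentHeader is a hypothetical helper mirroring the header layout the
// tests below verify: 4-byte big-endian magic, 1-byte format flag, 3 zero bytes.
func writeSegmentHeader(w io.Writer) error {
    metab := make([]byte, 8)
    binary.BigEndian.PutUint32(metab[:4], WALMagic)
    metab[4] = WALFormatDefault
    _, err := w.Write(metab)
    return err
}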
@@ -42,9 +45,10 @@ type WAL struct {
     logger        log.Logger
     flushInterval time.Duration
+    segmentSize   int64
 
     cur  *bufio.Writer
-    curN int
+    curN int64
 
     stopc chan struct{}
     donec chan struct{}
@@ -74,6 +78,7 @@ func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error) {
         flushInterval: flushInterval,
         donec:         make(chan struct{}),
         stopc:         make(chan struct{}),
+        segmentSize:   walSegmentSizeBytes,
     }
     if err := w.initSegments(); err != nil {
         return nil, err
@@ -101,7 +106,7 @@ func (w *WAL) ReadAll(h *walHandler) error {
         dec := newWALDecoder(f, h)
 
         for {
-            if err := dec.entry(); err != nil {
+            if err := dec.next(); err != nil {
                 if err == io.EOF {
                     // If file end was reached, move on to the next segment.
                     break
@@ -145,7 +150,7 @@ func (w *WAL) initSegments() error {
         return nil
     }
     if len(fns) > 1 {
-        for _, fn := range fns[:len(fns)-2] {
+        for _, fn := range fns[:len(fns)-1] {
             lf, err := fileutil.TryLockFile(fn, os.O_RDONLY, 0666)
             if err != nil {
                 return err
@@ -184,19 +189,14 @@ func (w *WAL) initSegments() error {
 // cut finishes the currently active segments and open the next one.
 // The encoder is reset to point to the new segment.
 func (w *WAL) cut() error {
-    // If there's a previous segment, truncate it to its final size
-    // and sync everything to disc.
+    // Sync current tail to disc and close.
     if tf := w.tail(); tf != nil {
-        off, err := tf.Seek(0, os.SEEK_CUR)
-        if err != nil {
-            return err
-        }
-        if err := tf.Truncate(off); err != nil {
-            return err
-        }
         if err := w.sync(); err != nil {
             return err
         }
+        if err := tf.Close(); err != nil {
+            return err
+        }
     }
 
     p, _, err := nextSequenceFile(w.dirFile.Name(), "")
@@ -207,10 +207,7 @@ func (w *WAL) cut() error {
     if err != nil {
         return err
     }
-    if _, err = f.Seek(0, os.SEEK_SET); err != nil {
-        return err
-    }
-    if err = fileutil.Preallocate(f.File, walSegmentSizeBytes, true); err != nil {
+    if err = fileutil.Preallocate(f.File, w.segmentSize, true); err != nil {
         return err
     }
     if err = w.dirFile.Sync(); err != nil {
@@ -228,7 +225,7 @@ func (w *WAL) cut() error {
     w.files = append(w.files, f)
     w.cur = bufio.NewWriterSize(f, 4*1024*1024)
-    w.curN = len(metab)
+    w.curN = 8
 
     return nil
 }
@@ -284,15 +281,12 @@ func (w *WAL) Close() error {
     w.mtx.Lock()
     defer w.mtx.Unlock()
 
-    var merr MultiError
-
     if err := w.sync(); err != nil {
         return err
     }
-    for _, f := range w.files {
-        merr.Add(f.Close())
-    }
-    return merr.Err()
+    // On opening, a WAL must be fully consumed once. Afterwards
+    // only the current segment will still be open.
+    return w.tail().Close()
 }
 
 const (
@@ -308,16 +302,16 @@ func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error {
     w.mtx.Lock()
     defer w.mtx.Unlock()
 
-    sz := 6 + 4 + len(buf)
+    sz := int64(6 + 4 + len(buf))
 
-    if w.curN+sz > walSegmentSizeBytes {
+    if w.curN+sz > w.segmentSize {
         if err := w.cut(); err != nil {
             return err
         }
     }
 
     h := crc32.NewIEEE()
-    wr := io.MultiWriter(h, w.cur)
+    wr := io.MultiWriter(h, w.cur, os.Stdout)
 
     b := make([]byte, 6)
     b[0] = byte(et)
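The size accounting above, sz := 6 + 4 + len(buf), implies that each record is framed as a 6-byte header, the payload, and a 4-byte CRC32 trailer, and a new segment is cut when the record would overflow the current one. Below is a sketch of that framing on the write side. Only b[0] = byte(et) is visible in the hunk, so the placement of the flag and of the length inside the header, and the big-endian length encoding, are assumptions; encodeEntry is a hypothetical helper, not part of the commit.

package tsdb

import (
    "encoding/binary"
    "hash/crc32"
    "io"
)

// encodeEntry is a hypothetical sketch of the record framing implied by
// sz := 6 + 4 + len(buf): a 6-byte header, the payload, and a 4-byte CRC32
// trailer. Assumed header layout: type (1 byte), flag (1 byte), payload
// length (4 bytes, big-endian).
func encodeEntry(w io.Writer, et WALEntryType, flag byte, buf []byte) error {
    h := crc32.NewIEEE()
    // The checksum covers header and payload, matching the decoder's TeeReader.
    wr := io.MultiWriter(h, w)

    b := make([]byte, 6)
    b[0] = byte(et)
    b[1] = flag
    binary.BigEndian.PutUint32(b[2:], uint32(len(buf)))

    if _, err := wr.Write(b); err != nil {
        return err
    }
    if _, err := wr.Write(buf); err != nil {
        return err
    }
    // Trailer: CRC32 (IEEE) of everything written above.
    binary.BigEndian.PutUint32(b[:4], h.Sum32())
    _, err := w.Write(b[:4])
    return err
}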
@@ -437,6 +431,20 @@ func newWALDecoder(r io.Reader, h *walHandler) *walDecoder {
     }
 }
 
+func (d *walDecoder) next() error {
+    t, flag, b, err := d.entry()
+    if err != nil {
+        return err
+    }
+    switch t {
+    case WALEntrySamples:
+        return d.decodeSamples(flag, b)
+    case WALEntrySeries:
+        return d.decodeSeries(flag, b)
+    }
+    return errors.Errorf("unknown WAL entry type %q", t)
+}
+
 func (d *walDecoder) decodeSeries(flag byte, b []byte) error {
     for len(b) > 0 {
         l, n := binary.Uvarint(b)
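next() is the new per-entry dispatcher used by ReadAll: entry() frames and checksums a record, next() routes it to the series or samples decoder, and those decoders invoke the walHandler callbacks. A minimal consumer sketch, assuming package tsdb; the handler fields mirror the benchmark code removed further below, and countWALEntries is a hypothetical helper, not part of the commit.

package tsdb

import "github.com/fabxc/tsdb/labels"

// countWALEntries is a hypothetical helper: it replays a WAL once and counts
// the decoded series and sample records via the handler callbacks.
func countWALEntries(w *WAL) (numSeries, numSamples int, err error) {
    err = w.ReadAll(&walHandler{
        series: func(lset labels.Labels) error {
            numSeries++
            return nil
        },
        sample: func(smpl refdSample) error {
            numSamples++
            return nil
        },
    })
    return numSeries, numSamples, err
}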
@@ -510,10 +518,13 @@ func (d *walDecoder) decodeSamples(flag byte, b []byte) error {
     return nil
 }
 
-func (d *walDecoder) entry() error {
+func (d *walDecoder) entry() (WALEntryType, byte, []byte, error) {
+    cw := crc32.NewIEEE()
+    tr := io.TeeReader(d.r, cw)
+
     b := make([]byte, 6)
-    if _, err := d.r.Read(b); err != nil {
-        return err
+    if _, err := tr.Read(b); err != nil {
+        return 0, 0, nil, err
     }
 
     var (
@@ -523,7 +534,7 @@
     )
 
     // Exit if we reached pre-allocated space.
     if etype == 0 {
-        return io.EOF
+        return 0, 0, nil, io.EOF
     }
 
     if length > len(d.buf) {
@@ -531,26 +542,16 @@
     }
     buf := d.buf[:length]
 
-    cw := crc32.NewIEEE()
-    tr := io.TeeReader(d.r, cw)
-
     if _, err := tr.Read(buf); err != nil {
-        return err
+        return 0, 0, nil, err
     }
 
     _, err := d.r.Read(b[:4])
     if err != nil {
-        return err
+        return 0, 0, nil, err
     }
     if exp, has := binary.BigEndian.Uint32(b[:4]), cw.Sum32(); has != exp {
-        return errors.Errorf("unexpected CRC32 checksum %x, want %x", has, exp)
+        return 0, 0, nil, errors.Errorf("unexpected CRC32 checksum %x, want %x", has, exp)
     }
 
-    switch etype {
-    case WALEntrySeries:
-        return d.decodeSeries(flag, buf)
-    case WALEntrySamples:
-        return d.decodeSamples(flag, buf)
-    }
-    return errors.Errorf("unknown WAL entry type %q", etype)
+    return etype, flag, buf, nil
 }

@@ -1,240 +1,115 @@
 package tsdb
 
 import (
-    "io"
+    "encoding/binary"
+    "fmt"
     "io/ioutil"
     "os"
     "testing"
-    "time"
 
-    "github.com/fabxc/tsdb/labels"
-    dto "github.com/prometheus/client_model/go"
-    "github.com/prometheus/common/expfmt"
-    "github.com/prometheus/common/model"
+    "github.com/coreos/etcd/pkg/fileutil"
     "github.com/stretchr/testify/require"
 )
 
-func BenchmarkWALWrite(b *testing.B) {
-    d, err := ioutil.TempDir("", "wal_read_test")
-    require.NoError(b, err)
-
-    defer func() {
-        require.NoError(b, os.RemoveAll(d))
-    }()
-
-    wal, err := OpenWAL(d, nil, 500*time.Millisecond)
-    require.NoError(b, err)
-
-    f, err := os.Open("cmd/tsdb/testdata.1m")
-    require.NoError(b, err)
-
-    series, err := readPrometheusLabels(f, b.N/300)
-    require.NoError(b, err)
-
-    var (
-        samples [][]refdSample
-        ts      int64
-    )
-    for i := 0; i < 300; i++ {
-        ts += int64(30000)
-        scrape := make([]refdSample, 0, len(series))
-
-        for ref := range series {
-            scrape = append(scrape, refdSample{
-                ref: uint64(ref),
-                t:   ts,
-                v:   12345788,
-            })
-        }
-        samples = append(samples, scrape)
-    }
-    b.ResetTimer()
-
-    err = wal.Log(series, samples[0])
-    require.NoError(b, err)
-
-    for _, s := range samples[1:] {
-        err = wal.Log(nil, s)
-        require.NoError(b, err)
-    }
-
-    require.NoError(b, wal.Close())
-}
+func TestWAL_initSegments(t *testing.T) {
+    tmpdir, err := ioutil.TempDir("", "test_wal_open")
+    require.NoError(t, err)
+    defer os.RemoveAll(tmpdir)
+
+    df, err := fileutil.OpenDir(tmpdir)
+    require.NoError(t, err)
+
+    w := &WAL{dirFile: df}
+
+    // Create segment files with an appropriate header.
+    for i := 1; i <= 5; i++ {
+        metab := make([]byte, 8)
+        binary.BigEndian.PutUint32(metab[:4], WALMagic)
+        metab[4] = WALFormatDefault
+
+        f, err := os.Create(fmt.Sprintf("%s/000%d", tmpdir, i))
+        require.NoError(t, err)
+        _, err = f.Write(metab)
+        require.NoError(t, err)
+        require.NoError(t, f.Close())
+    }
+
+    // Initialize 5 correct segment files.
+    require.NoError(t, w.initSegments())
+
+    require.Equal(t, 5, len(w.files), "unexpected number of segments loaded")
+
+    // Validate that files are locked properly.
+    for _, of := range w.files {
+        f, err := os.Open(of.Name())
+        require.NoError(t, err, "open locked segment %s", f.Name())
+
+        _, err = f.Read([]byte{0})
+        require.NoError(t, err, "read locked segment %s", f.Name())
+
+        _, err = f.Write([]byte{0})
+        require.Error(t, err, "write to tail segment file %s", f.Name())
+
+        require.NoError(t, f.Close())
+    }
+
+    for _, f := range w.files {
+        require.NoError(t, f.Close())
+    }
+
+    // Make initialization fail by corrupting the header of one file.
+    f, err := os.OpenFile(w.files[3].Name(), os.O_WRONLY, 0666)
+    require.NoError(t, err)
+
+    _, err = f.WriteAt([]byte{0}, 4)
+    require.NoError(t, err)
+
+    w = &WAL{dirFile: df}
+    require.Error(t, w.initSegments(), "init corrupted segments")
+
+    for _, f := range w.files {
+        require.NoError(t, f.Close())
+    }
+}
-
-func BenchmarkWALRead(b *testing.B) {
-    f, err := os.Open("cmd/tsdb/testdata.1m")
-    require.NoError(b, err)
-
-    series, err := readPrometheusLabels(f, 1000000)
-    require.NoError(b, err)
-
-    b.Run("test", func(b *testing.B) {
-        bseries := series[:b.N/300]
-
-        d, err := ioutil.TempDir("", "wal_read_test")
-        require.NoError(b, err)
-
-        defer func() {
-            require.NoError(b, os.RemoveAll(d))
-        }()
-
-        wal, err := OpenWAL(d, nil, 500*time.Millisecond)
-        require.NoError(b, err)
-
-        var (
-            samples [][]refdSample
-            ts      int64
-        )
-        for i := 0; i < 300; i++ {
-            ts += int64(30000)
-            scrape := make([]refdSample, 0, len(bseries))
-
-            for ref := range bseries {
-                scrape = append(scrape, refdSample{
-                    ref: uint64(ref),
-                    t:   ts,
-                    v:   12345788,
-                })
-            }
-            samples = append(samples, scrape)
-        }
-
-        err = wal.Log(bseries, samples[0])
-        require.NoError(b, err)
-
-        for _, s := range samples[1:] {
-            err = wal.Log(nil, s)
-            require.NoError(b, err)
-        }
-
-        require.NoError(b, wal.Close())
-
-        b.ResetTimer()
-
-        wal, err = OpenWAL(d, nil, 500*time.Millisecond)
-        require.NoError(b, err)
-
-        var numSeries, numSamples int
-
-        err = wal.ReadAll(&walHandler{
-            series: func(lset labels.Labels) error {
-                numSeries++
-                return nil
-            },
-            sample: func(smpl refdSample) error {
-                numSamples++
-                return nil
-            },
-        })
-        require.NoError(b, err)
-
-        // stat, _ := wal.f.Stat()
-        // fmt.Println("read series", numSeries, "read samples", numSamples, "wal size", fmt.Sprintf("%.2fMiB", float64(stat.Size())/1024/1024))
-    })
-}
-
-func BenchmarkWALReadIntoHead(b *testing.B) {
-    f, err := os.Open("cmd/tsdb/testdata.1m")
-    require.NoError(b, err)
-
-    series, err := readPrometheusLabels(f, 1000000)
-    require.NoError(b, err)
-
-    b.Run("test", func(b *testing.B) {
-        bseries := series[:b.N/300]
-
-        d, err := ioutil.TempDir("", "wal_read_test")
-        require.NoError(b, err)
-
-        defer func() {
-            require.NoError(b, os.RemoveAll(d))
-        }()
-
-        wal, err := OpenWAL(d, nil, 500*time.Millisecond)
-        require.NoError(b, err)
-
-        var (
-            samples [][]refdSample
-            ts      int64
-        )
-        for i := 0; i < 300; i++ {
-            ts += int64(30000)
-            scrape := make([]refdSample, 0, len(bseries))
-
-            for ref := range bseries {
-                scrape = append(scrape, refdSample{
-                    ref: uint64(ref),
-                    t:   ts,
-                    v:   12345788,
-                })
-            }
-            samples = append(samples, scrape)
-        }
-
-        err = wal.Log(bseries, samples[0])
-        require.NoError(b, err)
-
-        for _, s := range samples[1:] {
-            err = wal.Log(nil, s)
-            require.NoError(b, err)
-        }
-
-        require.NoError(b, wal.Close())
-
-        b.ResetTimer()
-
-        _, err = OpenWAL(d, nil, 500*time.Millisecond)
-        require.NoError(b, err)
-
-        // stat, _ := head.wal.f.Stat()
-        // fmt.Println("head block initialized from WAL")
-        // fmt.Println("read series", head.stats.SeriesCount, "read samples", head.stats.SampleCount, "wal size", fmt.Sprintf("%.2fMiB", float64(stat.Size())/1024/1024))
-    })
-}
-
-func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
-    dec := expfmt.NewDecoder(r, expfmt.FmtProtoText)
-
-    var mets []model.Metric
-    fps := map[model.Fingerprint]struct{}{}
-    var mf dto.MetricFamily
-    var dups int
-
-    for i := 0; i < n; {
-        if err := dec.Decode(&mf); err != nil {
-            if err == io.EOF {
-                break
-            }
-            return nil, err
-        }
-        for _, m := range mf.GetMetric() {
-            met := make(model.Metric, len(m.GetLabel())+1)
-            met["__name__"] = model.LabelValue(mf.GetName())
-
-            for _, l := range m.GetLabel() {
-                met[model.LabelName(l.GetName())] = model.LabelValue(l.GetValue())
-            }
-            if _, ok := fps[met.Fingerprint()]; ok {
-                dups++
-            } else {
-                mets = append(mets, met)
-                fps[met.Fingerprint()] = struct{}{}
-            }
-            i++
-        }
-    }
-    lbls := make([]labels.Labels, 0, n)
-
-    for _, m := range mets[:n] {
-        lset := make(labels.Labels, 0, len(m))
-        for k, v := range m {
-            lset = append(lset, labels.Label{Name: string(k), Value: string(v)})
-        }
-        lbls = append(lbls, lset)
-    }
-
-    return lbls, nil
-}
+
+func TestWAL_cut(t *testing.T) {
+    tmpdir, err := ioutil.TempDir("", "test_wal_cut")
+    require.NoError(t, err)
+    defer os.RemoveAll(tmpdir)
+
+    // This calls cut() implicitly the first time without a previous tail.
+    w, err := OpenWAL(tmpdir, nil, 0)
+    require.NoError(t, err)
+
+    require.NoError(t, w.entry(WALEntrySeries, 1, []byte("Hello World!!")))
+
+    require.NoError(t, w.cut(), "cut failed")
+
+    // Cutting creates a new file and close the previous tail file.
+    require.Equal(t, 2, len(w.files))
+    require.Equal(t, os.ErrInvalid.Error(), w.files[0].Close().Error())
+
+    require.NoError(t, w.entry(WALEntrySeries, 1, []byte("Hello World!!")))
+
+    require.NoError(t, w.Close())
+
+    for _, of := range w.files {
+        f, err := os.Open(of.Name())
+        require.NoError(t, err)
+
+        // Verify header data.
+        metab := make([]byte, 8)
+        _, err = f.Read(metab)
+        require.NoError(t, err, "read meta data %s", f.Name())
+        require.Equal(t, WALMagic, binary.BigEndian.Uint32(metab[:4]), "verify magic")
+        require.Equal(t, WALFormatDefault, metab[4], "verify format flag")
+
+        // We cannot actually check for correct pre-allocation as it is
+        // optional per filesystem and handled transparently.
+
+        et, flag, b, err := newWALDecoder(f, nil).entry()
+        require.NoError(t, err)
+        require.Equal(t, WALEntrySeries, et)
+        require.Equal(t, flag, byte(walSeriesSimple))
+        require.Equal(t, []byte("Hello World!!"), b)
+    }
+}
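The two tests above exercise segment initialization and cutting at the byte level. A natural companion, sketched here but not part of the commit, is a write-and-read-back round trip through the exported Log and ReadAll APIs, mirroring the removed benchmarks; TestWAL_roundTrip, its label sets, and its expected counts are hypothetical.

package tsdb

import (
    "io/ioutil"
    "os"
    "testing"

    "github.com/fabxc/tsdb/labels"
    "github.com/stretchr/testify/require"
)

// TestWAL_roundTrip is a hypothetical companion test: it writes series and
// samples through Log and reads them back through ReadAll, mirroring the
// removed benchmarks above.
func TestWAL_roundTrip(t *testing.T) {
    dir, err := ioutil.TempDir("", "test_wal_roundtrip")
    require.NoError(t, err)
    defer os.RemoveAll(dir)

    w, err := OpenWAL(dir, nil, 0)
    require.NoError(t, err)

    // Sample refs index into the series slice logged in the same call.
    series := []labels.Labels{
        {{Name: "__name__", Value: "test_metric"}, {Name: "job", Value: "a"}},
        {{Name: "__name__", Value: "test_metric"}, {Name: "job", Value: "b"}},
    }
    samples := []refdSample{
        {ref: 0, t: 1000, v: 1},
        {ref: 1, t: 1000, v: 2},
    }
    require.NoError(t, w.Log(series, samples))
    require.NoError(t, w.Close())

    // Re-open and consume the log once, as ReadAll expects.
    w, err = OpenWAL(dir, nil, 0)
    require.NoError(t, err)

    var numSeries, numSamples int
    err = w.ReadAll(&walHandler{
        series: func(labels.Labels) error { numSeries++; return nil },
        sample: func(refdSample) error { numSamples++; return nil },
    })
    require.NoError(t, err)
    require.Equal(t, 2, numSeries)
    require.Equal(t, 2, numSamples)
    require.NoError(t, w.Close())
}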