add live reader for WAL (#481)

* add live reader for WAL

Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
Callum Styan 2019-01-16 10:09:08 -08:00 committed by GitHub
parent ebf5d74325
commit 3929359302
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 506 additions and 110 deletions

View file

@ -6,6 +6,7 @@
- `OpenBlock` signature changed to take a logger. - `OpenBlock` signature changed to take a logger.
- [REMOVED] `PrefixMatcher` is considered unused so was removed. - [REMOVED] `PrefixMatcher` is considered unused so was removed.
- [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere. - [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere.
- [FEATURE] Add new `LiveReader` to WAL pacakge. Added to allow live tailing of a WAL segment, used by Prometheus Remote Write after refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean that there will never be more data to read.
## 0.3.1 ## 0.3.1
- [BUGFIX] Fixed most windows test and some actual bugs for unclosed file readers. - [BUGFIX] Fixed most windows test and some actual bugs for unclosed file readers.

View file

@ -832,28 +832,13 @@ func (r *Reader) next() (err error) {
} }
r.rec = append(r.rec, buf[:length]...) r.rec = append(r.rec, buf[:length]...)
switch r.curRecTyp { if err := validateRecord(r.curRecTyp, i); err != nil {
case recFull: return err
if i != 0 {
return errors.New("unexpected full record")
}
return nil
case recFirst:
if i != 0 {
return errors.New("unexpected first record")
}
case recMiddle:
if i == 0 {
return errors.New("unexpected middle record")
}
case recLast:
if i == 0 {
return errors.New("unexpected last record")
}
return nil
default:
return errors.Errorf("unexpected record type %d", r.curRecTyp)
} }
if r.curRecTyp == recLast || r.curRecTyp == recFull {
return nil
}
// Only increment i for non-zero records since we use it // Only increment i for non-zero records since we use it
// to determine valid content record sequences. // to determine valid content record sequences.
i++ i++
@ -904,6 +889,226 @@ func (r *Reader) Offset() int64 {
return r.total return r.total
} }
// NewLiveReader returns a new live reader.
func NewLiveReader(r io.Reader) *LiveReader {
return &LiveReader{rdr: r}
}
// Reader reads WAL records from an io.Reader. It buffers partial record data for
// the next read.
type LiveReader struct {
rdr io.Reader
err error
rec []byte
hdr [recordHeaderSize]byte
buf [pageSize]byte
readIndex int // Index in buf to start at for next read.
writeIndex int // Index in buf to start at for next write.
total int64 // Total bytes processed during reading in calls to Next().
index int // Used to track partial records, should be 0 at the start of every new record.
}
func (r *LiveReader) Err() error {
return r.err
}
func (r *LiveReader) TotalRead() int64 {
return r.total
}
func (r *LiveReader) fillBuffer() error {
n, err := r.rdr.Read(r.buf[r.writeIndex:len(r.buf)])
r.writeIndex += n
return err
}
// Shift the buffer up to the read index.
func (r *LiveReader) shiftBuffer() {
copied := copy(r.buf[0:], r.buf[r.readIndex:r.writeIndex])
r.readIndex = 0
r.writeIndex = copied
}
// Next returns true if r.rec will contain a full record.
// False does not indicate that there will never be more data to
// read for the current io.Reader.
func (r *LiveReader) Next() bool {
for {
if r.buildRecord() {
return true
}
if r.err != nil && r.err != io.EOF {
return false
}
if r.readIndex == pageSize {
r.shiftBuffer()
}
if r.writeIndex != pageSize {
if err := r.fillBuffer(); err != nil {
// We expect to get EOF, since we're reading the segment file as it's being written.
if err != io.EOF {
r.err = err
}
return false
}
}
}
}
// Record returns the current record.
// The returned byte slice is only valid until the next call to Next.
func (r *LiveReader) Record() []byte {
return r.rec
}
// Rebuild a full record from potentially partial records. Returns false
// if there was an error or if we weren't able to read a record for any reason.
// Returns true if we read a full record. Any record data is appeneded to
// LiveReader.rec
func (r *LiveReader) buildRecord() bool {
for {
// Check that we have data in the internal buffer to read.
if r.writeIndex <= r.readIndex {
return false
}
// Attempt to read a record, partial or otherwise.
temp, n, err := readRecord(r.buf[r.readIndex:r.writeIndex], r.hdr[:], r.total)
r.readIndex += n
r.total += int64(n)
if err != nil {
r.err = err
return false
}
if temp == nil {
return false
}
rt := recType(r.hdr[0])
if rt == recFirst || rt == recFull {
r.rec = r.rec[:0]
}
r.rec = append(r.rec, temp...)
if err := validateRecord(rt, r.index); err != nil {
r.err = err
r.index = 0
return false
}
if rt == recLast || rt == recFull {
r.index = 0
return true
}
// Only increment i for non-zero records since we use it
// to determine valid content record sequences.
r.index++
}
}
// Returns an error if the recType and i indicate an invalid record sequence.
// As an example, if i is > 0 because we've read some amount of a partial record
// (recFirst, recMiddle, etc. but not recLast) and then we get another recFirst or recFull
// instead of a recLast or recMiddle we would have an invalid record.
func validateRecord(typ recType, i int) error {
switch typ {
case recFull:
if i != 0 {
return errors.New("unexpected full record")
}
return nil
case recFirst:
if i != 0 {
return errors.New("unexpected first record, dropping buffer")
}
return nil
case recMiddle:
if i == 0 {
return errors.New("unexpected middle record, dropping buffer")
}
return nil
case recLast:
if i == 0 {
return errors.New("unexpected last record, dropping buffer")
}
return nil
default:
return errors.Errorf("unexpected record type %d", typ)
}
}
// Read a sub-record (see recType) from the buffer. It could potentially
// be a full record (recFull) if the record fits within the bounds of a single page.
// Returns a byte slice of the record data read, the number of bytes read, and an error
// if there's a non-zero byte in a page term record or the record checksum fails.
// TODO(callum) the EOF errors we're returning from this function should theoretically
// never happen, add a metric for them.
func readRecord(buf []byte, header []byte, total int64) ([]byte, int, error) {
readIndex := 0
header[0] = buf[0]
readIndex++
total++
// The rest of this function is mostly from Reader.Next().
typ := recType(header[0])
// Gobble up zero bytes.
if typ == recPageTerm {
// We are pedantic and check whether the zeros are actually up to a page boundary.
// It's not strictly necessary but may catch sketchy state early.
k := pageSize - (total % pageSize)
if k == pageSize {
return nil, 1, nil // Initial 0 byte was last page byte.
}
if k <= int64(len(buf)-readIndex) {
for _, v := range buf[readIndex : int64(readIndex)+k] {
readIndex++
if v != 0 {
return nil, readIndex, errors.New("unexpected non-zero byte in page term bytes")
}
}
return nil, readIndex, nil
}
// Not enough bytes to read the rest of the page term rec.
// This theoretically should never happen, since we're only shifting the
// internal buffer of the live reader when we read to the end of page.
// Treat this the same as an EOF, it's an error we would expect to see.
return nil, 0, io.EOF
}
if readIndex+recordHeaderSize-1 > len(buf) {
// Treat this the same as an EOF, it's an error we would expect to see.
return nil, 0, io.EOF
}
copy(header[1:], buf[readIndex:readIndex+len(header[1:])])
readIndex += recordHeaderSize - 1
total += int64(recordHeaderSize - 1)
var (
length = binary.BigEndian.Uint16(header[1:])
crc = binary.BigEndian.Uint32(header[3:])
)
readTo := int(length) + readIndex
if readTo > len(buf) {
if (readTo - readIndex) > pageSize {
return nil, 0, errors.Errorf("invalid record, record size would be larger than max page size: %d", int(length))
}
// Not enough data to read all of the record data.
// Treat this the same as an EOF, it's an error we would expect to see.
return nil, 0, io.EOF
}
recData := buf[readIndex:readTo]
readIndex += int(length)
total += int64(length)
// TODO(callum) what should we do here, throw out the record? We should add a metric at least.
if c := crc32.Checksum(recData, castagnoliTable); c != crc {
return recData, readIndex, errors.Errorf("unexpected checksum %x, expected %x", c, crc)
}
return recData, readIndex, nil
}
func min(i, j int) int { func min(i, j int) int {
if i < j { if i < j {
return i return i

View file

@ -17,15 +17,107 @@ package wal
import ( import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"fmt"
"hash/crc32" "hash/crc32"
"io"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
"os" "os"
"path"
"sync"
"testing" "testing"
"time"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
) )
type record struct {
t recType
b []byte
}
var data = make([]byte, 100000)
var testReaderCases = []struct {
t []record
exp [][]byte
fail bool
}{
// Sequence of valid records.
{
t: []record{
{recFull, data[0:200]},
{recFirst, data[200:300]},
{recLast, data[300:400]},
{recFirst, data[400:800]},
{recMiddle, data[800:900]},
{recPageTerm, make([]byte, pageSize-900-recordHeaderSize*5-1)}, // exactly lines up with page boundary.
{recLast, data[900:900]},
{recFirst, data[900:1000]},
{recMiddle, data[1000:1200]},
{recMiddle, data[1200:30000]},
{recMiddle, data[30000:30001]},
{recMiddle, data[30001:30001]},
{recLast, data[30001:32000]},
},
exp: [][]byte{
data[0:200],
data[200:400],
data[400:900],
data[900:32000],
},
},
// Exactly at the limit of one page minus the header size
{
t: []record{
{recFull, data[0 : pageSize-recordHeaderSize]},
},
exp: [][]byte{
data[:pageSize-recordHeaderSize],
},
},
// More than a full page, this exceeds our buffer and can never happen
// when written by the WAL.
{
t: []record{
{recFull, data[0 : pageSize+1]},
},
fail: true,
},
// Invalid orders of record types.
{
t: []record{{recMiddle, data[:200]}},
fail: true,
},
{
t: []record{{recLast, data[:200]}},
fail: true,
},
{
t: []record{
{recFirst, data[:200]},
{recFull, data[200:400]},
},
fail: true,
},
{
t: []record{
{recFirst, data[:100]},
{recMiddle, data[100:200]},
{recFull, data[200:400]},
},
fail: true,
},
// Non-zero data after page termination.
{
t: []record{
{recFull, data[:100]},
{recPageTerm, append(make([]byte, 1000), 1)},
},
exp: [][]byte{data[:100]},
fail: true,
},
}
func encodedRecord(t recType, b []byte) []byte { func encodedRecord(t recType, b []byte) []byte {
if t == recPageTerm { if t == recPageTerm {
return append([]byte{0}, b...) return append([]byte{0}, b...)
@ -39,95 +131,7 @@ func encodedRecord(t recType, b []byte) []byte {
// TestReader feeds the reader a stream of encoded records with different types. // TestReader feeds the reader a stream of encoded records with different types.
func TestReader(t *testing.T) { func TestReader(t *testing.T) {
data := make([]byte, 100000) for i, c := range testReaderCases {
_, err := rand.Read(data)
testutil.Ok(t, err)
type record struct {
t recType
b []byte
}
cases := []struct {
t []record
exp [][]byte
fail bool
}{
// Sequence of valid records.
{
t: []record{
{recFull, data[0:200]},
{recFirst, data[200:300]},
{recLast, data[300:400]},
{recFirst, data[400:800]},
{recMiddle, data[800:900]},
{recPageTerm, make([]byte, pageSize-900-recordHeaderSize*5-1)}, // exactly lines up with page boundary.
{recLast, data[900:900]},
{recFirst, data[900:1000]},
{recMiddle, data[1000:1200]},
{recMiddle, data[1200:30000]},
{recMiddle, data[30000:30001]},
{recMiddle, data[30001:30001]},
{recLast, data[30001:32000]},
},
exp: [][]byte{
data[0:200],
data[200:400],
data[400:900],
data[900:32000],
},
},
// Exactly at the limit of one page minus the header size
{
t: []record{
{recFull, data[0 : pageSize-recordHeaderSize]},
},
exp: [][]byte{
data[:pageSize-recordHeaderSize],
},
},
// More than a full page, this exceeds our buffer and can never happen
// when written by the WAL.
{
t: []record{
{recFull, data[0 : pageSize+1]},
},
fail: true,
},
// Invalid orders of record types.
{
t: []record{{recMiddle, data[:200]}},
fail: true,
},
{
t: []record{{recLast, data[:200]}},
fail: true,
},
{
t: []record{
{recFirst, data[:200]},
{recFull, data[200:400]},
},
fail: true,
},
{
t: []record{
{recFirst, data[:100]},
{recMiddle, data[100:200]},
{recFull, data[200:400]},
},
fail: true,
},
// Non-zero data after page termination.
{
t: []record{
{recFull, data[:100]},
{recPageTerm, append(make([]byte, 1000), 1)},
},
exp: [][]byte{data[:100]},
fail: true,
},
}
for i, c := range cases {
t.Logf("test %d", i) t.Logf("test %d", i)
var buf []byte var buf []byte
@ -154,6 +158,192 @@ func TestReader(t *testing.T) {
} }
} }
func TestReader_Live(t *testing.T) {
for i, c := range testReaderCases {
t.Logf("test %d", i)
dir, err := ioutil.TempDir("", fmt.Sprintf("live_reader_%d", i))
t.Logf("created dir %s", dir)
testutil.Ok(t, err)
defer os.RemoveAll(dir)
// we're never going to have more than a single segment file per test case right now
f, err := os.Create(path.Join(dir, "00000000"))
testutil.Ok(t, err)
// live reader doesn't work on readers created from bytes buffers,
// since we need to be able to write more data to the thing we're
// reading from after the reader has been created
wg := sync.WaitGroup{}
// make sure the reader doesn't start until at least one record is written
wg.Add(1)
go func() {
for i, rec := range c.t {
rec := encodedRecord(rec.t, rec.b)
n, err := f.Write(rec)
testutil.Ok(t, err)
testutil.Assert(t, n > 0, "no bytes were written to wal")
if i == 0 {
wg.Done()
}
}
}()
sr, err := OpenReadSegment(SegmentName(dir, 0))
testutil.Ok(t, err)
lr := NewLiveReader(sr)
j := 0
wg.Wait()
caseLoop:
for {
for ; lr.Next(); j++ {
rec := lr.Record()
t.Log("j: ", j)
testutil.Equals(t, c.exp[j], rec, "Bytes within record did not match expected Bytes")
if j == len(c.exp)-1 {
break caseLoop
}
}
// Because reads and writes are happening concurrently, unless we get an error we should
// attempt to read records again.
if j == 0 && lr.Err() == nil {
continue
}
if !c.fail && lr.Err() != nil {
t.Fatalf("unexpected error: %s", lr.Err())
}
if c.fail && lr.Err() == nil {
t.Fatalf("expected error but got none:\n\tinput: %+v", c.t)
}
if lr.Err() != nil {
t.Log("err: ", lr.Err())
break
}
}
}
}
func TestWAL_FuzzWriteRead_Live(t *testing.T) {
const count = 5000
const segmentSize = int64(128 * 1024 * 1204)
var input [][]byte
lock := sync.RWMutex{}
var recs [][]byte
var index int
// Get size of segment.
getSegmentSize := func(dir string, index int) (int64, error) {
i := int64(-1)
fi, err := os.Stat(SegmentName(dir, index))
if err == nil {
i = fi.Size()
}
return i, err
}
readSegment := func(r *LiveReader) {
for r.Next() {
rec := r.Record()
lock.RLock()
l := len(input)
lock.RUnlock()
if index >= l {
t.Fatalf("read too many records")
}
lock.RLock()
if !bytes.Equal(input[index], rec) {
t.Fatalf("record %d (len %d) does not match (expected len %d)",
index, len(rec), len(input[index]))
}
lock.RUnlock()
index++
}
if r.Err() != io.EOF {
testutil.Ok(t, r.Err())
}
}
dir, err := ioutil.TempDir("", "wal_fuzz_live")
t.Log("created dir: ", dir)
testutil.Ok(t, err)
defer func() {
os.RemoveAll(dir)
}()
w, err := NewSize(nil, nil, dir, 128*pageSize)
testutil.Ok(t, err)
go func() {
for i := 0; i < count; i++ {
var sz int64
switch i % 5 {
case 0, 1:
sz = 50
case 2, 3:
sz = pageSize
default:
sz = pageSize * 8
}
rec := make([]byte, rand.Int63n(sz))
_, err := rand.Read(rec)
testutil.Ok(t, err)
lock.Lock()
input = append(input, rec)
lock.Unlock()
recs = append(recs, rec)
// Randomly batch up records.
if rand.Intn(4) < 3 {
testutil.Ok(t, w.Log(recs...))
recs = recs[:0]
}
}
testutil.Ok(t, w.Log(recs...))
}()
m, _, err := w.Segments()
testutil.Ok(t, err)
seg, err := OpenReadSegment(SegmentName(dir, m))
testutil.Ok(t, err)
r := NewLiveReader(seg)
segmentTicker := time.NewTicker(100 * time.Millisecond)
readTicker := time.NewTicker(10 * time.Millisecond)
for {
select {
case <-segmentTicker.C:
// check if new segments exist
_, last, err := w.Segments()
testutil.Ok(t, err)
if last > seg.i {
for {
readSegment(r)
if r.Err() != io.EOF {
testutil.Ok(t, r.Err())
}
size, err := getSegmentSize(dir, seg.i)
testutil.Ok(t, err)
// make sure we've read all of the current segment before rotating
if r.TotalRead() == size {
break
}
}
seg, err = OpenReadSegment(SegmentName(dir, seg.i+1))
testutil.Ok(t, err)
r = NewLiveReader(seg)
}
case <-readTicker.C:
readSegment(r)
}
if index == count {
break
}
}
testutil.Ok(t, r.Err())
}
func TestWAL_FuzzWriteRead(t *testing.T) { func TestWAL_FuzzWriteRead(t *testing.T) {
const count = 25000 const count = 25000