diff --git a/CHANGELOG.md b/CHANGELOG.md index a44471681..5975d8b2b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - `OpenBlock` signature changed to take a logger. - [REMOVED] `PrefixMatcher` is considered unused so was removed. - [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere. + - [FEATURE] Add new `LiveReader` to WAL pacakge. Added to allow live tailing of a WAL segment, used by Prometheus Remote Write after refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean that there will never be more data to read. ## 0.3.1 - [BUGFIX] Fixed most windows test and some actual bugs for unclosed file readers. diff --git a/wal/wal.go b/wal/wal.go index 5433fd042..fd90eb90e 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -832,28 +832,13 @@ func (r *Reader) next() (err error) { } r.rec = append(r.rec, buf[:length]...) - switch r.curRecTyp { - case recFull: - if i != 0 { - return errors.New("unexpected full record") - } - return nil - case recFirst: - if i != 0 { - return errors.New("unexpected first record") - } - case recMiddle: - if i == 0 { - return errors.New("unexpected middle record") - } - case recLast: - if i == 0 { - return errors.New("unexpected last record") - } - return nil - default: - return errors.Errorf("unexpected record type %d", r.curRecTyp) + if err := validateRecord(r.curRecTyp, i); err != nil { + return err } + if r.curRecTyp == recLast || r.curRecTyp == recFull { + return nil + } + // Only increment i for non-zero records since we use it // to determine valid content record sequences. i++ @@ -904,6 +889,226 @@ func (r *Reader) Offset() int64 { return r.total } +// NewLiveReader returns a new live reader. +func NewLiveReader(r io.Reader) *LiveReader { + return &LiveReader{rdr: r} +} + +// Reader reads WAL records from an io.Reader. It buffers partial record data for +// the next read. +type LiveReader struct { + rdr io.Reader + err error + rec []byte + hdr [recordHeaderSize]byte + buf [pageSize]byte + readIndex int // Index in buf to start at for next read. + writeIndex int // Index in buf to start at for next write. + total int64 // Total bytes processed during reading in calls to Next(). + index int // Used to track partial records, should be 0 at the start of every new record. +} + +func (r *LiveReader) Err() error { + return r.err +} + +func (r *LiveReader) TotalRead() int64 { + return r.total +} + +func (r *LiveReader) fillBuffer() error { + n, err := r.rdr.Read(r.buf[r.writeIndex:len(r.buf)]) + r.writeIndex += n + return err +} + +// Shift the buffer up to the read index. +func (r *LiveReader) shiftBuffer() { + copied := copy(r.buf[0:], r.buf[r.readIndex:r.writeIndex]) + r.readIndex = 0 + r.writeIndex = copied +} + +// Next returns true if r.rec will contain a full record. +// False does not indicate that there will never be more data to +// read for the current io.Reader. +func (r *LiveReader) Next() bool { + for { + if r.buildRecord() { + return true + } + if r.err != nil && r.err != io.EOF { + return false + } + if r.readIndex == pageSize { + r.shiftBuffer() + } + if r.writeIndex != pageSize { + if err := r.fillBuffer(); err != nil { + // We expect to get EOF, since we're reading the segment file as it's being written. + if err != io.EOF { + r.err = err + } + return false + } + } + } +} + +// Record returns the current record. +// The returned byte slice is only valid until the next call to Next. +func (r *LiveReader) Record() []byte { + return r.rec +} + +// Rebuild a full record from potentially partial records. Returns false +// if there was an error or if we weren't able to read a record for any reason. +// Returns true if we read a full record. Any record data is appeneded to +// LiveReader.rec +func (r *LiveReader) buildRecord() bool { + for { + // Check that we have data in the internal buffer to read. + if r.writeIndex <= r.readIndex { + return false + } + + // Attempt to read a record, partial or otherwise. + temp, n, err := readRecord(r.buf[r.readIndex:r.writeIndex], r.hdr[:], r.total) + r.readIndex += n + r.total += int64(n) + if err != nil { + r.err = err + return false + } + + if temp == nil { + return false + } + + rt := recType(r.hdr[0]) + + if rt == recFirst || rt == recFull { + r.rec = r.rec[:0] + } + r.rec = append(r.rec, temp...) + + if err := validateRecord(rt, r.index); err != nil { + r.err = err + r.index = 0 + return false + } + if rt == recLast || rt == recFull { + r.index = 0 + return true + } + // Only increment i for non-zero records since we use it + // to determine valid content record sequences. + r.index++ + } +} + +// Returns an error if the recType and i indicate an invalid record sequence. +// As an example, if i is > 0 because we've read some amount of a partial record +// (recFirst, recMiddle, etc. but not recLast) and then we get another recFirst or recFull +// instead of a recLast or recMiddle we would have an invalid record. +func validateRecord(typ recType, i int) error { + switch typ { + case recFull: + if i != 0 { + return errors.New("unexpected full record") + } + return nil + case recFirst: + if i != 0 { + return errors.New("unexpected first record, dropping buffer") + } + return nil + case recMiddle: + if i == 0 { + return errors.New("unexpected middle record, dropping buffer") + } + return nil + case recLast: + if i == 0 { + return errors.New("unexpected last record, dropping buffer") + } + return nil + default: + return errors.Errorf("unexpected record type %d", typ) + } +} + +// Read a sub-record (see recType) from the buffer. It could potentially +// be a full record (recFull) if the record fits within the bounds of a single page. +// Returns a byte slice of the record data read, the number of bytes read, and an error +// if there's a non-zero byte in a page term record or the record checksum fails. +// TODO(callum) the EOF errors we're returning from this function should theoretically +// never happen, add a metric for them. +func readRecord(buf []byte, header []byte, total int64) ([]byte, int, error) { + readIndex := 0 + header[0] = buf[0] + readIndex++ + total++ + + // The rest of this function is mostly from Reader.Next(). + typ := recType(header[0]) + // Gobble up zero bytes. + if typ == recPageTerm { + // We are pedantic and check whether the zeros are actually up to a page boundary. + // It's not strictly necessary but may catch sketchy state early. + k := pageSize - (total % pageSize) + if k == pageSize { + return nil, 1, nil // Initial 0 byte was last page byte. + } + + if k <= int64(len(buf)-readIndex) { + for _, v := range buf[readIndex : int64(readIndex)+k] { + readIndex++ + if v != 0 { + return nil, readIndex, errors.New("unexpected non-zero byte in page term bytes") + } + } + return nil, readIndex, nil + } + // Not enough bytes to read the rest of the page term rec. + // This theoretically should never happen, since we're only shifting the + // internal buffer of the live reader when we read to the end of page. + // Treat this the same as an EOF, it's an error we would expect to see. + return nil, 0, io.EOF + } + + if readIndex+recordHeaderSize-1 > len(buf) { + // Treat this the same as an EOF, it's an error we would expect to see. + return nil, 0, io.EOF + } + + copy(header[1:], buf[readIndex:readIndex+len(header[1:])]) + readIndex += recordHeaderSize - 1 + total += int64(recordHeaderSize - 1) + var ( + length = binary.BigEndian.Uint16(header[1:]) + crc = binary.BigEndian.Uint32(header[3:]) + ) + readTo := int(length) + readIndex + if readTo > len(buf) { + if (readTo - readIndex) > pageSize { + return nil, 0, errors.Errorf("invalid record, record size would be larger than max page size: %d", int(length)) + } + // Not enough data to read all of the record data. + // Treat this the same as an EOF, it's an error we would expect to see. + return nil, 0, io.EOF + } + recData := buf[readIndex:readTo] + readIndex += int(length) + total += int64(length) + + // TODO(callum) what should we do here, throw out the record? We should add a metric at least. + if c := crc32.Checksum(recData, castagnoliTable); c != crc { + return recData, readIndex, errors.Errorf("unexpected checksum %x, expected %x", c, crc) + } + return recData, readIndex, nil +} + func min(i, j int) int { if i < j { return i diff --git a/wal/wal_test.go b/wal/wal_test.go index 8ac14ebc8..f95b21239 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -17,15 +17,107 @@ package wal import ( "bytes" "encoding/binary" + "fmt" "hash/crc32" + "io" "io/ioutil" "math/rand" "os" + "path" + "sync" "testing" + "time" "github.com/prometheus/tsdb/testutil" ) +type record struct { + t recType + b []byte +} + +var data = make([]byte, 100000) +var testReaderCases = []struct { + t []record + exp [][]byte + fail bool +}{ + // Sequence of valid records. + { + t: []record{ + {recFull, data[0:200]}, + {recFirst, data[200:300]}, + {recLast, data[300:400]}, + {recFirst, data[400:800]}, + {recMiddle, data[800:900]}, + {recPageTerm, make([]byte, pageSize-900-recordHeaderSize*5-1)}, // exactly lines up with page boundary. + {recLast, data[900:900]}, + {recFirst, data[900:1000]}, + {recMiddle, data[1000:1200]}, + {recMiddle, data[1200:30000]}, + {recMiddle, data[30000:30001]}, + {recMiddle, data[30001:30001]}, + {recLast, data[30001:32000]}, + }, + exp: [][]byte{ + data[0:200], + data[200:400], + data[400:900], + data[900:32000], + }, + }, + // Exactly at the limit of one page minus the header size + { + t: []record{ + {recFull, data[0 : pageSize-recordHeaderSize]}, + }, + exp: [][]byte{ + data[:pageSize-recordHeaderSize], + }, + }, + // More than a full page, this exceeds our buffer and can never happen + // when written by the WAL. + { + t: []record{ + {recFull, data[0 : pageSize+1]}, + }, + fail: true, + }, + // Invalid orders of record types. + { + t: []record{{recMiddle, data[:200]}}, + fail: true, + }, + { + t: []record{{recLast, data[:200]}}, + fail: true, + }, + { + t: []record{ + {recFirst, data[:200]}, + {recFull, data[200:400]}, + }, + fail: true, + }, + { + t: []record{ + {recFirst, data[:100]}, + {recMiddle, data[100:200]}, + {recFull, data[200:400]}, + }, + fail: true, + }, + // Non-zero data after page termination. + { + t: []record{ + {recFull, data[:100]}, + {recPageTerm, append(make([]byte, 1000), 1)}, + }, + exp: [][]byte{data[:100]}, + fail: true, + }, +} + func encodedRecord(t recType, b []byte) []byte { if t == recPageTerm { return append([]byte{0}, b...) @@ -39,95 +131,7 @@ func encodedRecord(t recType, b []byte) []byte { // TestReader feeds the reader a stream of encoded records with different types. func TestReader(t *testing.T) { - data := make([]byte, 100000) - _, err := rand.Read(data) - testutil.Ok(t, err) - - type record struct { - t recType - b []byte - } - cases := []struct { - t []record - exp [][]byte - fail bool - }{ - // Sequence of valid records. - { - t: []record{ - {recFull, data[0:200]}, - {recFirst, data[200:300]}, - {recLast, data[300:400]}, - {recFirst, data[400:800]}, - {recMiddle, data[800:900]}, - {recPageTerm, make([]byte, pageSize-900-recordHeaderSize*5-1)}, // exactly lines up with page boundary. - {recLast, data[900:900]}, - {recFirst, data[900:1000]}, - {recMiddle, data[1000:1200]}, - {recMiddle, data[1200:30000]}, - {recMiddle, data[30000:30001]}, - {recMiddle, data[30001:30001]}, - {recLast, data[30001:32000]}, - }, - exp: [][]byte{ - data[0:200], - data[200:400], - data[400:900], - data[900:32000], - }, - }, - // Exactly at the limit of one page minus the header size - { - t: []record{ - {recFull, data[0 : pageSize-recordHeaderSize]}, - }, - exp: [][]byte{ - data[:pageSize-recordHeaderSize], - }, - }, - // More than a full page, this exceeds our buffer and can never happen - // when written by the WAL. - { - t: []record{ - {recFull, data[0 : pageSize+1]}, - }, - fail: true, - }, - // Invalid orders of record types. - { - t: []record{{recMiddle, data[:200]}}, - fail: true, - }, - { - t: []record{{recLast, data[:200]}}, - fail: true, - }, - { - t: []record{ - {recFirst, data[:200]}, - {recFull, data[200:400]}, - }, - fail: true, - }, - { - t: []record{ - {recFirst, data[:100]}, - {recMiddle, data[100:200]}, - {recFull, data[200:400]}, - }, - fail: true, - }, - // Non-zero data after page termination. - { - t: []record{ - {recFull, data[:100]}, - {recPageTerm, append(make([]byte, 1000), 1)}, - }, - exp: [][]byte{data[:100]}, - fail: true, - }, - } - for i, c := range cases { + for i, c := range testReaderCases { t.Logf("test %d", i) var buf []byte @@ -154,6 +158,192 @@ func TestReader(t *testing.T) { } } +func TestReader_Live(t *testing.T) { + for i, c := range testReaderCases { + t.Logf("test %d", i) + dir, err := ioutil.TempDir("", fmt.Sprintf("live_reader_%d", i)) + t.Logf("created dir %s", dir) + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + // we're never going to have more than a single segment file per test case right now + f, err := os.Create(path.Join(dir, "00000000")) + testutil.Ok(t, err) + + // live reader doesn't work on readers created from bytes buffers, + // since we need to be able to write more data to the thing we're + // reading from after the reader has been created + wg := sync.WaitGroup{} + // make sure the reader doesn't start until at least one record is written + wg.Add(1) + go func() { + for i, rec := range c.t { + rec := encodedRecord(rec.t, rec.b) + n, err := f.Write(rec) + testutil.Ok(t, err) + testutil.Assert(t, n > 0, "no bytes were written to wal") + if i == 0 { + wg.Done() + } + } + }() + sr, err := OpenReadSegment(SegmentName(dir, 0)) + testutil.Ok(t, err) + lr := NewLiveReader(sr) + j := 0 + wg.Wait() + caseLoop: + for { + for ; lr.Next(); j++ { + rec := lr.Record() + t.Log("j: ", j) + testutil.Equals(t, c.exp[j], rec, "Bytes within record did not match expected Bytes") + if j == len(c.exp)-1 { + break caseLoop + } + + } + + // Because reads and writes are happening concurrently, unless we get an error we should + // attempt to read records again. + if j == 0 && lr.Err() == nil { + continue + } + + if !c.fail && lr.Err() != nil { + t.Fatalf("unexpected error: %s", lr.Err()) + } + if c.fail && lr.Err() == nil { + t.Fatalf("expected error but got none:\n\tinput: %+v", c.t) + } + if lr.Err() != nil { + t.Log("err: ", lr.Err()) + break + } + } + } +} + +func TestWAL_FuzzWriteRead_Live(t *testing.T) { + const count = 5000 + const segmentSize = int64(128 * 1024 * 1204) + var input [][]byte + lock := sync.RWMutex{} + var recs [][]byte + var index int + + // Get size of segment. + getSegmentSize := func(dir string, index int) (int64, error) { + i := int64(-1) + fi, err := os.Stat(SegmentName(dir, index)) + if err == nil { + i = fi.Size() + } + return i, err + } + + readSegment := func(r *LiveReader) { + for r.Next() { + rec := r.Record() + lock.RLock() + l := len(input) + lock.RUnlock() + if index >= l { + t.Fatalf("read too many records") + } + lock.RLock() + if !bytes.Equal(input[index], rec) { + t.Fatalf("record %d (len %d) does not match (expected len %d)", + index, len(rec), len(input[index])) + } + lock.RUnlock() + index++ + } + if r.Err() != io.EOF { + testutil.Ok(t, r.Err()) + } + } + + dir, err := ioutil.TempDir("", "wal_fuzz_live") + t.Log("created dir: ", dir) + testutil.Ok(t, err) + defer func() { + os.RemoveAll(dir) + }() + + w, err := NewSize(nil, nil, dir, 128*pageSize) + testutil.Ok(t, err) + + go func() { + for i := 0; i < count; i++ { + var sz int64 + switch i % 5 { + case 0, 1: + sz = 50 + case 2, 3: + sz = pageSize + default: + sz = pageSize * 8 + } + + rec := make([]byte, rand.Int63n(sz)) + _, err := rand.Read(rec) + testutil.Ok(t, err) + lock.Lock() + input = append(input, rec) + lock.Unlock() + recs = append(recs, rec) + + // Randomly batch up records. + if rand.Intn(4) < 3 { + testutil.Ok(t, w.Log(recs...)) + recs = recs[:0] + } + } + testutil.Ok(t, w.Log(recs...)) + }() + + m, _, err := w.Segments() + testutil.Ok(t, err) + + seg, err := OpenReadSegment(SegmentName(dir, m)) + testutil.Ok(t, err) + + r := NewLiveReader(seg) + segmentTicker := time.NewTicker(100 * time.Millisecond) + readTicker := time.NewTicker(10 * time.Millisecond) + for { + select { + case <-segmentTicker.C: + // check if new segments exist + _, last, err := w.Segments() + testutil.Ok(t, err) + if last > seg.i { + for { + readSegment(r) + if r.Err() != io.EOF { + testutil.Ok(t, r.Err()) + } + size, err := getSegmentSize(dir, seg.i) + testutil.Ok(t, err) + // make sure we've read all of the current segment before rotating + if r.TotalRead() == size { + break + } + } + seg, err = OpenReadSegment(SegmentName(dir, seg.i+1)) + testutil.Ok(t, err) + r = NewLiveReader(seg) + } + case <-readTicker.C: + readSegment(r) + } + if index == count { + break + } + } + testutil.Ok(t, r.Err()) +} func TestWAL_FuzzWriteRead(t *testing.T) { const count = 25000