LiveReader can get into an infinite loop on corrupt WALs. (#524)

Make WAL live tailer return EOF when the there is a half-written record at the end of the file.

Previously, this would cause an infinite loop as we ignored EOFs when filling the buffer.  We now differentiate between EOFs that read >0 bytes, and EOFs that didn't.

Add some more unit tests for tailing a corrupt WAL, and unify interfaces Reader and LiveReader for the purposes of testing.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
This commit is contained in:
Tom Wilkie 2019-02-19 14:33:57 +00:00 committed by GitHub
parent 8913617a9e
commit 77d5a7d47a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 973 additions and 765 deletions

284
wal/live_reader.go Normal file
View file

@ -0,0 +1,284 @@
// Copyright 2019 The Prometheus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wal
import (
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
readerCorruptionErrors = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_reader_corruption_errors",
Help: "Errors encountered when reading the WAL.",
}, []string{"error"})
)
// NewLiveReader returns a new live reader.
func NewLiveReader(logger log.Logger, r io.Reader) *LiveReader {
return &LiveReader{
logger: logger,
rdr: r,
// Until we understand how they come about, make readers permissive
// to records spanning pages.
permissive: true,
}
}
// LiveReader reads WAL records from an io.Reader. It allows reading of WALs
// that are still in the process of being written, and returns records as soon
// as they can be read.
type LiveReader struct {
logger log.Logger
rdr io.Reader
err error
rec []byte
hdr [recordHeaderSize]byte
buf [pageSize]byte
readIndex int // Index in buf to start at for next read.
writeIndex int // Index in buf to start at for next write.
total int64 // Total bytes processed during reading in calls to Next().
index int // Used to track partial records, should be 0 at the start of every new record.
// For testing, we can treat EOF as a non-error.
eofNonErr bool
// We sometime see records span page boundaries. Should never happen, but it
// does. Until we track down why, set permissive to true to tolerate it.
// NB the non-ive Reader implementation allows for this.
permissive bool
}
// Err returns any errors encountered reading the WAL. io.EOFs are not terminal
// and Next can be tried again. Non-EOFs are terminal, and the reader should
// not be used again. It is up to the user to decide when to stop trying should
// io.EOF be returned.
func (r *LiveReader) Err() error {
if r.eofNonErr && r.err == io.EOF {
return nil
}
return r.err
}
// Offset returns the number of bytes consumed from this segment.
func (r *LiveReader) Offset() int64 {
return r.total
}
func (r *LiveReader) fillBuffer() (int, error) {
n, err := r.rdr.Read(r.buf[r.writeIndex:len(r.buf)])
r.writeIndex += n
return n, err
}
// Next returns true if Record() will contain a full record.
// If Next returns false, you should always checked the contents of Error().
// Return false guarantees there are no more records if the segment is closed
// and not corrupt, otherwise if Err() == io.EOF you should try again when more
// data has been written.
func (r *LiveReader) Next() bool {
for {
// If buildRecord returns a non-EOF error, its game up - the segment is
// corrupt. If buildRecord returns an EOF, we try and read more in
// fillBuffer later on. If that fails to read anything (n=0 && err=EOF),
// we return EOF and the user can try again later. If we have a full
// page, buildRecord is guaranteed to return a record or a non-EOF; it
// has checks the records fit in pages.
if ok, err := r.buildRecord(); ok {
return true
} else if err != nil && err != io.EOF {
r.err = err
return false
}
// If we've filled the page and not found a record, this
// means records have started to span pages. Shouldn't happen
// but does and until we found out why, we need to deal with this.
if r.permissive && r.writeIndex == pageSize && r.readIndex > 0 {
copy(r.buf[:], r.buf[r.readIndex:])
r.writeIndex -= r.readIndex
r.readIndex = 0
continue
}
if r.readIndex == pageSize {
r.writeIndex = 0
r.readIndex = 0
}
if r.writeIndex != pageSize {
n, err := r.fillBuffer()
if n == 0 || (err != nil && err != io.EOF) {
r.err = err
return false
}
}
}
}
// Record returns the current record.
// The returned byte slice is only valid until the next call to Next.
func (r *LiveReader) Record() []byte {
return r.rec
}
// Rebuild a full record from potentially partial records. Returns false
// if there was an error or if we weren't able to read a record for any reason.
// Returns true if we read a full record. Any record data is appended to
// LiveReader.rec
func (r *LiveReader) buildRecord() (bool, error) {
for {
// Check that we have data in the internal buffer to read.
if r.writeIndex <= r.readIndex {
return false, nil
}
// Attempt to read a record, partial or otherwise.
temp, n, err := r.readRecord()
if err != nil {
return false, err
}
r.readIndex += n
r.total += int64(n)
if temp == nil {
return false, nil
}
rt := recType(r.hdr[0])
if rt == recFirst || rt == recFull {
r.rec = r.rec[:0]
}
r.rec = append(r.rec, temp...)
if err := validateRecord(rt, r.index); err != nil {
r.index = 0
return false, err
}
if rt == recLast || rt == recFull {
r.index = 0
return true, nil
}
// Only increment i for non-zero records since we use it
// to determine valid content record sequences.
r.index++
}
}
// Returns an error if the recType and i indicate an invalid record sequence.
// As an example, if i is > 0 because we've read some amount of a partial record
// (recFirst, recMiddle, etc. but not recLast) and then we get another recFirst or recFull
// instead of a recLast or recMiddle we would have an invalid record.
func validateRecord(typ recType, i int) error {
switch typ {
case recFull:
if i != 0 {
return errors.New("unexpected full record")
}
return nil
case recFirst:
if i != 0 {
return errors.New("unexpected first record, dropping buffer")
}
return nil
case recMiddle:
if i == 0 {
return errors.New("unexpected middle record, dropping buffer")
}
return nil
case recLast:
if i == 0 {
return errors.New("unexpected last record, dropping buffer")
}
return nil
default:
return errors.Errorf("unexpected record type %d", typ)
}
}
// Read a sub-record (see recType) from the buffer. It could potentially
// be a full record (recFull) if the record fits within the bounds of a single page.
// Returns a byte slice of the record data read, the number of bytes read, and an error
// if there's a non-zero byte in a page term record or the record checksum fails.
// This is a non-method function to make it clear it does not mutate the reader.
func (r *LiveReader) readRecord() ([]byte, int, error) {
// Special case: for recPageTerm, check that are all zeros to end of page,
// consume them but don't return them.
if r.buf[r.readIndex] == byte(recPageTerm) {
// End of page won't necessarily be end of buffer, as we may have
// got misaligned by records spanning page boundaries.
// r.total % pageSize is the offset into the current page
// that r.readIndex points to in buf. Therefore
// pageSize - (r.total % pageSize) is the amount left to read of
// the current page.
remaining := int(pageSize - (r.total % pageSize))
if r.readIndex+remaining > r.writeIndex {
return nil, 0, io.EOF
}
for i := r.readIndex; i < r.readIndex+remaining; i++ {
if r.buf[i] != 0 {
return nil, 0, errors.New("unexpected non-zero byte in page term bytes")
}
}
return nil, remaining, nil
}
// Not a recPageTerm; read the record and check the checksum.
if r.writeIndex-r.readIndex < recordHeaderSize {
return nil, 0, io.EOF
}
copy(r.hdr[:], r.buf[r.readIndex:r.readIndex+recordHeaderSize])
length := int(binary.BigEndian.Uint16(r.hdr[1:]))
crc := binary.BigEndian.Uint32(r.hdr[3:])
if r.readIndex+recordHeaderSize+length > pageSize {
if !r.permissive {
return nil, 0, fmt.Errorf("record would overflow current page: %d > %d", r.readIndex+recordHeaderSize+length, pageSize)
}
readerCorruptionErrors.WithLabelValues("record_span_page").Inc()
level.Warn(r.logger).Log("msg", "record spans page boundaries", "start", r.readIndex, "end", recordHeaderSize+length, "pageSize", pageSize)
}
if recordHeaderSize+length > pageSize {
return nil, 0, fmt.Errorf("record length greater than a single page: %d > %d", recordHeaderSize+length, pageSize)
}
if r.readIndex+recordHeaderSize+length > r.writeIndex {
return nil, 0, io.EOF
}
rec := r.buf[r.readIndex+recordHeaderSize : r.readIndex+recordHeaderSize+length]
if c := crc32.Checksum(rec, castagnoliTable); c != crc {
return nil, 0, errors.Errorf("unexpected checksum %x, expected %x", c, crc)
}
return rec, length + recordHeaderSize, nil
}
func min(i, j int) int {
if i < j {
return i
}
return j
}

183
wal/reader.go Normal file
View file

@ -0,0 +1,183 @@
// Copyright 2019 The Prometheus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wal
import (
"encoding/binary"
"hash/crc32"
"io"
"github.com/pkg/errors"
)
// Reader reads WAL records from an io.Reader.
type Reader struct {
rdr io.Reader
err error
rec []byte
buf [pageSize]byte
total int64 // Total bytes processed.
curRecTyp recType // Used for checking that the last record is not torn.
}
// NewReader returns a new reader.
func NewReader(r io.Reader) *Reader {
return &Reader{rdr: r}
}
// Next advances the reader to the next records and returns true if it exists.
// It must not be called again after it returned false.
func (r *Reader) Next() bool {
err := r.next()
if errors.Cause(err) == io.EOF {
// The last WAL segment record shouldn't be torn(should be full or last).
// The last record would be torn after a crash just before
// the last record part could be persisted to disk.
if recType(r.curRecTyp) == recFirst || recType(r.curRecTyp) == recMiddle {
r.err = errors.New("last record is torn")
}
return false
}
r.err = err
return r.err == nil
}
func (r *Reader) next() (err error) {
// We have to use r.buf since allocating byte arrays here fails escape
// analysis and ends up on the heap, even though it seemingly should not.
hdr := r.buf[:recordHeaderSize]
buf := r.buf[recordHeaderSize:]
r.rec = r.rec[:0]
i := 0
for {
if _, err = io.ReadFull(r.rdr, hdr[:1]); err != nil {
return errors.Wrap(err, "read first header byte")
}
r.total++
r.curRecTyp = recType(hdr[0])
// Gobble up zero bytes.
if r.curRecTyp == recPageTerm {
// recPageTerm is a single byte that indicates the rest of the page is padded.
// If it's the first byte in a page, buf is too small and
// needs to be resized to fit pageSize-1 bytes.
buf = r.buf[1:]
// We are pedantic and check whether the zeros are actually up
// to a page boundary.
// It's not strictly necessary but may catch sketchy state early.
k := pageSize - (r.total % pageSize)
if k == pageSize {
continue // Initial 0 byte was last page byte.
}
n, err := io.ReadFull(r.rdr, buf[:k])
if err != nil {
return errors.Wrap(err, "read remaining zeros")
}
r.total += int64(n)
for _, c := range buf[:k] {
if c != 0 {
return errors.New("unexpected non-zero byte in padded page")
}
}
continue
}
n, err := io.ReadFull(r.rdr, hdr[1:])
if err != nil {
return errors.Wrap(err, "read remaining header")
}
r.total += int64(n)
var (
length = binary.BigEndian.Uint16(hdr[1:])
crc = binary.BigEndian.Uint32(hdr[3:])
)
if length > pageSize-recordHeaderSize {
return errors.Errorf("invalid record size %d", length)
}
n, err = io.ReadFull(r.rdr, buf[:length])
if err != nil {
return err
}
r.total += int64(n)
if n != int(length) {
return errors.Errorf("invalid size: expected %d, got %d", length, n)
}
if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc {
return errors.Errorf("unexpected checksum %x, expected %x", c, crc)
}
r.rec = append(r.rec, buf[:length]...)
if err := validateRecord(r.curRecTyp, i); err != nil {
return err
}
if r.curRecTyp == recLast || r.curRecTyp == recFull {
return nil
}
// Only increment i for non-zero records since we use it
// to determine valid content record sequences.
i++
}
}
// Err returns the last encountered error wrapped in a corruption error.
// If the reader does not allow to infer a segment index and offset, a total
// offset in the reader stream will be provided.
func (r *Reader) Err() error {
if r.err == nil {
return nil
}
if b, ok := r.rdr.(*segmentBufReader); ok {
return &CorruptionErr{
Err: r.err,
Dir: b.segs[b.cur].Dir(),
Segment: b.segs[b.cur].Index(),
Offset: int64(b.off),
}
}
return &CorruptionErr{
Err: r.err,
Segment: -1,
Offset: r.total,
}
}
// Record returns the current record. The returned byte slice is only
// valid until the next call to Next.
func (r *Reader) Record() []byte {
return r.rec
}
// Segment returns the current segment being read.
func (r *Reader) Segment() int {
if b, ok := r.rdr.(*segmentBufReader); ok {
return b.segs[b.cur].Index()
}
return -1
}
// Offset returns the current position of the segment being read.
func (r *Reader) Offset() int64 {
if b, ok := r.rdr.(*segmentBufReader); ok {
return int64(b.off)
}
return r.total
}

506
wal/reader_test.go Normal file
View file

@ -0,0 +1,506 @@
// Copyright 2019 The Prometheus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wal
import (
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"runtime"
"strconv"
"testing"
"time"
"github.com/go-kit/kit/log"
"github.com/prometheus/tsdb/testutil"
)
type reader interface {
Next() bool
Err() error
Record() []byte
Offset() int64
}
type record struct {
t recType
b []byte
}
var readerConstructors = map[string]func(io.Reader) reader{
"Reader": func(r io.Reader) reader {
return NewReader(r)
},
"LiveReader": func(r io.Reader) reader {
lr := NewLiveReader(log.NewNopLogger(), r)
lr.eofNonErr = true
return lr
},
}
var data = make([]byte, 100000)
var testReaderCases = []struct {
t []record
exp [][]byte
fail bool
}{
// Sequence of valid records.
{
t: []record{
{recFull, data[0:200]},
{recFirst, data[200:300]},
{recLast, data[300:400]},
{recFirst, data[400:800]},
{recMiddle, data[800:900]},
{recPageTerm, make([]byte, pageSize-900-recordHeaderSize*5-1)}, // exactly lines up with page boundary.
{recLast, data[900:900]},
{recFirst, data[900:1000]},
{recMiddle, data[1000:1200]},
{recMiddle, data[1200:30000]},
{recMiddle, data[30000:30001]},
{recMiddle, data[30001:30001]},
{recLast, data[30001:32000]},
},
exp: [][]byte{
data[0:200],
data[200:400],
data[400:900],
data[900:32000],
},
},
// Exactly at the limit of one page minus the header size
{
t: []record{
{recFull, data[0 : pageSize-recordHeaderSize]},
},
exp: [][]byte{
data[:pageSize-recordHeaderSize],
},
},
// More than a full page, this exceeds our buffer and can never happen
// when written by the WAL.
{
t: []record{
{recFull, data[0 : pageSize+1]},
},
fail: true,
},
// Two records the together are too big for a page.
// NB currently the non-live reader succeeds on this. I think this is a bug.
// but we've seen it in production.
{
t: []record{
{recFull, data[:pageSize/2]},
{recFull, data[:pageSize/2]},
},
exp: [][]byte{
data[:pageSize/2],
data[:pageSize/2],
},
},
// Invalid orders of record types.
{
t: []record{{recMiddle, data[:200]}},
fail: true,
},
{
t: []record{{recLast, data[:200]}},
fail: true,
},
{
t: []record{
{recFirst, data[:200]},
{recFull, data[200:400]},
},
fail: true,
},
{
t: []record{
{recFirst, data[:100]},
{recMiddle, data[100:200]},
{recFull, data[200:400]},
},
fail: true,
},
// Non-zero data after page termination.
{
t: []record{
{recFull, data[:100]},
{recPageTerm, append(make([]byte, pageSize-recordHeaderSize-102), 1)},
},
exp: [][]byte{data[:100]},
fail: true,
},
}
func encodedRecord(t recType, b []byte) []byte {
if t == recPageTerm {
return append([]byte{0}, b...)
}
r := make([]byte, recordHeaderSize)
r[0] = byte(t)
binary.BigEndian.PutUint16(r[1:], uint16(len(b)))
binary.BigEndian.PutUint32(r[3:], crc32.Checksum(b, castagnoliTable))
return append(r, b...)
}
// TestReader feeds the reader a stream of encoded records with different types.
func TestReader(t *testing.T) {
for name, fn := range readerConstructors {
for i, c := range testReaderCases {
t.Run(fmt.Sprintf("%s/%d", name, i), func(t *testing.T) {
var buf []byte
for _, r := range c.t {
buf = append(buf, encodedRecord(r.t, r.b)...)
}
r := fn(bytes.NewReader(buf))
for j := 0; r.Next(); j++ {
t.Logf("record %d", j)
rec := r.Record()
if j >= len(c.exp) {
t.Fatal("received more records than expected")
}
testutil.Equals(t, c.exp[j], rec, "Bytes within record did not match expected Bytes")
}
if !c.fail && r.Err() != nil {
t.Fatalf("unexpected error: %s", r.Err())
}
if c.fail && r.Err() == nil {
t.Fatalf("expected error but got none")
}
})
}
}
}
func TestReader_Live(t *testing.T) {
logger := testutil.NewLogger(t)
for i := range testReaderCases {
t.Run(strconv.Itoa(i), func(t *testing.T) {
writeFd, err := ioutil.TempFile("", "TestReader_Live")
testutil.Ok(t, err)
defer os.Remove(writeFd.Name())
go func(i int) {
for _, rec := range testReaderCases[i].t {
rec := encodedRecord(rec.t, rec.b)
_, err := writeFd.Write(rec)
testutil.Ok(t, err)
runtime.Gosched()
}
writeFd.Close()
}(i)
// Read from a second FD on the same file.
readFd, err := os.Open(writeFd.Name())
testutil.Ok(t, err)
reader := NewLiveReader(logger, readFd)
for _, exp := range testReaderCases[i].exp {
for !reader.Next() {
testutil.Assert(t, reader.Err() == io.EOF, "expect EOF, got: %v", reader.Err())
runtime.Gosched()
}
actual := reader.Record()
testutil.Equals(t, exp, actual, "read wrong record")
}
testutil.Assert(t, !reader.Next(), "unexpected record")
if testReaderCases[i].fail {
testutil.Assert(t, reader.Err() != nil, "expected error")
}
})
}
}
const fuzzLen = 500
func generateRandomEntries(w *WAL, records chan []byte) error {
var recs [][]byte
for i := 0; i < fuzzLen; i++ {
var sz int64
switch i % 5 {
case 0, 1:
sz = 50
case 2, 3:
sz = pageSize
default:
sz = pageSize * 8
}
rec := make([]byte, rand.Int63n(sz))
if _, err := rand.Read(rec); err != nil {
return err
}
records <- rec
// Randomly batch up records.
recs = append(recs, rec)
if rand.Intn(4) < 3 {
if err := w.Log(recs...); err != nil {
return err
}
recs = recs[:0]
}
}
return w.Log(recs...)
}
func allSegments(dir string) (io.Reader, error) {
seg, err := listSegments(dir)
if err != nil {
return nil, err
}
var readers []io.Reader
for _, r := range seg {
f, err := os.Open(filepath.Join(dir, r.name))
if err != nil {
return nil, err
}
readers = append(readers, f)
}
return io.MultiReader(readers...), nil
}
func TestReaderFuzz(t *testing.T) {
for name, fn := range readerConstructors {
t.Run(name, func(t *testing.T) {
dir, err := ioutil.TempDir("", "wal_fuzz_live")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
w, err := NewSize(nil, nil, dir, 128*pageSize)
testutil.Ok(t, err)
// Buffering required as we're not reading concurrently.
input := make(chan []byte, fuzzLen)
err = generateRandomEntries(w, input)
testutil.Ok(t, err)
close(input)
err = w.Close()
testutil.Ok(t, err)
sr, err := allSegments(w.Dir())
testutil.Ok(t, err)
reader := fn(sr)
for expected := range input {
testutil.Assert(t, reader.Next(), "expected record: %v", reader.Err())
testutil.Equals(t, expected, reader.Record(), "read wrong record")
}
testutil.Assert(t, !reader.Next(), "unexpected record")
})
}
}
func TestReaderFuzz_Live(t *testing.T) {
logger := testutil.NewLogger(t)
dir, err := ioutil.TempDir("", "wal_fuzz_live")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
w, err := NewSize(nil, nil, dir, 128*pageSize)
testutil.Ok(t, err)
// In the background, generate a stream of random records and write them
// to the WAL.
input := make(chan []byte, fuzzLen/10) // buffering required as we sometimes batch WAL writes.
done := make(chan struct{})
go func() {
err := generateRandomEntries(w, input)
testutil.Ok(t, err)
time.Sleep(100 * time.Millisecond)
close(done)
}()
// Tail the WAL and compare the results.
m, _, err := w.Segments()
testutil.Ok(t, err)
seg, err := OpenReadSegment(SegmentName(dir, m))
testutil.Ok(t, err)
r := NewLiveReader(logger, seg)
segmentTicker := time.NewTicker(100 * time.Millisecond)
readTicker := time.NewTicker(10 * time.Millisecond)
readSegment := func(r *LiveReader) bool {
for r.Next() {
rec := r.Record()
expected, ok := <-input
testutil.Assert(t, ok, "unexpected record")
testutil.Equals(t, expected, rec, "record does not match expected")
}
testutil.Assert(t, r.Err() == io.EOF, "expected EOF, got: %v", r.Err())
return true
}
outer:
for {
select {
case <-segmentTicker.C:
// check if new segments exist
_, last, err := w.Segments()
testutil.Ok(t, err)
if last <= seg.i {
continue
}
// read to end of segment.
readSegment(r)
fi, err := os.Stat(SegmentName(dir, seg.i))
testutil.Ok(t, err)
testutil.Assert(t, r.Offset() == fi.Size(), "expected to have read whole segment, but read %d of %d", r.Offset(), fi.Size())
seg, err = OpenReadSegment(SegmentName(dir, seg.i+1))
testutil.Ok(t, err)
r = NewLiveReader(logger, seg)
case <-readTicker.C:
readSegment(r)
case <-done:
readSegment(r)
break outer
}
}
testutil.Assert(t, r.Err() == io.EOF, "expected EOF")
}
func TestLiveReaderCorrupt_ShortFile(t *testing.T) {
// Write a corrupt WAL segment, there is one record of pageSize in length,
// but the segment is only half written.
logger := testutil.NewLogger(t)
dir, err := ioutil.TempDir("", "wal_live_corrupt")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
w, err := NewSize(nil, nil, dir, pageSize)
testutil.Ok(t, err)
rec := make([]byte, pageSize-recordHeaderSize)
_, err = rand.Read(rec)
testutil.Ok(t, err)
err = w.Log(rec)
testutil.Ok(t, err)
err = w.Close()
testutil.Ok(t, err)
segmentFile, err := os.OpenFile(filepath.Join(dir, "00000000"), os.O_RDWR, 0666)
testutil.Ok(t, err)
err = segmentFile.Truncate(pageSize / 2)
testutil.Ok(t, err)
err = segmentFile.Close()
testutil.Ok(t, err)
// Try and LiveReader it.
m, _, err := w.Segments()
testutil.Ok(t, err)
seg, err := OpenReadSegment(SegmentName(dir, m))
testutil.Ok(t, err)
r := NewLiveReader(logger, seg)
testutil.Assert(t, r.Next() == false, "expected no records")
testutil.Assert(t, r.Err() == io.EOF, "expected error, got: %v", r.Err())
}
func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) {
// Write a corrupt WAL segment, when record len > page size.
logger := testutil.NewLogger(t)
dir, err := ioutil.TempDir("", "wal_live_corrupt")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
w, err := NewSize(nil, nil, dir, pageSize*2)
testutil.Ok(t, err)
rec := make([]byte, pageSize-recordHeaderSize)
_, err = rand.Read(rec)
testutil.Ok(t, err)
err = w.Log(rec)
testutil.Ok(t, err)
err = w.Close()
testutil.Ok(t, err)
segmentFile, err := os.OpenFile(filepath.Join(dir, "00000000"), os.O_RDWR, 0666)
testutil.Ok(t, err)
// Override the record length
buf := make([]byte, 3)
buf[0] = byte(recFull)
binary.BigEndian.PutUint16(buf[1:], 0xFFFF)
_, err = segmentFile.WriteAt(buf, 0)
testutil.Ok(t, err)
err = segmentFile.Close()
testutil.Ok(t, err)
// Try and LiveReader it.
m, _, err := w.Segments()
testutil.Ok(t, err)
seg, err := OpenReadSegment(SegmentName(dir, m))
testutil.Ok(t, err)
r := NewLiveReader(logger, seg)
testutil.Assert(t, r.Next() == false, "expected no records")
testutil.Assert(t, r.Err().Error() == "record length greater than a single page: 65542 > 32768", "expected error, got: %v", r.Err())
}
func TestReaderData(t *testing.T) {
dir := os.Getenv("WALDIR")
if dir == "" {
return
}
for name, fn := range readerConstructors {
t.Run(name, func(t *testing.T) {
w, err := New(nil, nil, dir)
testutil.Ok(t, err)
sr, err := allSegments(dir)
testutil.Ok(t, err)
reader := fn(sr)
for reader.Next() {
}
testutil.Ok(t, reader.Err())
err = w.Repair(reader.Err())
testutil.Ok(t, err)
})
}
}

View file

@ -742,390 +742,3 @@ func (r *segmentBufReader) Read(b []byte) (n int, err error) {
r.buf.Reset(r.segs[r.cur])
return n, nil
}
// Reader reads WAL records from an io.Reader.
type Reader struct {
rdr io.Reader
err error
rec []byte
buf [pageSize]byte
total int64 // Total bytes processed.
curRecTyp recType // Used for checking that the last record is not torn.
}
// NewReader returns a new reader.
func NewReader(r io.Reader) *Reader {
return &Reader{rdr: r}
}
// Next advances the reader to the next records and returns true if it exists.
// It must not be called again after it returned false.
func (r *Reader) Next() bool {
err := r.next()
if errors.Cause(err) == io.EOF {
// The last WAL segment record shouldn't be torn(should be full or last).
// The last record would be torn after a crash just before
// the last record part could be persisted to disk.
if recType(r.curRecTyp) == recFirst || recType(r.curRecTyp) == recMiddle {
r.err = errors.New("last record is torn")
}
return false
}
r.err = err
return r.err == nil
}
func (r *Reader) next() (err error) {
// We have to use r.buf since allocating byte arrays here fails escape
// analysis and ends up on the heap, even though it seemingly should not.
hdr := r.buf[:recordHeaderSize]
buf := r.buf[recordHeaderSize:]
r.rec = r.rec[:0]
i := 0
for {
if _, err = io.ReadFull(r.rdr, hdr[:1]); err != nil {
return errors.Wrap(err, "read first header byte")
}
r.total++
r.curRecTyp = recType(hdr[0])
// Gobble up zero bytes.
if r.curRecTyp == recPageTerm {
// recPageTerm is a single byte that indicates the rest of the page is padded.
// If it's the first byte in a page, buf is too small and
// needs to be resized to fit pageSize-1 bytes.
buf = r.buf[1:]
// We are pedantic and check whether the zeros are actually up
// to a page boundary.
// It's not strictly necessary but may catch sketchy state early.
k := pageSize - (r.total % pageSize)
if k == pageSize {
continue // Initial 0 byte was last page byte.
}
n, err := io.ReadFull(r.rdr, buf[:k])
if err != nil {
return errors.Wrap(err, "read remaining zeros")
}
r.total += int64(n)
for _, c := range buf[:k] {
if c != 0 {
return errors.New("unexpected non-zero byte in padded page")
}
}
continue
}
n, err := io.ReadFull(r.rdr, hdr[1:])
if err != nil {
return errors.Wrap(err, "read remaining header")
}
r.total += int64(n)
var (
length = binary.BigEndian.Uint16(hdr[1:])
crc = binary.BigEndian.Uint32(hdr[3:])
)
if length > pageSize-recordHeaderSize {
return errors.Errorf("invalid record size %d", length)
}
n, err = io.ReadFull(r.rdr, buf[:length])
if err != nil {
return err
}
r.total += int64(n)
if n != int(length) {
return errors.Errorf("invalid size: expected %d, got %d", length, n)
}
if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc {
return errors.Errorf("unexpected checksum %x, expected %x", c, crc)
}
r.rec = append(r.rec, buf[:length]...)
if err := validateRecord(r.curRecTyp, i); err != nil {
return err
}
if r.curRecTyp == recLast || r.curRecTyp == recFull {
return nil
}
// Only increment i for non-zero records since we use it
// to determine valid content record sequences.
i++
}
}
// Err returns the last encountered error wrapped in a corruption error.
// If the reader does not allow to infer a segment index and offset, a total
// offset in the reader stream will be provided.
func (r *Reader) Err() error {
if r.err == nil {
return nil
}
if b, ok := r.rdr.(*segmentBufReader); ok {
return &CorruptionErr{
Err: r.err,
Dir: b.segs[b.cur].Dir(),
Segment: b.segs[b.cur].Index(),
Offset: int64(b.off),
}
}
return &CorruptionErr{
Err: r.err,
Segment: -1,
Offset: r.total,
}
}
// Record returns the current record. The returned byte slice is only
// valid until the next call to Next.
func (r *Reader) Record() []byte {
return r.rec
}
// Segment returns the current segment being read.
func (r *Reader) Segment() int {
if b, ok := r.rdr.(*segmentBufReader); ok {
return b.segs[b.cur].Index()
}
return -1
}
// Offset returns the current position of the segment being read.
func (r *Reader) Offset() int64 {
if b, ok := r.rdr.(*segmentBufReader); ok {
return int64(b.off)
}
return r.total
}
// NewLiveReader returns a new live reader.
func NewLiveReader(r io.Reader) *LiveReader {
return &LiveReader{rdr: r}
}
// Reader reads WAL records from an io.Reader. It buffers partial record data for
// the next read.
type LiveReader struct {
rdr io.Reader
err error
rec []byte
hdr [recordHeaderSize]byte
buf [pageSize]byte
readIndex int // Index in buf to start at for next read.
writeIndex int // Index in buf to start at for next write.
total int64 // Total bytes processed during reading in calls to Next().
index int // Used to track partial records, should be 0 at the start of every new record.
}
func (r *LiveReader) Err() error {
return r.err
}
func (r *LiveReader) TotalRead() int64 {
return r.total
}
func (r *LiveReader) fillBuffer() error {
n, err := r.rdr.Read(r.buf[r.writeIndex:len(r.buf)])
r.writeIndex += n
return err
}
// Shift the buffer up to the read index.
func (r *LiveReader) shiftBuffer() {
copied := copy(r.buf[0:], r.buf[r.readIndex:r.writeIndex])
r.readIndex = 0
r.writeIndex = copied
}
// Next returns true if r.rec will contain a full record.
// False does not indicate that there will never be more data to
// read for the current io.Reader.
func (r *LiveReader) Next() bool {
for {
if r.buildRecord() {
return true
}
if r.err != nil && r.err != io.EOF {
return false
}
if r.readIndex == pageSize {
r.shiftBuffer()
}
if r.writeIndex != pageSize {
if err := r.fillBuffer(); err != nil {
// We expect to get EOF, since we're reading the segment file as it's being written.
if err != io.EOF {
r.err = err
}
return false
}
}
}
}
// Record returns the current record.
// The returned byte slice is only valid until the next call to Next.
func (r *LiveReader) Record() []byte {
return r.rec
}
// Rebuild a full record from potentially partial records. Returns false
// if there was an error or if we weren't able to read a record for any reason.
// Returns true if we read a full record. Any record data is appeneded to
// LiveReader.rec
func (r *LiveReader) buildRecord() bool {
for {
// Check that we have data in the internal buffer to read.
if r.writeIndex <= r.readIndex {
return false
}
// Attempt to read a record, partial or otherwise.
temp, n, err := readRecord(r.buf[r.readIndex:r.writeIndex], r.hdr[:], r.total)
r.readIndex += n
r.total += int64(n)
if err != nil {
r.err = err
return false
}
if temp == nil {
return false
}
rt := recType(r.hdr[0])
if rt == recFirst || rt == recFull {
r.rec = r.rec[:0]
}
r.rec = append(r.rec, temp...)
if err := validateRecord(rt, r.index); err != nil {
r.err = err
r.index = 0
return false
}
if rt == recLast || rt == recFull {
r.index = 0
return true
}
// Only increment i for non-zero records since we use it
// to determine valid content record sequences.
r.index++
}
}
// Returns an error if the recType and i indicate an invalid record sequence.
// As an example, if i is > 0 because we've read some amount of a partial record
// (recFirst, recMiddle, etc. but not recLast) and then we get another recFirst or recFull
// instead of a recLast or recMiddle we would have an invalid record.
func validateRecord(typ recType, i int) error {
switch typ {
case recFull:
if i != 0 {
return errors.New("unexpected full record")
}
return nil
case recFirst:
if i != 0 {
return errors.New("unexpected first record, dropping buffer")
}
return nil
case recMiddle:
if i == 0 {
return errors.New("unexpected middle record, dropping buffer")
}
return nil
case recLast:
if i == 0 {
return errors.New("unexpected last record, dropping buffer")
}
return nil
default:
return errors.Errorf("unexpected record type %d", typ)
}
}
// Read a sub-record (see recType) from the buffer. It could potentially
// be a full record (recFull) if the record fits within the bounds of a single page.
// Returns a byte slice of the record data read, the number of bytes read, and an error
// if there's a non-zero byte in a page term record or the record checksum fails.
// TODO(callum) the EOF errors we're returning from this function should theoretically
// never happen, add a metric for them.
func readRecord(buf []byte, header []byte, total int64) ([]byte, int, error) {
readIndex := 0
header[0] = buf[0]
readIndex++
total++
// The rest of this function is mostly from Reader.Next().
typ := recType(header[0])
// Gobble up zero bytes.
if typ == recPageTerm {
// We are pedantic and check whether the zeros are actually up to a page boundary.
// It's not strictly necessary but may catch sketchy state early.
k := pageSize - (total % pageSize)
if k == pageSize {
return nil, 1, nil // Initial 0 byte was last page byte.
}
if k <= int64(len(buf)-readIndex) {
for _, v := range buf[readIndex : int64(readIndex)+k] {
readIndex++
if v != 0 {
return nil, readIndex, errors.New("unexpected non-zero byte in page term bytes")
}
}
return nil, readIndex, nil
}
// Not enough bytes to read the rest of the page term rec.
// This theoretically should never happen, since we're only shifting the
// internal buffer of the live reader when we read to the end of page.
// Treat this the same as an EOF, it's an error we would expect to see.
return nil, 0, io.EOF
}
if readIndex+recordHeaderSize-1 > len(buf) {
// Treat this the same as an EOF, it's an error we would expect to see.
return nil, 0, io.EOF
}
copy(header[1:], buf[readIndex:readIndex+len(header[1:])])
readIndex += recordHeaderSize - 1
total += int64(recordHeaderSize - 1)
var (
length = binary.BigEndian.Uint16(header[1:])
crc = binary.BigEndian.Uint32(header[3:])
)
readTo := int(length) + readIndex
if readTo > len(buf) {
if (readTo - readIndex) > pageSize {
return nil, 0, errors.Errorf("invalid record, record size would be larger than max page size: %d", int(length))
}
// Not enough data to read all of the record data.
// Treat this the same as an EOF, it's an error we would expect to see.
return nil, 0, io.EOF
}
recData := buf[readIndex:readTo]
readIndex += int(length)
total += int64(length)
// TODO(callum) what should we do here, throw out the record? We should add a metric at least.
if c := crc32.Checksum(recData, castagnoliTable); c != crc {
return recData, readIndex, errors.Errorf("unexpected checksum %x, expected %x", c, crc)
}
return recData, readIndex, nil
}
func min(i, j int) int {
if i < j {
return i
}
return j
}

View file

@ -16,394 +16,16 @@ package wal
import (
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"math/rand"
"os"
"path"
"path/filepath"
"sync"
"testing"
"time"
"github.com/prometheus/tsdb/testutil"
)
type record struct {
t recType
b []byte
}
var data = make([]byte, 100000)
var testReaderCases = []struct {
t []record
exp [][]byte
fail bool
}{
// Sequence of valid records.
{
t: []record{
{recFull, data[0:200]},
{recFirst, data[200:300]},
{recLast, data[300:400]},
{recFirst, data[400:800]},
{recMiddle, data[800:900]},
{recPageTerm, make([]byte, pageSize-900-recordHeaderSize*5-1)}, // exactly lines up with page boundary.
{recLast, data[900:900]},
{recFirst, data[900:1000]},
{recMiddle, data[1000:1200]},
{recMiddle, data[1200:30000]},
{recMiddle, data[30000:30001]},
{recMiddle, data[30001:30001]},
{recLast, data[30001:32000]},
},
exp: [][]byte{
data[0:200],
data[200:400],
data[400:900],
data[900:32000],
},
},
// Exactly at the limit of one page minus the header size
{
t: []record{
{recFull, data[0 : pageSize-recordHeaderSize]},
},
exp: [][]byte{
data[:pageSize-recordHeaderSize],
},
},
// More than a full page, this exceeds our buffer and can never happen
// when written by the WAL.
{
t: []record{
{recFull, data[0 : pageSize+1]},
},
fail: true,
},
// Invalid orders of record types.
{
t: []record{{recMiddle, data[:200]}},
fail: true,
},
{
t: []record{{recLast, data[:200]}},
fail: true,
},
{
t: []record{
{recFirst, data[:200]},
{recFull, data[200:400]},
},
fail: true,
},
{
t: []record{
{recFirst, data[:100]},
{recMiddle, data[100:200]},
{recFull, data[200:400]},
},
fail: true,
},
// Non-zero data after page termination.
{
t: []record{
{recFull, data[:100]},
{recPageTerm, append(make([]byte, 1000), 1)},
},
exp: [][]byte{data[:100]},
fail: true,
},
}
func encodedRecord(t recType, b []byte) []byte {
if t == recPageTerm {
return append([]byte{0}, b...)
}
r := make([]byte, recordHeaderSize)
r[0] = byte(t)
binary.BigEndian.PutUint16(r[1:], uint16(len(b)))
binary.BigEndian.PutUint32(r[3:], crc32.Checksum(b, castagnoliTable))
return append(r, b...)
}
// TestReader feeds the reader a stream of encoded records with different types.
func TestReader(t *testing.T) {
for i, c := range testReaderCases {
t.Logf("test %d", i)
var buf []byte
for _, r := range c.t {
buf = append(buf, encodedRecord(r.t, r.b)...)
}
r := NewReader(bytes.NewReader(buf))
for j := 0; r.Next(); j++ {
t.Logf("record %d", j)
rec := r.Record()
if j >= len(c.exp) {
t.Fatal("received more records than inserted")
}
testutil.Equals(t, c.exp[j], rec, "Bytes within record did not match expected Bytes")
}
if !c.fail && r.Err() != nil {
t.Fatalf("unexpected error: %s", r.Err())
}
if c.fail && r.Err() == nil {
t.Fatalf("expected error but got none")
}
}
}
func TestReader_Live(t *testing.T) {
for i, c := range testReaderCases {
t.Logf("test %d", i)
dir, err := ioutil.TempDir("", fmt.Sprintf("live_reader_%d", i))
t.Logf("created dir %s", dir)
testutil.Ok(t, err)
defer os.RemoveAll(dir)
// we're never going to have more than a single segment file per test case right now
f, err := os.Create(path.Join(dir, "00000000"))
testutil.Ok(t, err)
// live reader doesn't work on readers created from bytes buffers,
// since we need to be able to write more data to the thing we're
// reading from after the reader has been created
wg := sync.WaitGroup{}
// make sure the reader doesn't start until at least one record is written
wg.Add(1)
go func() {
for i, rec := range c.t {
rec := encodedRecord(rec.t, rec.b)
n, err := f.Write(rec)
testutil.Ok(t, err)
testutil.Assert(t, n > 0, "no bytes were written to wal")
if i == 0 {
wg.Done()
}
}
}()
sr, err := OpenReadSegment(SegmentName(dir, 0))
testutil.Ok(t, err)
lr := NewLiveReader(sr)
j := 0
wg.Wait()
caseLoop:
for {
for ; lr.Next(); j++ {
rec := lr.Record()
t.Log("j: ", j)
testutil.Equals(t, c.exp[j], rec, "Bytes within record did not match expected Bytes")
if j == len(c.exp)-1 {
break caseLoop
}
}
// Because reads and writes are happening concurrently, unless we get an error we should
// attempt to read records again.
if j == 0 && lr.Err() == nil {
continue
}
if !c.fail && lr.Err() != nil {
t.Fatalf("unexpected error: %s", lr.Err())
}
if c.fail && lr.Err() == nil {
t.Fatalf("expected error but got none:\n\tinput: %+v", c.t)
}
if lr.Err() != nil {
t.Log("err: ", lr.Err())
break
}
}
}
}
func TestWAL_FuzzWriteRead_Live(t *testing.T) {
const count = 500
var input [][]byte
lock := sync.RWMutex{}
var recs [][]byte
var index int
// Get size of segment.
getSegmentSize := func(dir string, index int) (int64, error) {
i := int64(-1)
fi, err := os.Stat(SegmentName(dir, index))
if err == nil {
i = fi.Size()
}
return i, err
}
readSegment := func(r *LiveReader) {
for r.Next() {
rec := r.Record()
lock.RLock()
l := len(input)
lock.RUnlock()
if index >= l {
t.Fatalf("read too many records")
}
lock.RLock()
if !bytes.Equal(input[index], rec) {
t.Fatalf("record %d (len %d) does not match (expected len %d)",
index, len(rec), len(input[index]))
}
lock.RUnlock()
index++
}
if r.Err() != io.EOF {
testutil.Ok(t, r.Err())
}
}
dir, err := ioutil.TempDir("", "wal_fuzz_live")
t.Log("created dir: ", dir)
testutil.Ok(t, err)
defer func() {
os.RemoveAll(dir)
}()
w, err := NewSize(nil, nil, dir, 128*pageSize)
testutil.Ok(t, err)
go func() {
for i := 0; i < count; i++ {
var sz int64
switch i % 5 {
case 0, 1:
sz = 50
case 2, 3:
sz = pageSize
default:
sz = pageSize * 8
}
rec := make([]byte, rand.Int63n(sz))
_, err := rand.Read(rec)
testutil.Ok(t, err)
lock.Lock()
input = append(input, rec)
lock.Unlock()
recs = append(recs, rec)
// Randomly batch up records.
if rand.Intn(4) < 3 {
testutil.Ok(t, w.Log(recs...))
recs = recs[:0]
}
}
testutil.Ok(t, w.Log(recs...))
}()
m, _, err := w.Segments()
testutil.Ok(t, err)
seg, err := OpenReadSegment(SegmentName(dir, m))
testutil.Ok(t, err)
r := NewLiveReader(seg)
segmentTicker := time.NewTicker(100 * time.Millisecond)
readTicker := time.NewTicker(10 * time.Millisecond)
for {
select {
case <-segmentTicker.C:
// check if new segments exist
_, last, err := w.Segments()
testutil.Ok(t, err)
if last > seg.i {
for {
readSegment(r)
if r.Err() != io.EOF {
testutil.Ok(t, r.Err())
}
size, err := getSegmentSize(dir, seg.i)
testutil.Ok(t, err)
// make sure we've read all of the current segment before rotating
if r.TotalRead() == size {
break
}
}
seg, err = OpenReadSegment(SegmentName(dir, seg.i+1))
testutil.Ok(t, err)
r = NewLiveReader(seg)
}
case <-readTicker.C:
readSegment(r)
}
if index == count {
break
}
}
testutil.Ok(t, r.Err())
}
func TestWAL_FuzzWriteRead(t *testing.T) {
const count = 25000
dir, err := ioutil.TempDir("", "walfuzz")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
w, err := NewSize(nil, nil, dir, 128*pageSize)
testutil.Ok(t, err)
var input [][]byte
var recs [][]byte
for i := 0; i < count; i++ {
var sz int
switch i % 5 {
case 0, 1:
sz = 50
case 2, 3:
sz = pageSize
default:
sz = 8 * pageSize
}
rec := make([]byte, rand.Intn(sz))
_, err := rand.Read(rec)
testutil.Ok(t, err)
input = append(input, rec)
recs = append(recs, rec)
// Randomly batch up records.
if rand.Intn(4) < 3 {
testutil.Ok(t, w.Log(recs...))
recs = recs[:0]
}
}
testutil.Ok(t, w.Log(recs...))
m, n, err := w.Segments()
testutil.Ok(t, err)
rc, err := NewSegmentsRangeReader(SegmentRange{Dir: dir, First: m, Last: n})
testutil.Ok(t, err)
defer rc.Close()
rdr := NewReader(rc)
for i := 0; rdr.Next(); i++ {
rec := rdr.Record()
if i >= len(input) {
t.Fatal("read too many records")
}
if !bytes.Equal(input[i], rec) {
t.Fatalf("record %d (len %d) does not match (expected len %d)",
i, len(rec), len(input[i]))
}
}
testutil.Ok(t, rdr.Err())
}
func TestWAL_Repair(t *testing.T) {
for name, test := range map[string]struct {