diff --git a/tsdb/wal/wal.go b/tsdb/wal/wal.go index 36df9ad69..3aa87b2c0 100644 --- a/tsdb/wal/wal.go +++ b/tsdb/wal/wal.go @@ -74,9 +74,18 @@ func (p *page) reset() { p.flushed = 0 } +// SegmentFile represents the underlying file used to store a segment. +type SegmentFile interface { + Stat() (os.FileInfo, error) + Sync() error + io.Writer + io.Reader + io.Closer +} + // Segment represents a segment file. type Segment struct { - *os.File + SegmentFile dir string i int } @@ -130,7 +139,7 @@ func OpenWriteSegment(logger log.Logger, dir string, k int) (*Segment, error) { return nil, errors.Wrap(err, "zero-pad torn page") } } - return &Segment{File: f, i: k, dir: dir}, nil + return &Segment{SegmentFile: f, i: k, dir: dir}, nil } // CreateSegment creates a new segment k in dir. @@ -139,7 +148,7 @@ func CreateSegment(dir string, k int) (*Segment, error) { if err != nil { return nil, err } - return &Segment{File: f, i: k, dir: dir}, nil + return &Segment{SegmentFile: f, i: k, dir: dir}, nil } // OpenReadSegment opens the segment with the given filename. @@ -152,7 +161,7 @@ func OpenReadSegment(fn string) (*Segment, error) { if err != nil { return nil, err } - return &Segment{File: f, i: k, dir: filepath.Dir(fn)}, nil + return &Segment{SegmentFile: f, i: k, dir: filepath.Dir(fn)}, nil } // WAL is a write ahead log that stores records in segment files. @@ -517,8 +526,10 @@ func (w *WAL) flushPage(clear bool) error { if clear { p.alloc = pageSize // Write till end of page. } + n, err := w.segment.Write(p.buf[p.flushed:p.alloc]) if err != nil { + p.flushed += n return err } p.flushed += n @@ -664,6 +675,9 @@ func (w *WAL) log(rec []byte, final bool) error { if w.page.full() { if err := w.flushPage(true); err != nil { + // TODO When the flushing fails at this point and the record has not been + // fully written to the buffer, we end up with a corrupted WAL because some part of the + // record have been written to the buffer, while the rest of the record will be discarded. return err } } @@ -705,7 +719,7 @@ func (w *WAL) Truncate(i int) (err error) { func (w *WAL) fsync(f *Segment) error { start := time.Now() - err := f.File.Sync() + err := f.Sync() w.metrics.fsyncDuration.Observe(time.Since(start).Seconds()) return err } diff --git a/tsdb/wal/wal_test.go b/tsdb/wal/wal_test.go index 2175ab668..24d909801 100644 --- a/tsdb/wal/wal_test.go +++ b/tsdb/wal/wal_test.go @@ -17,6 +17,7 @@ package wal import ( "bytes" "fmt" + "io" "io/ioutil" "math/rand" "os" @@ -424,6 +425,103 @@ func TestCompression(t *testing.T) { require.Greater(t, float64(uncompressedSize)*0.75, float64(compressedSize), "Compressing zeroes should save at least 25%% space - uncompressedSize: %d, compressedSize: %d", uncompressedSize, compressedSize) } +func TestLogPartialWrite(t *testing.T) { + const segmentSize = pageSize * 2 + record := []byte{1, 2, 3, 4, 5} + + tests := map[string]struct { + numRecords int + faultyRecord int + }{ + "partial write when logging first record in a page": { + numRecords: 10, + faultyRecord: 1, + }, + "partial write when logging record in the middle of a page": { + numRecords: 10, + faultyRecord: 3, + }, + "partial write when logging last record of a page": { + numRecords: (pageSize / (recordHeaderSize + len(record))) + 10, + faultyRecord: pageSize / (recordHeaderSize + len(record)), + }, + // TODO the current implementation suffers this: + //"partial write when logging a record overlapping two pages": { + // numRecords: (pageSize / (recordHeaderSize + len(record))) + 10, + // faultyRecord: pageSize/(recordHeaderSize+len(record)) + 1, + //}, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + dirPath, err := ioutil.TempDir("", "") + require.NoError(t, err) + + w, err := NewSize(nil, nil, dirPath, segmentSize, false) + require.NoError(t, err) + + // Replace the underlying segment file with a mocked one that injects a failure. + w.segment.SegmentFile = &faultySegmentFile{ + SegmentFile: w.segment.SegmentFile, + writeFailureAfter: ((recordHeaderSize + len(record)) * (testData.faultyRecord - 1)) + 2, + writeFailureErr: io.ErrShortWrite, + } + + for i := 1; i <= testData.numRecords; i++ { + if err := w.Log(record); i == testData.faultyRecord { + require.Error(t, io.ErrShortWrite, err) + } else { + require.NoError(t, err) + } + } + + require.NoError(t, w.Close()) + + // Read it back. We expect no corruption. + s, err := OpenReadSegment(SegmentName(dirPath, 0)) + require.NoError(t, err) + + r := NewReader(NewSegmentBufReader(s)) + for i := 0; i < testData.numRecords; i++ { + require.True(t, r.Next()) + require.NoError(t, r.Err()) + require.Equal(t, record, r.Record()) + } + require.False(t, r.Next()) + require.NoError(t, r.Err()) + }) + } +} + +type faultySegmentFile struct { + SegmentFile + + written int + writeFailureAfter int + writeFailureErr error +} + +func (f *faultySegmentFile) Write(p []byte) (int, error) { + if f.writeFailureAfter >= 0 && f.writeFailureAfter < f.written+len(p) { + partialLen := f.writeFailureAfter - f.written + if partialLen <= 0 || partialLen >= len(p) { + partialLen = 1 + } + + // Inject failure. + n, _ := f.SegmentFile.Write(p[:partialLen]) + f.written += n + f.writeFailureAfter = -1 + + return n, f.writeFailureErr + } + + // Proxy the write to the underlying file. + n, err := f.SegmentFile.Write(p) + f.written += n + return n, err +} + func BenchmarkWAL_LogBatched(b *testing.B) { for _, compress := range []bool{true, false} { b.Run(fmt.Sprintf("compress=%t", compress), func(b *testing.B) {