Address comments

Signed-off-by: Fabian Reinartz <freinartz@google.com>
Fabian Reinartz 2018-06-18 07:52:57 -04:00
parent 0ad2b8a349
commit 3e76f0163e
4 changed files with 37 additions and 35 deletions

View file

@@ -35,9 +35,9 @@ type CheckpointStats struct {
DroppedSeries int DroppedSeries int
DroppedSamples int DroppedSamples int
DroppedTombstones int DroppedTombstones int
TotalSeries int TotalSeries int // Processed series including dropped ones.
TotalSamples int TotalSamples int // Processed samples including dropped ones.
TotalTombstones int TotalTombstones int // Processed tombstones including dropped ones.
} }
// LastCheckpoint returns the directory name of the most recent checkpoint. // LastCheckpoint returns the directory name of the most recent checkpoint.
@@ -129,16 +129,16 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
sr = last sr = last
} }
segs, err := wal.NewSegmentsRangeReader(w.Dir(), m, n) segsr, err := wal.NewSegmentsRangeReader(w.Dir(), m, n)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "create segment reader") return nil, errors.Wrap(err, "create segment reader")
} }
defer segs.Close() defer segsr.Close()
if sr != nil { if sr != nil {
sr = io.MultiReader(sr, segs) sr = io.MultiReader(sr, segsr)
} else { } else {
sr = segs sr = segsr
} }
} }
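For illustration only (not part of the commit), a minimal sketch of the reader composition this hunk sets up, with plain io.Readers standing in for the last-checkpoint reader and the renamed segsr segment range reader: if a checkpoint reader already exists, the segment reader is appended to it via io.MultiReader, otherwise it is used directly.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// combine prepends an optional existing reader to the segment range reader,
// mirroring how sr and segsr are composed in the hunk above.
func combine(sr, segsr io.Reader) io.Reader {
	if sr != nil {
		return io.MultiReader(sr, segsr)
	}
	return segsr
}

func main() {
	last := strings.NewReader("last checkpoint, ")
	segs := strings.NewReader("segments m..n")

	b, _ := io.ReadAll(combine(last, segs))
	fmt.Println(string(b)) // last checkpoint, segments m..n
}
```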
@@ -169,7 +169,7 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
// We don't reset the buffer since we batch up multiple records // We don't reset the buffer since we batch up multiple records
// before writing them to the checkpoint. // before writing them to the checkpoint.
// Remember where the record for this iteration starts. // Remember where the record for this iteration starts.
start := len(buf) start := len(buf)
rec := r.Record() rec := r.Record()

View file

@@ -5,14 +5,17 @@ e.g. `000000`, `000001`, `000002`, etc., and are limited to 128MB by default.
A segment is written to in pages of 32KB. Only the last page of the most recent segment A segment is written to in pages of 32KB. Only the last page of the most recent segment
may be partial. A WAL record is an opaque byte slice that gets split up into sub-records may be partial. A WAL record is an opaque byte slice that gets split up into sub-records
should it exceed the remaining space of the current page. Records are never split across should it exceed the remaining space of the current page. Records are never split across
segment boundaries. segment boundaries. If a single record exceeds the default segment size, a segment with
The encoding of pages is largely borrowed from [LevelDB's/RocksDB's wirte ahead log.][1] a larger size will be created.
The encoding of pages is largely borrowed from [LevelDB's/RocksDB's write ahead log.][1]
Notable deviations are that the record fragment is encoded as: Notable deviations are that the record fragment is encoded as:
```
┌───────────┬──────────┬────────────┬──────────────┐ ┌───────────┬──────────┬────────────┬──────────────┐
│ type <1b> │ len <2b> │ CRC32 <4b> │ data <bytes> │ type <1b> │ len <2b> │ CRC32 <4b> │ data <bytes>
└───────────┴──────────┴────────────┴──────────────┘ └───────────┴──────────┴────────────┴──────────────┘
```
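As a rough sketch of the fragment layout above (not the wal package's actual writer; the Castagnoli CRC table is an assumption, the format only states a 4-byte CRC32), a fragment could be assembled like this:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

// encodeFragment assembles one record fragment per the layout above:
// 1 byte type, 2 bytes big-endian length, 4 bytes CRC32 of the data, then the data.
// Using the Castagnoli polynomial is an assumption; the text only says CRC32.
func encodeFragment(typ byte, data []byte) []byte {
	buf := make([]byte, 7+len(data))
	buf[0] = typ
	binary.BigEndian.PutUint16(buf[1:], uint16(len(data)))
	binary.BigEndian.PutUint32(buf[3:], crc32.Checksum(data, crc32.MakeTable(crc32.Castagnoli)))
	copy(buf[7:], data)
	return buf
}

func main() {
	fmt.Printf("% x\n", encodeFragment(1, []byte("hello")))
}
```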
## Record encoding ## Record encoding
@@ -22,6 +25,7 @@ The records written to the write ahead log are encoded as follows:
Series records encode the labels that identify a series and its unique ID. Series records encode the labels that identify a series and its unique ID.
```
┌────────────────────────────────────────────┐ ┌────────────────────────────────────────────┐
│ type = 1 <1b> │ type = 1 <1b>
├────────────────────────────────────────────┤ ├────────────────────────────────────────────┤
@@ -36,12 +40,14 @@ Series records encode the labels that identify a series and its unique ID.
│ └───────────────────────┴────────────────┘ │ │ └───────────────────────┴────────────────┘ │
│ . . . │ │ . . . │
└────────────────────────────────────────────┘ └────────────────────────────────────────────┘
```
### Sample records ### Sample records
Sample records encode samples as a list of triples `(series_id, timestamp, value)`. Sample records encode samples as a list of triples `(series_id, timestamp, value)`.
Series reference and timestamp are encoded as deltas w.r.t the first sample. Series reference and timestamp are encoded as deltas w.r.t the first sample.
```
┌──────────────────────────────────────────────────────────────────┐ ┌──────────────────────────────────────────────────────────────────┐
│ type = 2 <1b> │ type = 2 <1b>
├──────────────────────────────────────────────────────────────────┤ ├──────────────────────────────────────────────────────────────────┤
@@ -53,13 +59,14 @@ Series reference and timestamp are encoded as deltas w.r.t the first sample.
│ └────────────────────┴───────────────────────────┴─────────────┘ │ │ └────────────────────┴───────────────────────────┴─────────────┘ │
│ . . . │ │ . . . │
└──────────────────────────────────────────────────────────────────┘ └──────────────────────────────────────────────────────────────────┘
```
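For illustration, a hedged sketch of the delta convention described above, using a hypothetical sample type; the actual byte-level encoding of the record (shown only partially in this excerpt) is omitted here.

```go
package main

import "fmt"

// sample is a (series_id, timestamp, value) triple as described above.
type sample struct {
	ref uint64
	t   int64
	v   float64
}

// deltas expresses each sample's series reference and timestamp as deltas
// w.r.t. the first sample in the record; values are stored as-is.
func deltas(samples []sample) (refDelta, tDelta []int64) {
	if len(samples) == 0 {
		return nil, nil
	}
	first := samples[0]
	for _, s := range samples {
		refDelta = append(refDelta, int64(s.ref)-int64(first.ref))
		tDelta = append(tDelta, s.t-first.t)
	}
	return refDelta, tDelta
}

func main() {
	rd, td := deltas([]sample{{ref: 10, t: 1000, v: 1}, {ref: 12, t: 1015, v: 2}})
	fmt.Println(rd, td) // [0 2] [0 15]
}
```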
### Tombstone records ### Tombstone records
Tombstone records encode tombstones as a list of triples `(series_id, min_time, max_time)` Tombstone records encode tombstones as a list of triples `(series_id, min_time, max_time)`
and specify an interval for which samples of a series got deleted. and specify an interval for which samples of a series got deleted.
```
┌─────────────────────────────────────────────────────┐ ┌─────────────────────────────────────────────────────┐
│ type = 3 <1b> │ type = 3 <1b>
├─────────────────────────────────────────────────────┤ ├─────────────────────────────────────────────────────┤
@@ -68,5 +75,6 @@ and specify an interval for which samples of a series got deleted.
│ └─────────┴───────────────────┴───────────────────┘ │ │ └─────────┴───────────────────┴───────────────────┘ │
│ . . . │ │ . . . │
└─────────────────────────────────────────────────────┘ └─────────────────────────────────────────────────────┘
```
[1]: https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log-File-Format

View file

@@ -438,7 +438,7 @@ func (h *Head) Truncate(mint int64) error {
return nil // no segments yet. return nil // no segments yet.
} }
// The lower third of segments should contain mostly obsolete samples. // The lower third of segments should contain mostly obsolete samples.
// If we have too few segments, it's not worth checkpointing yet. // If we have fewer than three segments, it's not worth checkpointing yet.
n = m + (n-m)/3 n = m + (n-m)/3
if n <= m { if n <= m {
return nil return nil
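A small worked example (hypothetical segment numbers, not from the commit) of the arithmetic behind the updated comment: only the lower third of the segment range m..n is checkpointed, and Truncate bails out when that third is empty.

```go
package main

import "fmt"

// lowerThird mirrors `n = m + (n-m)/3` above: it returns the exclusive upper
// bound of the segments worth checkpointing.
func lowerThird(m, n int) int {
	return m + (n-m)/3
}

func main() {
	fmt.Println(lowerThird(0, 9)) // 3: segments 0..2 are checkpointed
	fmt.Println(lowerThird(4, 6)) // 4: n <= m, so Truncate skips checkpointing for now
}
```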

View file

@@ -23,6 +23,7 @@ import (
"math" "math"
"os" "os"
"path/filepath" "path/filepath"
"sort"
"strconv" "strconv"
"sync" "sync"
"time" "time"
@@ -35,9 +36,7 @@ import (
) )
const ( const (
version = 1
defaultSegmentSize = 128 * 1024 * 1024 // 128 MB defaultSegmentSize = 128 * 1024 * 1024 // 128 MB
maxRecordSize = 1 * 1024 * 1024 // 1MB
pageSize = 32 * 1024 // 32KB pageSize = 32 * 1024 // 32KB
recordHeaderSize = 7 recordHeaderSize = 7
) )
@@ -94,7 +93,6 @@ func (e *CorruptionErr) Error() string {
// OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends. // OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends.
func OpenWriteSegment(dir string, k int) (*Segment, error) { func OpenWriteSegment(dir string, k int) (*Segment, error) {
// Only .active segments are allowed to be opened for write.
f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_APPEND, 0666) f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_APPEND, 0666)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -127,7 +125,7 @@ func CreateSegment(dir string, k int) (*Segment, error) {
return &Segment{File: f, i: k, dir: dir}, nil return &Segment{File: f, i: k, dir: dir}, nil
} }
// OpenReadSegment opens the segment k in dir for reading. // OpenReadSegment opens the segment with the given filename.
func OpenReadSegment(fn string) (*Segment, error) { func OpenReadSegment(fn string) (*Segment, error) {
k, err := strconv.Atoi(filepath.Base(fn)) k, err := strconv.Atoi(filepath.Base(fn))
if err != nil { if err != nil {
@@ -142,7 +140,7 @@ func OpenReadSegment(fn string) (*Segment, error) {
// WAL is a write ahead log that stores records in segment files. // WAL is a write ahead log that stores records in segment files.
// It must be read from start to end once before logging new data. // It must be read from start to end once before logging new data.
// If an errore occurs during read, the repair procedure must be called // If an error occurs during read, the repair procedure must be called
// before it's safe to do further writes. // before it's safe to do further writes.
// //
// Segments are written to in pages of 32KB, with records possibly split // Segments are written to in pages of 32KB, with records possibly split
@@ -244,23 +242,19 @@ Loop:
case f := <-w.actorc: case f := <-w.actorc:
f() f()
case donec := <-w.stopc: case donec := <-w.stopc:
close(w.actorc)
defer close(donec) defer close(donec)
break Loop break Loop
} }
} }
// Drain and process any remaining functions. // Drain and process any remaining functions.
for { for f := range w.actorc {
select { f()
case f := <-w.actorc:
f()
default:
return
}
} }
} }
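A self-contained sketch (hypothetical setup) of the drain pattern the new code relies on: because close(w.actorc) now happens in the stop case, ranging over the channel processes every buffered function and then terminates, replacing the old select/default loop.

```go
package main

import "fmt"

func main() {
	actorc := make(chan func(), 4)
	actorc <- func() { fmt.Println("pending work 1") }
	actorc <- func() { fmt.Println("pending work 2") }

	// Closing first means the range loop below runs every buffered function
	// and then exits once the channel is empty.
	close(actorc)
	for f := range actorc {
		f()
	}
	fmt.Println("drained")
}
```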
// Repair attempts to repair the WAL based on the error. // Repair attempts to repair the WAL based on the error.
// It discards all data behind the corruption // It discards all data after the corruption.
func (w *WAL) Repair(err error) error { func (w *WAL) Repair(err error) error {
// We could probably have a mode that only discards torn records right around // We could probably have a mode that only discards torn records right around
// the corruption to preserve as much data as possible. // the corruption to preserve as much data as possible.
@@ -333,7 +327,7 @@ func (w *WAL) Repair(err error) error {
// SegmentName builds a segment name for the directory. // SegmentName builds a segment name for the directory.
func SegmentName(dir string, i int) string { func SegmentName(dir string, i int) string {
return filepath.Join(dir, fmt.Sprintf("%06d", i)) return filepath.Join(dir, fmt.Sprintf("%08d", i))
} }
// nextSegment creates the next segment and closes the previous one. // nextSegment creates the next segment and closes the previous one.
@@ -384,6 +378,7 @@ func (w *WAL) flushPage(clear bool) error {
} }
p.flushed += n p.flushed += n
// We flushed an entire page, prepare a new one.
if clear { if clear {
for i := range p.buf { for i := range p.buf {
p.buf[i] = 0 p.buf[i] = 0
@@ -485,7 +480,7 @@ func (w *WAL) log(rec []byte, final bool) error {
binary.BigEndian.PutUint16(buf[1:], uint16(len(part))) binary.BigEndian.PutUint16(buf[1:], uint16(len(part)))
binary.BigEndian.PutUint32(buf[3:], crc) binary.BigEndian.PutUint32(buf[3:], crc)
copy(buf[7:], part) copy(buf[recordHeaderSize:], part)
p.alloc += len(part) + recordHeaderSize p.alloc += len(part) + recordHeaderSize
// If we wrote a full record, we can fit more records of the batch // If we wrote a full record, we can fit more records of the batch
@@ -587,6 +582,9 @@ func listSegments(dir string) (refs []segmentRef, err error) {
refs = append(refs, segmentRef{s: fn, n: k}) refs = append(refs, segmentRef{s: fn, n: k})
last = k last = k
} }
sort.Slice(refs, func(i, j int) bool {
return refs[i].n < refs[j].n
})
return refs, nil return refs, nil
} }
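A minimal sketch of the added sort.Slice call (segmentRef fields named as in the hunk, everything else hypothetical), showing how it orders segment references by numeric index regardless of directory listing order:

```go
package main

import (
	"fmt"
	"sort"
)

// segmentRef pairs a segment file name with its parsed numeric index,
// matching the fields used in listSegments above.
type segmentRef struct {
	s string
	n int
}

func main() {
	refs := []segmentRef{{s: "00000010", n: 10}, {s: "00000002", n: 2}, {s: "00000001", n: 1}}

	sort.Slice(refs, func(i, j int) bool {
		return refs[i].n < refs[j].n
	})
	fmt.Println(refs) // [{00000001 1} {00000002 2} {00000010 10}]
}
```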
@@ -667,10 +665,6 @@ func (r *segmentBufReader) Read(b []byte) (n int, err error) {
// Only unset more so we don't invalidate the current segment and // Only unset more so we don't invalidate the current segment and
// offset before the next read. // offset before the next read.
r.more = false r.more = false
// If no more segments are left, it's the end for the reader.
if len(r.segs) == 0 {
return n, io.EOF
}
return n, nil return n, nil
} }
@@ -689,7 +683,7 @@ func NewReader(r io.Reader) *Reader {
} }
// Next advances the reader to the next records and returns true if it exists. // Next advances the reader to the next records and returns true if it exists.
// It must not be called once after it returned false. // It must not be called again after it returned false.
func (r *Reader) Next() bool { func (r *Reader) Next() bool {
err := r.next() err := r.next()
if errors.Cause(err) == io.EOF { if errors.Cause(err) == io.EOF {
@@ -702,8 +696,8 @@ func (r *Reader) Next() bool {
func (r *Reader) next() (err error) { func (r *Reader) next() (err error) {
// We have to use r.buf since allocating byte arrays here fails escape // We have to use r.buf since allocating byte arrays here fails escape
// analysis and ends up on the heap, even though it seemingly should not. // analysis and ends up on the heap, even though it seemingly should not.
hdr := r.buf[:7] hdr := r.buf[:recordHeaderSize]
buf := r.buf[7:] buf := r.buf[recordHeaderSize:]
r.rec = r.rec[:0] r.rec = r.rec[:0]
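To show why hdr spans recordHeaderSize (7) bytes, a hedged sketch of decoding the header fields laid out in the format doc above; the real reader additionally reassembles fragments and verifies the CRC.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const recordHeaderSize = 7

// decodeHeader splits a record fragment header into its three fields:
// 1 byte type, 2 bytes big-endian length, 4 bytes big-endian CRC32.
func decodeHeader(hdr []byte) (typ byte, length uint16, crc uint32) {
	typ = hdr[0]
	length = binary.BigEndian.Uint16(hdr[1:])
	crc = binary.BigEndian.Uint32(hdr[3:])
	return typ, length, crc
}

func main() {
	hdr := []byte{1, 0, 5, 0xde, 0xad, 0xbe, 0xef}
	typ, length, crc := decodeHeader(hdr[:recordHeaderSize])
	fmt.Println(typ, length, fmt.Sprintf("%x", crc)) // 1 5 deadbeef
}
```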