From 3e76f0163e4129175d6b8e2b4a2a4a7fd23b326d Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 18 Jun 2018 07:52:57 -0400 Subject: [PATCH] Address comments Signed-off-by: Fabian Reinartz --- checkpoint.go | 16 ++++++++-------- docs/format/wal.md | 16 ++++++++++++---- head.go | 2 +- wal/wal.go | 38 ++++++++++++++++---------------------- 4 files changed, 37 insertions(+), 35 deletions(-) diff --git a/checkpoint.go b/checkpoint.go index 2ab5f8d95c..87ff5597e3 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -35,9 +35,9 @@ type CheckpointStats struct { DroppedSeries int DroppedSamples int DroppedTombstones int - TotalSeries int - TotalSamples int - TotalTombstones int + TotalSeries int // Processed series including dropped ones. + TotalSamples int // Processed samples inlcuding dropped ones. + TotalTombstones int // Processed tombstones including droppes ones. } // LastCheckpoint returns the directory name of the most recent checkpoint. @@ -129,16 +129,16 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo sr = last } - segs, err := wal.NewSegmentsRangeReader(w.Dir(), m, n) + segsr, err := wal.NewSegmentsRangeReader(w.Dir(), m, n) if err != nil { return nil, errors.Wrap(err, "create segment reader") } - defer segs.Close() + defer segsr.Close() if sr != nil { - sr = io.MultiReader(sr, segs) + sr = io.MultiReader(sr, segsr) } else { - sr = segs + sr = segsr } } @@ -169,7 +169,7 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo // We don't reset the buffer since we batch up multiple records // before writing them to the checkpoint. - // Remember where the record for this iteration starts. + // Remember where the record for this iteration starts. start := len(buf) rec := r.Record() diff --git a/docs/format/wal.md b/docs/format/wal.md index f0daba24de..ca3fae39de 100644 --- a/docs/format/wal.md +++ b/docs/format/wal.md @@ -5,14 +5,17 @@ e.g. `000000`, `000001`, `000002`, etc., and are limited to 128MB by default. A segment is written to in pages of 32KB. Only the last page of the most recent segment may be partial. A WAL record is an opaque byte slice that gets split up into sub-records should it exceed the remaining space of the current page. Records are never split across -segment boundaries. -The encoding of pages is largely borrowed from [LevelDB's/RocksDB's wirte ahead log.][1] +segment boundaries. If a single record exceeds the default segment size, a segment with +a larger size will be created. +The encoding of pages is largely borrowed from [LevelDB's/RocksDB's write ahead log.][1] Notable deviations are that the record fragment is encoded as: +``` ┌───────────┬──────────┬────────────┬──────────────┐ │ type <1b> │ len <2b> │ CRC32 <4b> │ data │ └───────────┴──────────┴────────────┴──────────────┘ +``` ## Record encoding @@ -22,6 +25,7 @@ The records written to the write ahead log are encoded as follows: Series records encode the labels that identifier a series and its unique ID. +``` ┌────────────────────────────────────────────┐ │ type = 1 <1b> │ ├────────────────────────────────────────────┤ @@ -36,12 +40,14 @@ Series records encode the labels that identifier a series and its unique ID. │ └───────────────────────┴────────────────┘ │ │ . . . │ └────────────────────────────────────────────┘ +``` ### Sample records Sample records encode samples as a list of triples `(series_id, timestamp, value)`. Series reference and timestamp are encoded as deltas w.r.t the first sample. +``` ┌──────────────────────────────────────────────────────────────────┐ │ type = 2 <1b> │ ├──────────────────────────────────────────────────────────────────┤ @@ -53,13 +59,14 @@ Series reference and timestamp are encoded as deltas w.r.t the first sample. │ └────────────────────┴───────────────────────────┴─────────────┘ │ │ . . . │ └──────────────────────────────────────────────────────────────────┘ +``` ### Tombstone records Tombstone records encode tombstones as a list of triples `(series_id, min_time, max_time)` and specify an interval for which samples of a series got deleted. - +``` ┌─────────────────────────────────────────────────────┐ │ type = 3 <1b> │ ├─────────────────────────────────────────────────────┤ @@ -68,5 +75,6 @@ and specify an interval for which samples of a series got deleted. │ └─────────┴───────────────────┴───────────────────┘ │ │ . . . │ └─────────────────────────────────────────────────────┘ +``` -[1][https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log-File-Format] \ No newline at end of file +[1][https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log-File-Format] diff --git a/head.go b/head.go index 4e12369dad..61457911f1 100644 --- a/head.go +++ b/head.go @@ -438,7 +438,7 @@ func (h *Head) Truncate(mint int64) error { return nil // no segments yet. } // The lower third of segments should contain mostly obsolete samples. - // If we have too few segments, it's not worth checkpointing yet. + // If we have less than three segments, it's not worth checkpointing yet. n = m + (n-m)/3 if n <= m { return nil diff --git a/wal/wal.go b/wal/wal.go index 89369195b5..90228184ab 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -23,6 +23,7 @@ import ( "math" "os" "path/filepath" + "sort" "strconv" "sync" "time" @@ -35,9 +36,7 @@ import ( ) const ( - version = 1 defaultSegmentSize = 128 * 1024 * 1024 // 128 MB - maxRecordSize = 1 * 1024 * 1024 // 1MB pageSize = 32 * 1024 // 32KB recordHeaderSize = 7 ) @@ -94,7 +93,6 @@ func (e *CorruptionErr) Error() string { // OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends. func OpenWriteSegment(dir string, k int) (*Segment, error) { - // Only .active segments are allowed to be opened for write. f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_APPEND, 0666) if err != nil { return nil, err @@ -127,7 +125,7 @@ func CreateSegment(dir string, k int) (*Segment, error) { return &Segment{File: f, i: k, dir: dir}, nil } -// OpenReadSegment opens the segment k in dir for reading. +// OpenReadSegment opens the segment with the given filename. func OpenReadSegment(fn string) (*Segment, error) { k, err := strconv.Atoi(filepath.Base(fn)) if err != nil { @@ -142,7 +140,7 @@ func OpenReadSegment(fn string) (*Segment, error) { // WAL is a write ahead log that stores records in segment files. // It must be read from start to end once before logging new data. -// If an errore occurs during read, the repair procedure must be called +// If an erroe occurs during read, the repair procedure must be called // before it's safe to do further writes. // // Segments are written to in pages of 32KB, with records possibly split @@ -244,23 +242,19 @@ Loop: case f := <-w.actorc: f() case donec := <-w.stopc: + close(w.actorc) defer close(donec) break Loop } } // Drain and process any remaining functions. - for { - select { - case f := <-w.actorc: - f() - default: - return - } + for f := range w.actorc { + f() } } // Repair attempts to repair the WAL based on the error. -// It discards all data behind the corruption +// It discards all data after the corruption. func (w *WAL) Repair(err error) error { // We could probably have a mode that only discards torn records right around // the corruption to preserve as data much as possible. @@ -333,7 +327,7 @@ func (w *WAL) Repair(err error) error { // SegmentName builds a segment name for the directory. func SegmentName(dir string, i int) string { - return filepath.Join(dir, fmt.Sprintf("%06d", i)) + return filepath.Join(dir, fmt.Sprintf("%08d", i)) } // nextSegment creates the next segment and closes the previous one. @@ -384,6 +378,7 @@ func (w *WAL) flushPage(clear bool) error { } p.flushed += n + // We flushed an entire page, prepare a new one. if clear { for i := range p.buf { p.buf[i] = 0 @@ -485,7 +480,7 @@ func (w *WAL) log(rec []byte, final bool) error { binary.BigEndian.PutUint16(buf[1:], uint16(len(part))) binary.BigEndian.PutUint32(buf[3:], crc) - copy(buf[7:], part) + copy(buf[recordHeaderSize:], part) p.alloc += len(part) + recordHeaderSize // If we wrote a full record, we can fit more records of the batch @@ -587,6 +582,9 @@ func listSegments(dir string) (refs []segmentRef, err error) { refs = append(refs, segmentRef{s: fn, n: k}) last = k } + sort.Slice(refs, func(i, j int) bool { + return refs[i].n < refs[j].n + }) return refs, nil } @@ -667,10 +665,6 @@ func (r *segmentBufReader) Read(b []byte) (n int, err error) { // Only unset more so we don't invalidate the current segment and // offset before the next read. r.more = false - // If no more segments are left, it's the end for the reader. - if len(r.segs) == 0 { - return n, io.EOF - } return n, nil } @@ -689,7 +683,7 @@ func NewReader(r io.Reader) *Reader { } // Next advances the reader to the next records and returns true if it exists. -// It must not be called once after it returned false. +// It must not be called again after it returned false. func (r *Reader) Next() bool { err := r.next() if errors.Cause(err) == io.EOF { @@ -702,8 +696,8 @@ func (r *Reader) Next() bool { func (r *Reader) next() (err error) { // We have to use r.buf since allocating byte arrays here fails escape // analysis and ends up on the heap, even though it seemingly should not. - hdr := r.buf[:7] - buf := r.buf[7:] + hdr := r.buf[:recordHeaderSize] + buf := r.buf[recordHeaderSize:] r.rec = r.rec[:0]