diff --git a/wal/wal.go b/wal/wal.go index a51d43614..280cd1398 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -426,10 +426,10 @@ func (w *WAL) flushPage(clear bool) error { p := w.page clear = clear || p.full() - // No more data will fit into the page. Enqueue and clear it. + // No more data will fit into the page or an implicit clear. + // Enqueue and clear it. if clear { p.alloc = pageSize // Write till end of page. - w.pageCompletions.Inc() } n, err := w.segment.Write(p.buf[p.flushed:p.alloc]) if err != nil { @@ -445,6 +445,7 @@ func (w *WAL) flushPage(clear bool) error { p.alloc = 0 p.flushed = 0 w.donePages++ + w.pageCompletions.Inc() } return nil } @@ -495,10 +496,18 @@ func (w *WAL) Log(recs ...[]byte) error { return nil } -// log writes rec to the log and forces a flush of the current page if its -// the final record of a batch, the record is bigger than the page size or -// the current page is full. +// log writes rec to the log and forces a flush of the current page if: +// - the final record of a batch +// - the record is bigger than the page size +// - the current page is full. func (w *WAL) log(rec []byte, final bool) error { + // When the last page flush failed the page will remain full. + // When the page is full, need to flush it before trying to add more records to it. + if w.page.full() { + if err := w.flushPage(true); err != nil { + return err + } + } // If the record is too big to fit within the active page in the current // segment, terminate the active segment and advance to the next one. // This ensures that records do not cross segment boundaries.