WAL Watcher: never skip samples from live segments

On start-up, the WAL watcher wants to skip samples, exemplars, etc. from
older WAL segments for speed; replaying only the series records is more
than 10x faster.

Once those older segments have been replayed, every subsequent segment
must be read in full, so that no samples from live segments are skipped.
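
A rough sketch of that two-phase behaviour, with hypothetical
replaySegment/tailSegment helpers standing in for the real
readSegmentFile/watch code:

    // Sketch only: replaySegment and tailSegment are invented stand-ins,
    // not the Prometheus WAL watcher API.
    package main

    import "log"

    func replaySegment(n int, allData bool) error {
        log.Printf("replaying closed segment %d (allData=%v, sample data skipped)", n, allData)
        return nil
    }

    func tailSegment(n int) error {
        log.Printf("tailing segment %d, every record forwarded", n)
        return nil
    }

    func main() {
        currentSegment, lastSegment := 3, 7 // hypothetical segment numbers

        // Phase 1: catch up on older, closed segments, skipping sample-type records.
        for ; currentSegment < lastSegment; currentSegment++ {
            if err := replaySegment(currentSegment, false); err != nil {
                log.Fatal(err)
            }
        }

        // Phase 2: from the last segment onwards, read everything in full.
        if err := tailSegment(currentSegment); err != nil {
            log.Fatal(err)
        }
    }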

Remove the `tail` parameter; `readSegment()` now decides what to process
based on its `allData` parameter instead.

`watch()` now handles only live segments; `readSegmentFile()` is used
for the older segments.
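
A minimal sketch of how a single allData flag can gate which record
types get forwarded (recType, record and processRecord are invented
stand-ins, not the Prometheus decoder or WriteTo interface):

    // Sketch only: simplified record types instead of prometheus/tsdb records.
    package main

    import "fmt"

    type recType int

    const (
        recSeries recType = iota
        recSamples
        recExemplars
        recHistograms
    )

    type record struct{ typ recType }

    // processRecord always stores series (they are needed to label later
    // samples), but forwards sample-like records only when allData is true,
    // i.e. when the segment being read is live.
    func processRecord(r record, allData bool) {
        switch r.typ {
        case recSeries:
            fmt.Println("store series")
        case recSamples, recExemplars, recHistograms:
            if !allData {
                return // skipped during start-up replay, for speed
            }
            fmt.Println("append to remote write")
        }
    }

    func main() {
        recs := []record{{recSeries}, {recSamples}, {recExemplars}}
        for _, r := range recs {
            processRecord(r, false) // replaying an older segment
        }
        for _, r := range recs {
            processRecord(r, true) // reading a live segment
        }
    }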

Inline the error-handling part of `readAndHandleError` into the places it
was called. Note that it previously returned `ErrIgnorable` every time
`tail==false`, even when there was no error.
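
In outline, the error handling moves to the two kinds of call site
roughly as below (sketch only; readFn and the offset/size values are
made-up stand-ins for readSegment and the real Watcher fields):

    // Sketch of the inlined error handling, not the real watcher code.
    package main

    import (
        "errors"
        "io"
        "log"
    )

    func readFn() error { return io.EOF } // pretend we read to the end of a segment

    func main() {
        var offset, size int64 = 128, 128 // pretend reader offset and segment size

        // Replay call site (readSegmentFile): EOF and short reads are only
        // logged; nil is returned rather than the old ErrIgnorable sentinel.
        if err := readFn(); err != nil && !errors.Is(err, io.EOF) {
            log.Println("ignoring error reading to end of segment:", err)
        } else if offset != size {
            log.Println("expected to have read whole segment, may have dropped data")
        }

        // Tailing call site (watch): only EOF is tolerated; anything else is fatal.
        if err := readFn(); err != nil && !errors.Is(err, io.EOF) {
            log.Fatal(err)
        }
    }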

Include a check from @machine424 in `TestRun_AvoidNotifyWhenBehind`
that samples are not dropped.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

Author: Bryan Boreham, 2024-07-08 18:55:10 +01:00
Commit: 7d5b5b0b4a (parent: 89608c69a7)
2 changed files with 61 additions and 69 deletions

@@ -17,7 +17,6 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"path/filepath"
"slices"
@@ -41,8 +40,7 @@
)
var (
ErrIgnorable = errors.New("ignore me")
readTimeout = 15 * time.Second
readTimeout = 15 * time.Second
)
// WriteTo is an interface used by the Watcher to send the samples it's read
@@ -288,21 +286,35 @@ func (w *Watcher) Run() error {
if err != nil {
return err
}
_, lastSegment, err := w.firstAndLast()
if err != nil {
return fmt.Errorf("wal.Segments: %w", err)
}
level.Debug(w.logger).Log("msg", "Reading series from older WAL segments", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment)
for currentSegment < lastSegment && !isClosed(w.quit) {
w.currentSegmentMetric.Set(float64(currentSegment))
level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment, "lastSegment", lastSegment)
// On initial replay we ignore any samples records we see, by passing false for allData.
// This speeds up replay of the WAL by > 10x.
if err := w.readSegmentFile(currentSegment, false); err != nil {
return err
}
// For testing: stop when you hit a specific segment.
if currentSegment == w.MaxSegment {
return nil
}
currentSegment++
}
level.Debug(w.logger).Log("msg", "Tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment)
for !isClosed(w.quit) {
w.currentSegmentMetric.Set(float64(currentSegment))
// Re-check on each iteration in case a new segment was added,
// because watch() will wait for notifications on the last segment.
_, lastSegment, err := w.firstAndLast()
if err != nil {
return fmt.Errorf("wal.Segments: %w", err)
}
tail := currentSegment >= lastSegment
level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment, "lastSegment", lastSegment)
if err := w.watch(currentSegment, tail); err != nil && !errors.Is(err, ErrIgnorable) {
level.Debug(w.logger).Log("msg", "Tailing WAL", "currentSegment", currentSegment)
if err := w.watch(currentSegment); err != nil {
return err
}
@@ -370,30 +382,31 @@ func (w *Watcher) segments(dir string) ([]int, error) {
return refs, nil
}
func (w *Watcher) readAndHandleError(r *LiveReader, segmentNum int, tail bool, size int64) error {
err := w.readSegment(r, segmentNum, tail)
// Use allData false if replaying on start to cache the series records.
func (w *Watcher) readSegmentFile(segmentNum int, allData bool) error {
segment, err := OpenReadSegment(SegmentName(w.walDir, segmentNum))
if err != nil {
return err
}
defer segment.Close()
// Ignore all errors reading to end of segment whilst replaying the WAL.
if !tail {
if err != nil && !errors.Is(err, io.EOF) {
level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err)
} else if r.Offset() != size {
level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", r.Offset(), "size", size)
}
return ErrIgnorable
r := NewLiveReader(w.logger, w.readerMetrics, segment)
size, err := getSegmentSize(w.walDir, segmentNum)
if err != nil {
return fmt.Errorf("getSegmentSize: %w", err)
}
// Otherwise, when we are tailing, non-EOFs are fatal.
err = w.readSegment(r, segmentNum, allData)
if err != nil && !errors.Is(err, io.EOF) {
return err
level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err)
} else if r.Offset() != size {
level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", r.Offset(), "size", size)
}
return nil
}
// Use tail true to indicate that the reader is currently on a segment that is
// actively being written to. If false, assume it's a full segment and we're
// replaying it on start to cache the series records.
func (w *Watcher) watch(segmentNum int, tail bool) error {
func (w *Watcher) watch(segmentNum int) error {
segment, err := OpenReadSegment(SegmentName(w.walDir, segmentNum))
if err != nil {
return err
@@ -402,17 +415,6 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
reader := NewLiveReader(w.logger, w.readerMetrics, segment)
size := int64(math.MaxInt64)
if !tail {
var err error
size, err = getSegmentSize(w.walDir, segmentNum)
if err != nil {
return fmt.Errorf("getSegmentSize: %w", err)
}
return w.readAndHandleError(reader, segmentNum, tail, size)
}
checkpointTicker := time.NewTicker(checkpointPeriod)
defer checkpointTicker.Stop()
@@ -458,20 +460,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
if last <= segmentNum {
continue
}
err = w.readSegment(reader, segmentNum, tail)
// Ignore errors reading to end of segment whilst replaying the WAL.
if !tail {
switch {
case err != nil && !errors.Is(err, io.EOF):
level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "err", err)
case reader.Offset() != size:
level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size)
}
return nil
}
// Otherwise, when we are tailing, non-EOFs are fatal.
err = w.readSegment(reader, segmentNum, true)
if err != nil && !errors.Is(err, io.EOF) {
return err
}
@@ -481,16 +470,16 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
// we haven't read due to a notification in quite some time, try reading anyways
case <-readTicker.C:
level.Debug(w.logger).Log("msg", "Watcher is reading the WAL due to timeout, haven't received any write notifications recently", "timeout", readTimeout)
err := w.readAndHandleError(reader, segmentNum, tail, size)
if err != nil {
err := w.readSegment(reader, segmentNum, true)
if err != nil && !errors.Is(err, io.EOF) {
return err
}
// still want to reset the ticker so we don't read too often
readTicker.Reset(readTimeout)
case <-w.readNotify:
err := w.readAndHandleError(reader, segmentNum, tail, size)
if err != nil {
err := w.readSegment(reader, segmentNum, true)
if err != nil && !errors.Is(err, io.EOF) {
return err
}
// still want to reset the ticker so we don't read too often
@@ -533,7 +522,7 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
// Read from a segment and pass the details to w.writer.
// Also used with readCheckpoint - implements segmentReadFn.
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, allData bool) error {
var (
dec = record.NewDecoder(labels.NewSymbolTable()) // One table per WAL segment means it won't grow indefinitely.
series []record.RefSeries
@@ -560,9 +549,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
w.writer.StoreSeries(series, segmentNum)
case record.Samples:
// If we're not tailing a segment we can ignore any samples records we see.
// This speeds up replay of the WAL by > 10x.
if !tail {
if !allData {
break
}
samples, err := dec.Samples(rec, samples[:0])
@@ -590,9 +577,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
if !w.sendExemplars {
break
}
// If we're not tailing a segment we can ignore any exemplars records we see.
// This speeds up replay of the WAL significantly.
if !tail {
if !allData {
break
}
exemplars, err := dec.Exemplars(rec, exemplars[:0])
@@ -607,7 +592,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
if !w.sendHistograms {
break
}
if !tail {
if !allData {
break
}
histograms, err := dec.HistogramSamples(rec, histograms[:0])
@@ -634,7 +619,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
if !w.sendHistograms {
break
}
if !tail {
if !allData {
break
}
floatHistograms, err := dec.FloatHistogramSamples(rec, floatHistograms[:0])
@@ -658,7 +643,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
}
case record.Metadata:
if !w.sendMetadata || !tail {
if !w.sendMetadata || !allData {
break
}
meta, err := dec.Metadata(rec, metadata[:0])

@@ -784,6 +784,13 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
err = watcher.Run()
wg.Wait()
require.Less(t, time.Since(startTime), readTimeout)
// But samples records shouldn't get dropped
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumSeries() > 0
})
require.Greater(t, wt.samplesAppended, 0)
require.NoError(t, err)
require.NoError(t, w.Close())
})