From 998fafe679b7e02b6a786672ade256aa714e5616 Mon Sep 17 00:00:00 2001 From: Matthieu MOREL Date: Mon, 4 Dec 2023 18:08:43 +0100 Subject: [PATCH] tsdb/wlog: use Go standard errors (#13144) Signed-off-by: Matthieu MOREL --- tsdb/wlog/watcher.go | 52 ++++++++++++++++++++++++-------------------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go index c9f8a4599..1c76e3887 100644 --- a/tsdb/wlog/watcher.go +++ b/tsdb/wlog/watcher.go @@ -14,6 +14,7 @@ package wlog import ( + "errors" "fmt" "io" "math" @@ -25,7 +26,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "golang.org/x/exp/slices" @@ -264,7 +264,7 @@ func (w *Watcher) loop() { func (w *Watcher) Run() error { _, lastSegment, err := w.firstAndLast() if err != nil { - return errors.Wrap(err, "wal.Segments") + return fmt.Errorf("wal.Segments: %w", err) } // We want to ensure this is false across iterations since @@ -275,13 +275,13 @@ func (w *Watcher) Run() error { // Backfill from the checkpoint first if it exists. lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir) - if err != nil && err != record.ErrNotFound { - return errors.Wrap(err, "tsdb.LastCheckpoint") + if err != nil && !errors.Is(err, record.ErrNotFound) { + return fmt.Errorf("tsdb.LastCheckpoint: %w", err) } if err == nil { if err = w.readCheckpoint(lastCheckpoint, (*Watcher).readSegment); err != nil { - return errors.Wrap(err, "readCheckpoint") + return fmt.Errorf("readCheckpoint: %w", err) } } w.lastCheckpoint = lastCheckpoint @@ -371,7 +371,7 @@ func (w *Watcher) readAndHandleError(r *LiveReader, segmentNum int, tail bool, s // Ignore all errors reading to end of segment whilst replaying the WAL. if !tail { - if err != nil && errors.Cause(err) != io.EOF { + if err != nil && !errors.Is(err, io.EOF) { level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err) } else if r.Offset() != size { level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", r.Offset(), "size", size) @@ -380,7 +380,7 @@ func (w *Watcher) readAndHandleError(r *LiveReader, segmentNum int, tail bool, s } // Otherwise, when we are tailing, non-EOFs are fatal. - if errors.Cause(err) != io.EOF { + if err != nil && !errors.Is(err, io.EOF) { return err } return nil @@ -403,7 +403,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { var err error size, err = getSegmentSize(w.walDir, segmentNum) if err != nil { - return errors.Wrap(err, "getSegmentSize") + return fmt.Errorf("getSegmentSize: %w", err) } return w.readAndHandleError(reader, segmentNum, tail, size) @@ -447,7 +447,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { case <-segmentTicker.C: _, last, err := w.firstAndLast() if err != nil { - return errors.Wrap(err, "segments") + return fmt.Errorf("segments: %w", err) } // Check if new segments exists. @@ -459,7 +459,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { // Ignore errors reading to end of segment whilst replaying the WAL. if !tail { switch { - case err != nil && errors.Cause(err) != io.EOF: + case err != nil && !errors.Is(err, io.EOF): level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "err", err) case reader.Offset() != size: level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size) @@ -468,7 +468,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { } // Otherwise, when we are tailing, non-EOFs are fatal. - if errors.Cause(err) != io.EOF { + if err != nil && !errors.Is(err, io.EOF) { return err } @@ -497,8 +497,8 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { func (w *Watcher) garbageCollectSeries(segmentNum int) error { dir, _, err := LastCheckpoint(w.walDir) - if err != nil && err != record.ErrNotFound { - return errors.Wrap(err, "tsdb.LastCheckpoint") + if err != nil && !errors.Is(err, record.ErrNotFound) { + return fmt.Errorf("tsdb.LastCheckpoint: %w", err) } if dir == "" || dir == w.lastCheckpoint { @@ -508,7 +508,7 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error { index, err := checkpointNum(dir) if err != nil { - return errors.Wrap(err, "error parsing checkpoint filename") + return fmt.Errorf("error parsing checkpoint filename: %w", err) } if index >= segmentNum { @@ -519,7 +519,7 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error { level.Debug(w.logger).Log("msg", "New checkpoint detected", "new", dir, "currentSegment", segmentNum) if err = w.readCheckpoint(dir, (*Watcher).readSegmentForGC); err != nil { - return errors.Wrap(err, "readCheckpoint") + return fmt.Errorf("readCheckpoint: %w", err) } // Clear series with a checkpoint or segment index # lower than the checkpoint we just read. @@ -658,7 +658,10 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { w.recordDecodeFailsMetric.Inc() } } - return errors.Wrapf(r.Err(), "segment %d: %v", segmentNum, r.Err()) + if err := r.Err(); err != nil { + return fmt.Errorf("segment %d: %w", segmentNum, err) + } + return nil } // Go through all series in a segment updating the segmentNum, so we can delete older series. @@ -691,7 +694,10 @@ func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error w.recordDecodeFailsMetric.Inc() } } - return errors.Wrapf(r.Err(), "segment %d: %v", segmentNum, r.Err()) + if err := r.Err(); err != nil { + return fmt.Errorf("segment %d: %w", segmentNum, err) + } + return nil } func (w *Watcher) SetStartTime(t time.Time) { @@ -706,29 +712,29 @@ func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) err level.Debug(w.logger).Log("msg", "Reading checkpoint", "dir", checkpointDir) index, err := checkpointNum(checkpointDir) if err != nil { - return errors.Wrap(err, "checkpointNum") + return fmt.Errorf("checkpointNum: %w", err) } // Ensure we read the whole contents of every segment in the checkpoint dir. segs, err := w.segments(checkpointDir) if err != nil { - return errors.Wrap(err, "Unable to get segments checkpoint dir") + return fmt.Errorf("Unable to get segments checkpoint dir: %w", err) } for _, seg := range segs { size, err := getSegmentSize(checkpointDir, seg) if err != nil { - return errors.Wrap(err, "getSegmentSize") + return fmt.Errorf("getSegmentSize: %w", err) } sr, err := OpenReadSegment(SegmentName(checkpointDir, seg)) if err != nil { - return errors.Wrap(err, "unable to open segment") + return fmt.Errorf("unable to open segment: %w", err) } defer sr.Close() r := NewLiveReader(w.logger, w.readerMetrics, sr) - if err := readFn(w, r, index, false); errors.Cause(err) != io.EOF && err != nil { - return errors.Wrap(err, "readSegment") + if err := readFn(w, r, index, false); err != nil && !errors.Is(err, io.EOF) { + return fmt.Errorf("readSegment: %w", err) } if r.Offset() != size {