diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index e4d44afa2..66861a487 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -15,6 +15,7 @@ package agent import ( "context" + "errors" "fmt" "math" "path/filepath" @@ -24,7 +25,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "go.uber.org/atomic" @@ -263,7 +263,7 @@ func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir strin w, err := wlog.NewSize(l, reg, dir, opts.WALSegmentSize, opts.WALCompression) if err != nil { - return nil, errors.Wrap(err, "creating WAL") + return nil, fmt.Errorf("creating WAL: %w", err) } db := &DB{ @@ -302,7 +302,7 @@ func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir strin if err := db.replayWAL(); err != nil { level.Warn(db.logger).Log("msg", "encountered WAL read error, attempting repair", "err", err) if err := w.Repair(err); err != nil { - return nil, errors.Wrap(err, "repair corrupted WAL") + return nil, fmt.Errorf("repair corrupted WAL: %w", err) } level.Info(db.logger).Log("msg", "successfully repaired WAL") } @@ -352,7 +352,7 @@ func (db *DB) replayWAL() error { dir, startFrom, err := wlog.LastCheckpoint(db.wal.Dir()) if err != nil && err != record.ErrNotFound { - return errors.Wrap(err, "find last checkpoint") + return fmt.Errorf("find last checkpoint: %w", err) } multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{} @@ -360,7 +360,7 @@ func (db *DB) replayWAL() error { if err == nil { sr, err := wlog.NewSegmentsReader(dir) if err != nil { - return errors.Wrap(err, "open checkpoint") + return fmt.Errorf("open checkpoint: %w", err) } defer func() { if err := sr.Close(); err != nil { @@ -371,7 +371,7 @@ func (db *DB) replayWAL() error { // A corrupted checkpoint is a hard error for now and requires user // intervention. There's likely little data that can be recovered anyway. if err := db.loadWAL(wlog.NewReader(sr), multiRef); err != nil { - return errors.Wrap(err, "backfill checkpoint") + return fmt.Errorf("backfill checkpoint: %w", err) } startFrom++ level.Info(db.logger).Log("msg", "WAL checkpoint loaded") @@ -380,14 +380,14 @@ func (db *DB) replayWAL() error { // Find the last segment. _, last, err := wlog.Segments(db.wal.Dir()) if err != nil { - return errors.Wrap(err, "finding WAL segments") + return fmt.Errorf("finding WAL segments: %w", err) } // Backfil segments from the most recent checkpoint onwards. for i := startFrom; i <= last; i++ { seg, err := wlog.OpenReadSegment(wlog.SegmentName(db.wal.Dir(), i)) if err != nil { - return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i)) + return fmt.Errorf("open WAL segment: %d: %w", i, err) } sr := wlog.NewSegmentBufReader(seg) @@ -432,7 +432,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H series, err = dec.Series(rec, series) if err != nil { errCh <- &wlog.CorruptionErr{ - Err: errors.Wrap(err, "decode series"), + Err: fmt.Errorf("decode series: %w", err), Segment: r.Segment(), Offset: r.Offset(), } @@ -444,7 +444,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H samples, err = dec.Samples(rec, samples) if err != nil { errCh <- &wlog.CorruptionErr{ - Err: errors.Wrap(err, "decode samples"), + Err: fmt.Errorf("decode samples: %w", err), Segment: r.Segment(), Offset: r.Offset(), } @@ -456,7 +456,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H histograms, err = dec.HistogramSamples(rec, histograms) if err != nil { errCh <- &wlog.CorruptionErr{ - Err: errors.Wrap(err, "decode histogram samples"), + Err: fmt.Errorf("decode histogram samples: %w", err), Segment: r.Segment(), Offset: r.Offset(), } @@ -468,7 +468,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms) if err != nil { errCh <- &wlog.CorruptionErr{ - Err: errors.Wrap(err, "decode float histogram samples"), + Err: fmt.Errorf("decode float histogram samples: %w", err), Segment: r.Segment(), Offset: r.Offset(), } @@ -482,7 +482,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H continue default: errCh <- &wlog.CorruptionErr{ - Err: errors.Errorf("invalid record type %v", dec.Type(rec)), + Err: fmt.Errorf("invalid record type %v", dec.Type(rec)), Segment: r.Segment(), Offset: r.Offset(), } @@ -568,7 +568,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H return err default: if r.Err() != nil { - return errors.Wrap(r.Err(), "read records") + return fmt.Errorf("read records: %w", r.Err()) } return nil } @@ -622,13 +622,13 @@ func (db *DB) truncate(mint int64) error { first, last, err := wlog.Segments(db.wal.Dir()) if err != nil { - return errors.Wrap(err, "get segment range") + return fmt.Errorf("get segment range: %w", err) } // Start a new segment so low ingestion volume instances don't have more WAL // than needed. if _, err := db.wal.NextSegment(); err != nil { - return errors.Wrap(err, "next segment") + return fmt.Errorf("next segment: %w", err) } last-- // Never consider most recent segment for checkpoint @@ -656,10 +656,11 @@ func (db *DB) truncate(mint int64) error { if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, keep, mint); err != nil { db.metrics.checkpointCreationFail.Inc() - if _, ok := errors.Cause(err).(*wlog.CorruptionErr); ok { + var cerr *wlog.CorruptionErr + if errors.As(err, &cerr) { db.metrics.walCorruptionsTotal.Inc() } - return errors.Wrap(err, "create checkpoint") + return fmt.Errorf("create checkpoint: %w", err) } if err := db.wal.Truncate(last + 1); err != nil { // If truncating fails, we'll just try it again at the next checkpoint. @@ -780,11 +781,11 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo // equivalent validation code in the TSDB's headAppender. l = l.WithoutEmpty() if l.IsEmpty() { - return 0, errors.Wrap(tsdb.ErrInvalidSample, "empty labelset") + return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample) } if lbl, dup := l.HasDuplicateLabelNames(); dup { - return 0, errors.Wrap(tsdb.ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, lbl)) + return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidSample) } var created bool @@ -841,7 +842,7 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, e exem e.Labels = e.Labels.WithoutEmpty() if lbl, dup := e.Labels.HasDuplicateLabelNames(); dup { - return 0, errors.Wrap(tsdb.ErrInvalidExemplar, fmt.Sprintf(`label name "%s" is not unique`, lbl)) + return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidExemplar) } // Exemplar label length does not include chars involved in text rendering such as quotes @@ -903,11 +904,11 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int // equivalent validation code in the TSDB's headAppender. l = l.WithoutEmpty() if l.IsEmpty() { - return 0, errors.Wrap(tsdb.ErrInvalidSample, "empty labelset") + return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample) } if lbl, dup := l.HasDuplicateLabelNames(); dup { - return 0, errors.Wrap(tsdb.ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, lbl)) + return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidSample) } var created bool