diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index 2d5fba7335..c4c4e3c933 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -24,8 +24,6 @@ import ( "path/filepath" "strconv" - "github.com/pkg/errors" - "github.com/prometheus/prometheus/tsdb/chunkenc" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/fileutil" @@ -285,7 +283,7 @@ func checkCRC32(data, sum []byte) error { // This combination of shifts is the inverse of digest.Sum() in go/src/hash/crc32. want := uint32(sum[0])<<24 + uint32(sum[1])<<16 + uint32(sum[2])<<8 + uint32(sum[3]) if got != want { - return errors.Errorf("checksum mismatch expected:%x, actual:%x", want, got) + return fmt.Errorf("checksum mismatch expected:%x, actual:%x", want, got) } return nil } @@ -398,12 +396,12 @@ func (w *Writer) cut() error { func cutSegmentFile(dirFile *os.File, magicNumber uint32, chunksFormat byte, allocSize int64) (headerSize int, newFile *os.File, seq int, returnErr error) { p, seq, err := nextSequenceFile(dirFile.Name()) if err != nil { - return 0, nil, 0, errors.Wrap(err, "next sequence file") + return 0, nil, 0, fmt.Errorf("next sequence file: %w", err) } ptmp := p + ".tmp" f, err := os.OpenFile(ptmp, os.O_WRONLY|os.O_CREATE, 0o666) if err != nil { - return 0, nil, 0, errors.Wrap(err, "open temp file") + return 0, nil, 0, fmt.Errorf("open temp file: %w", err) } defer func() { if returnErr != nil { @@ -418,11 +416,11 @@ func cutSegmentFile(dirFile *os.File, magicNumber uint32, chunksFormat byte, all }() if allocSize > 0 { if err = fileutil.Preallocate(f, allocSize, true); err != nil { - return 0, nil, 0, errors.Wrap(err, "preallocate") + return 0, nil, 0, fmt.Errorf("preallocate: %w", err) } } if err = dirFile.Sync(); err != nil { - return 0, nil, 0, errors.Wrap(err, "sync directory") + return 0, nil, 0, fmt.Errorf("sync directory: %w", err) } // Write header metadata for new file. @@ -432,24 +430,24 @@ func cutSegmentFile(dirFile *os.File, magicNumber uint32, chunksFormat byte, all n, err := f.Write(metab) if err != nil { - return 0, nil, 0, errors.Wrap(err, "write header") + return 0, nil, 0, fmt.Errorf("write header: %w", err) } if err := f.Close(); err != nil { - return 0, nil, 0, errors.Wrap(err, "close temp file") + return 0, nil, 0, fmt.Errorf("close temp file: %w", err) } f = nil if err := fileutil.Rename(ptmp, p); err != nil { - return 0, nil, 0, errors.Wrap(err, "replace file") + return 0, nil, 0, fmt.Errorf("replace file: %w", err) } f, err = os.OpenFile(p, os.O_WRONLY, 0o666) if err != nil { - return 0, nil, 0, errors.Wrap(err, "open final file") + return 0, nil, 0, fmt.Errorf("open final file: %w", err) } // Skip header for further writes. if _, err := f.Seek(int64(n), 0); err != nil { - return 0, nil, 0, errors.Wrap(err, "seek in final file") + return 0, nil, 0, fmt.Errorf("seek in final file: %w", err) } return n, f, seq, nil } @@ -606,16 +604,16 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err cr := Reader{pool: pool, bs: bs, cs: cs} for i, b := range cr.bs { if b.Len() < SegmentHeaderSize { - return nil, errors.Wrapf(errInvalidSize, "invalid segment header in segment %d", i) + return nil, fmt.Errorf("invalid segment header in segment %d: %w", i, errInvalidSize) } // Verify magic number. if m := binary.BigEndian.Uint32(b.Range(0, MagicChunksSize)); m != MagicChunks { - return nil, errors.Errorf("invalid magic number %x", m) + return nil, fmt.Errorf("invalid magic number %x", m) } // Verify chunk format version. if v := int(b.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 { - return nil, errors.Errorf("invalid chunk format version %d", v) + return nil, fmt.Errorf("invalid chunk format version %d", v) } cr.size += int64(b.Len()) } @@ -641,7 +639,7 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) { f, err := fileutil.OpenMmapFile(fn) if err != nil { return nil, tsdb_errors.NewMulti( - errors.Wrap(err, "mmap files"), + fmt.Errorf("mmap files: %w", err), tsdb_errors.CloseAll(cs), ).Err() } @@ -673,20 +671,20 @@ func (s *Reader) Chunk(meta Meta) (chunkenc.Chunk, error) { sgmIndex, chkStart := BlockChunkRef(meta.Ref).Unpack() if sgmIndex >= len(s.bs) { - return nil, errors.Errorf("segment index %d out of range", sgmIndex) + return nil, fmt.Errorf("segment index %d out of range", sgmIndex) } sgmBytes := s.bs[sgmIndex] if chkStart+MaxChunkLengthFieldSize > sgmBytes.Len() { - return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, sgmBytes.Len()) + return nil, fmt.Errorf("segment doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, sgmBytes.Len()) } // With the minimum chunk length this should never cause us reading // over the end of the slice. c := sgmBytes.Range(chkStart, chkStart+MaxChunkLengthFieldSize) chkDataLen, n := binary.Uvarint(c) if n <= 0 { - return nil, errors.Errorf("reading chunk length failed with %d", n) + return nil, fmt.Errorf("reading chunk length failed with %d", n) } chkEncStart := chkStart + n @@ -695,7 +693,7 @@ func (s *Reader) Chunk(meta Meta) (chunkenc.Chunk, error) { chkDataEnd := chkEnd - crc32.Size if chkEnd > sgmBytes.Len() { - return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk - required:%v, available:%v", chkEnd, sgmBytes.Len()) + return nil, fmt.Errorf("segment doesn't include enough bytes to read the chunk - required:%v, available:%v", chkEnd, sgmBytes.Len()) } sum := sgmBytes.Range(chkDataEnd, chkEnd) diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index d73eb36f87..b495b61828 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -17,6 +17,8 @@ import ( "bufio" "bytes" "encoding/binary" + "errors" + "fmt" "hash" "io" "os" @@ -25,7 +27,6 @@ import ( "sync" "github.com/dennwc/varint" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" "golang.org/x/exp/slices" @@ -107,7 +108,7 @@ type CorruptionErr struct { } func (e *CorruptionErr) Error() string { - return errors.Wrapf(e.Err, "corruption in head chunk file %s", segmentFile(e.Dir, e.FileIndex)).Error() + return fmt.Errorf("corruption in head chunk file %s: %w", segmentFile(e.Dir, e.FileIndex), e.Err).Error() } // chunkPos keeps track of the position in the head chunk files. @@ -240,10 +241,10 @@ type mmappedChunkFile struct { func NewChunkDiskMapper(reg prometheus.Registerer, dir string, pool chunkenc.Pool, writeBufferSize, writeQueueSize int) (*ChunkDiskMapper, error) { // Validate write buffer size. if writeBufferSize < MinWriteBufferSize || writeBufferSize > MaxWriteBufferSize { - return nil, errors.Errorf("ChunkDiskMapper write buffer size should be between %d and %d (actual: %d)", MinWriteBufferSize, MaxWriteBufferSize, writeBufferSize) + return nil, fmt.Errorf("ChunkDiskMapper write buffer size should be between %d and %d (actual: %d)", MinWriteBufferSize, MaxWriteBufferSize, writeBufferSize) } if writeBufferSize%1024 != 0 { - return nil, errors.Errorf("ChunkDiskMapper write buffer size should be a multiple of 1024 (actual: %d)", writeBufferSize) + return nil, fmt.Errorf("ChunkDiskMapper write buffer size should be a multiple of 1024 (actual: %d)", writeBufferSize) } if err := os.MkdirAll(dir, 0o777); err != nil { @@ -320,7 +321,7 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) { for seq, fn := range files { f, err := fileutil.OpenMmapFile(fn) if err != nil { - return errors.Wrapf(err, "mmap files, file: %s", fn) + return fmt.Errorf("mmap files, file: %s: %w", fn, err) } cdm.closers[seq] = f cdm.mmappedChunkFiles[seq] = &mmappedChunkFile{byteSlice: realByteSlice(f.Bytes())} @@ -335,23 +336,23 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) { lastSeq := chkFileIndices[0] for _, seq := range chkFileIndices[1:] { if seq != lastSeq+1 { - return errors.Errorf("found unsequential head chunk files %s (index: %d) and %s (index: %d)", files[lastSeq], lastSeq, files[seq], seq) + return fmt.Errorf("found unsequential head chunk files %s (index: %d) and %s (index: %d)", files[lastSeq], lastSeq, files[seq], seq) } lastSeq = seq } for i, b := range cdm.mmappedChunkFiles { if b.byteSlice.Len() < HeadChunkFileHeaderSize { - return errors.Wrapf(errInvalidSize, "%s: invalid head chunk file header", files[i]) + return fmt.Errorf("%s: invalid head chunk file header: %w", files[i], errInvalidSize) } // Verify magic number. if m := binary.BigEndian.Uint32(b.byteSlice.Range(0, MagicChunksSize)); m != MagicHeadChunks { - return errors.Errorf("%s: invalid magic number %x", files[i], m) + return fmt.Errorf("%s: invalid magic number %x", files[i], m) } // Verify chunk format version. if v := int(b.byteSlice.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 { - return errors.Errorf("%s: invalid chunk format version %d", files[i], v) + return fmt.Errorf("%s: invalid chunk format version %d", files[i], v) } } @@ -394,16 +395,16 @@ func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr erro f, err := os.Open(files[lastFile]) if err != nil { - return files, errors.Wrap(err, "open file during last head chunk file repair") + return files, fmt.Errorf("open file during last head chunk file repair: %w", err) } buf := make([]byte, MagicChunksSize) size, err := f.Read(buf) if err != nil && err != io.EOF { - return files, errors.Wrap(err, "failed to read magic number during last head chunk file repair") + return files, fmt.Errorf("failed to read magic number during last head chunk file repair: %w", err) } if err := f.Close(); err != nil { - return files, errors.Wrap(err, "close file during last head chunk file repair") + return files, fmt.Errorf("close file during last head chunk file repair: %w", err) } // We either don't have enough bytes for the magic number or the magic number is 0. @@ -413,7 +414,7 @@ func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr erro if size < MagicChunksSize || binary.BigEndian.Uint32(buf) == 0 { // Corrupt file, hence remove it. if err := os.RemoveAll(files[lastFile]); err != nil { - return files, errors.Wrap(err, "delete corrupted, empty head chunk file during last file repair") + return files, fmt.Errorf("delete corrupted, empty head chunk file during last file repair: %w", err) } delete(files, lastFile) } @@ -559,7 +560,7 @@ func (cdm *ChunkDiskMapper) cutAndExpectRef(chkRef ChunkDiskMapperRef) (err erro } if expSeq, expOffset := chkRef.Unpack(); seq != expSeq || offset != expOffset { - return errors.Errorf("expected newly cut file to have sequence:offset %d:%d, got %d:%d", expSeq, expOffset, seq, offset) + return fmt.Errorf("expected newly cut file to have sequence:offset %d:%d, got %d:%d", expSeq, expOffset, seq, offset) } return nil @@ -701,13 +702,13 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error return nil, &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: -1, - Err: errors.Errorf("head chunk file index %d more than current open file", sgmIndex), + Err: fmt.Errorf("head chunk file index %d more than current open file", sgmIndex), } } return nil, &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: sgmIndex, - Err: errors.Errorf("head chunk file index %d does not exist on disk", sgmIndex), + Err: fmt.Errorf("head chunk file index %d does not exist on disk", sgmIndex), } } @@ -715,7 +716,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error return nil, &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: sgmIndex, - Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, mmapFile.byteSlice.Len()), + Err: fmt.Errorf("head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, mmapFile.byteSlice.Len()), } } @@ -734,7 +735,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error return nil, &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: sgmIndex, - Err: errors.Errorf("reading chunk length failed with %d", n), + Err: fmt.Errorf("reading chunk length failed with %d", n), } } @@ -744,7 +745,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error return nil, &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: sgmIndex, - Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v", chkDataEnd, mmapFile.byteSlice.Len()), + Err: fmt.Errorf("head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v", chkDataEnd, mmapFile.byteSlice.Len()), } } @@ -761,7 +762,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error return nil, &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: sgmIndex, - Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act), + Err: fmt.Errorf("checksum mismatch expected:%x, actual:%x", sum, act), } } @@ -829,7 +830,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu return &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: segID, - Err: errors.Errorf("head chunk file has some unread data, but doesn't include enough bytes to read the chunk header"+ + Err: fmt.Errorf("head chunk file has some unread data, but doesn't include enough bytes to read the chunk header"+ " - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID), } } @@ -866,7 +867,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu return &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: segID, - Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID), + Err: fmt.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID), } } @@ -879,7 +880,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu return &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: segID, - Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act), + Err: fmt.Errorf("checksum mismatch expected:%x, actual:%x", sum, act), } } idx += CRCSize @@ -905,7 +906,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu return &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: segID, - Err: errors.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID), + Err: fmt.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID), } } } @@ -998,10 +999,9 @@ func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) ([]int, error) { // DeleteCorrupted deletes all the head chunk files after the one which had the corruption // (including the corrupt file). func (cdm *ChunkDiskMapper) DeleteCorrupted(originalErr error) error { - err := errors.Cause(originalErr) // So that we can pick up errors even if wrapped. - cerr, ok := err.(*CorruptionErr) - if !ok { - return errors.Wrap(originalErr, "cannot handle error") + var cerr *CorruptionErr + if !errors.As(originalErr, &cerr) { + return fmt.Errorf("cannot handle error: %w", originalErr) } // Delete all the head chunk files following the corrupt head chunk file.