mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-13 06:47:28 -08:00
Merge pull request #13109 from mmorel-35/patch-3
tsdb/chunks: use Go standard errors package
This commit is contained in:
commit
afc57eb306
|
@ -24,8 +24,6 @@ import (
|
|||
"path/filepath"
|
||||
"strconv"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||
|
@ -285,7 +283,7 @@ func checkCRC32(data, sum []byte) error {
|
|||
// This combination of shifts is the inverse of digest.Sum() in go/src/hash/crc32.
|
||||
want := uint32(sum[0])<<24 + uint32(sum[1])<<16 + uint32(sum[2])<<8 + uint32(sum[3])
|
||||
if got != want {
|
||||
return errors.Errorf("checksum mismatch expected:%x, actual:%x", want, got)
|
||||
return fmt.Errorf("checksum mismatch expected:%x, actual:%x", want, got)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -398,12 +396,12 @@ func (w *Writer) cut() error {
|
|||
func cutSegmentFile(dirFile *os.File, magicNumber uint32, chunksFormat byte, allocSize int64) (headerSize int, newFile *os.File, seq int, returnErr error) {
|
||||
p, seq, err := nextSequenceFile(dirFile.Name())
|
||||
if err != nil {
|
||||
return 0, nil, 0, errors.Wrap(err, "next sequence file")
|
||||
return 0, nil, 0, fmt.Errorf("next sequence file: %w", err)
|
||||
}
|
||||
ptmp := p + ".tmp"
|
||||
f, err := os.OpenFile(ptmp, os.O_WRONLY|os.O_CREATE, 0o666)
|
||||
if err != nil {
|
||||
return 0, nil, 0, errors.Wrap(err, "open temp file")
|
||||
return 0, nil, 0, fmt.Errorf("open temp file: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if returnErr != nil {
|
||||
|
@ -418,11 +416,11 @@ func cutSegmentFile(dirFile *os.File, magicNumber uint32, chunksFormat byte, all
|
|||
}()
|
||||
if allocSize > 0 {
|
||||
if err = fileutil.Preallocate(f, allocSize, true); err != nil {
|
||||
return 0, nil, 0, errors.Wrap(err, "preallocate")
|
||||
return 0, nil, 0, fmt.Errorf("preallocate: %w", err)
|
||||
}
|
||||
}
|
||||
if err = dirFile.Sync(); err != nil {
|
||||
return 0, nil, 0, errors.Wrap(err, "sync directory")
|
||||
return 0, nil, 0, fmt.Errorf("sync directory: %w", err)
|
||||
}
|
||||
|
||||
// Write header metadata for new file.
|
||||
|
@ -432,24 +430,24 @@ func cutSegmentFile(dirFile *os.File, magicNumber uint32, chunksFormat byte, all
|
|||
|
||||
n, err := f.Write(metab)
|
||||
if err != nil {
|
||||
return 0, nil, 0, errors.Wrap(err, "write header")
|
||||
return 0, nil, 0, fmt.Errorf("write header: %w", err)
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
return 0, nil, 0, errors.Wrap(err, "close temp file")
|
||||
return 0, nil, 0, fmt.Errorf("close temp file: %w", err)
|
||||
}
|
||||
f = nil
|
||||
|
||||
if err := fileutil.Rename(ptmp, p); err != nil {
|
||||
return 0, nil, 0, errors.Wrap(err, "replace file")
|
||||
return 0, nil, 0, fmt.Errorf("replace file: %w", err)
|
||||
}
|
||||
|
||||
f, err = os.OpenFile(p, os.O_WRONLY, 0o666)
|
||||
if err != nil {
|
||||
return 0, nil, 0, errors.Wrap(err, "open final file")
|
||||
return 0, nil, 0, fmt.Errorf("open final file: %w", err)
|
||||
}
|
||||
// Skip header for further writes.
|
||||
if _, err := f.Seek(int64(n), 0); err != nil {
|
||||
return 0, nil, 0, errors.Wrap(err, "seek in final file")
|
||||
return 0, nil, 0, fmt.Errorf("seek in final file: %w", err)
|
||||
}
|
||||
return n, f, seq, nil
|
||||
}
|
||||
|
@ -606,16 +604,16 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err
|
|||
cr := Reader{pool: pool, bs: bs, cs: cs}
|
||||
for i, b := range cr.bs {
|
||||
if b.Len() < SegmentHeaderSize {
|
||||
return nil, errors.Wrapf(errInvalidSize, "invalid segment header in segment %d", i)
|
||||
return nil, fmt.Errorf("invalid segment header in segment %d: %w", i, errInvalidSize)
|
||||
}
|
||||
// Verify magic number.
|
||||
if m := binary.BigEndian.Uint32(b.Range(0, MagicChunksSize)); m != MagicChunks {
|
||||
return nil, errors.Errorf("invalid magic number %x", m)
|
||||
return nil, fmt.Errorf("invalid magic number %x", m)
|
||||
}
|
||||
|
||||
// Verify chunk format version.
|
||||
if v := int(b.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
|
||||
return nil, errors.Errorf("invalid chunk format version %d", v)
|
||||
return nil, fmt.Errorf("invalid chunk format version %d", v)
|
||||
}
|
||||
cr.size += int64(b.Len())
|
||||
}
|
||||
|
@ -641,7 +639,7 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
|
|||
f, err := fileutil.OpenMmapFile(fn)
|
||||
if err != nil {
|
||||
return nil, tsdb_errors.NewMulti(
|
||||
errors.Wrap(err, "mmap files"),
|
||||
fmt.Errorf("mmap files: %w", err),
|
||||
tsdb_errors.CloseAll(cs),
|
||||
).Err()
|
||||
}
|
||||
|
@ -673,20 +671,20 @@ func (s *Reader) Chunk(meta Meta) (chunkenc.Chunk, error) {
|
|||
sgmIndex, chkStart := BlockChunkRef(meta.Ref).Unpack()
|
||||
|
||||
if sgmIndex >= len(s.bs) {
|
||||
return nil, errors.Errorf("segment index %d out of range", sgmIndex)
|
||||
return nil, fmt.Errorf("segment index %d out of range", sgmIndex)
|
||||
}
|
||||
|
||||
sgmBytes := s.bs[sgmIndex]
|
||||
|
||||
if chkStart+MaxChunkLengthFieldSize > sgmBytes.Len() {
|
||||
return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, sgmBytes.Len())
|
||||
return nil, fmt.Errorf("segment doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, sgmBytes.Len())
|
||||
}
|
||||
// With the minimum chunk length this should never cause us reading
|
||||
// over the end of the slice.
|
||||
c := sgmBytes.Range(chkStart, chkStart+MaxChunkLengthFieldSize)
|
||||
chkDataLen, n := binary.Uvarint(c)
|
||||
if n <= 0 {
|
||||
return nil, errors.Errorf("reading chunk length failed with %d", n)
|
||||
return nil, fmt.Errorf("reading chunk length failed with %d", n)
|
||||
}
|
||||
|
||||
chkEncStart := chkStart + n
|
||||
|
@ -695,7 +693,7 @@ func (s *Reader) Chunk(meta Meta) (chunkenc.Chunk, error) {
|
|||
chkDataEnd := chkEnd - crc32.Size
|
||||
|
||||
if chkEnd > sgmBytes.Len() {
|
||||
return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk - required:%v, available:%v", chkEnd, sgmBytes.Len())
|
||||
return nil, fmt.Errorf("segment doesn't include enough bytes to read the chunk - required:%v, available:%v", chkEnd, sgmBytes.Len())
|
||||
}
|
||||
|
||||
sum := sgmBytes.Range(chkDataEnd, chkEnd)
|
||||
|
|
|
@ -17,6 +17,8 @@ import (
|
|||
"bufio"
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
"os"
|
||||
|
@ -25,7 +27,6 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/dennwc/varint"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/atomic"
|
||||
"golang.org/x/exp/slices"
|
||||
|
@ -107,7 +108,7 @@ type CorruptionErr struct {
|
|||
}
|
||||
|
||||
func (e *CorruptionErr) Error() string {
|
||||
return errors.Wrapf(e.Err, "corruption in head chunk file %s", segmentFile(e.Dir, e.FileIndex)).Error()
|
||||
return fmt.Errorf("corruption in head chunk file %s: %w", segmentFile(e.Dir, e.FileIndex), e.Err).Error()
|
||||
}
|
||||
|
||||
// chunkPos keeps track of the position in the head chunk files.
|
||||
|
@ -240,10 +241,10 @@ type mmappedChunkFile struct {
|
|||
func NewChunkDiskMapper(reg prometheus.Registerer, dir string, pool chunkenc.Pool, writeBufferSize, writeQueueSize int) (*ChunkDiskMapper, error) {
|
||||
// Validate write buffer size.
|
||||
if writeBufferSize < MinWriteBufferSize || writeBufferSize > MaxWriteBufferSize {
|
||||
return nil, errors.Errorf("ChunkDiskMapper write buffer size should be between %d and %d (actual: %d)", MinWriteBufferSize, MaxWriteBufferSize, writeBufferSize)
|
||||
return nil, fmt.Errorf("ChunkDiskMapper write buffer size should be between %d and %d (actual: %d)", MinWriteBufferSize, MaxWriteBufferSize, writeBufferSize)
|
||||
}
|
||||
if writeBufferSize%1024 != 0 {
|
||||
return nil, errors.Errorf("ChunkDiskMapper write buffer size should be a multiple of 1024 (actual: %d)", writeBufferSize)
|
||||
return nil, fmt.Errorf("ChunkDiskMapper write buffer size should be a multiple of 1024 (actual: %d)", writeBufferSize)
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(dir, 0o777); err != nil {
|
||||
|
@ -320,7 +321,7 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
|
|||
for seq, fn := range files {
|
||||
f, err := fileutil.OpenMmapFile(fn)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "mmap files, file: %s", fn)
|
||||
return fmt.Errorf("mmap files, file: %s: %w", fn, err)
|
||||
}
|
||||
cdm.closers[seq] = f
|
||||
cdm.mmappedChunkFiles[seq] = &mmappedChunkFile{byteSlice: realByteSlice(f.Bytes())}
|
||||
|
@ -335,23 +336,23 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
|
|||
lastSeq := chkFileIndices[0]
|
||||
for _, seq := range chkFileIndices[1:] {
|
||||
if seq != lastSeq+1 {
|
||||
return errors.Errorf("found unsequential head chunk files %s (index: %d) and %s (index: %d)", files[lastSeq], lastSeq, files[seq], seq)
|
||||
return fmt.Errorf("found unsequential head chunk files %s (index: %d) and %s (index: %d)", files[lastSeq], lastSeq, files[seq], seq)
|
||||
}
|
||||
lastSeq = seq
|
||||
}
|
||||
|
||||
for i, b := range cdm.mmappedChunkFiles {
|
||||
if b.byteSlice.Len() < HeadChunkFileHeaderSize {
|
||||
return errors.Wrapf(errInvalidSize, "%s: invalid head chunk file header", files[i])
|
||||
return fmt.Errorf("%s: invalid head chunk file header: %w", files[i], errInvalidSize)
|
||||
}
|
||||
// Verify magic number.
|
||||
if m := binary.BigEndian.Uint32(b.byteSlice.Range(0, MagicChunksSize)); m != MagicHeadChunks {
|
||||
return errors.Errorf("%s: invalid magic number %x", files[i], m)
|
||||
return fmt.Errorf("%s: invalid magic number %x", files[i], m)
|
||||
}
|
||||
|
||||
// Verify chunk format version.
|
||||
if v := int(b.byteSlice.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
|
||||
return errors.Errorf("%s: invalid chunk format version %d", files[i], v)
|
||||
return fmt.Errorf("%s: invalid chunk format version %d", files[i], v)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -394,16 +395,16 @@ func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr erro
|
|||
|
||||
f, err := os.Open(files[lastFile])
|
||||
if err != nil {
|
||||
return files, errors.Wrap(err, "open file during last head chunk file repair")
|
||||
return files, fmt.Errorf("open file during last head chunk file repair: %w", err)
|
||||
}
|
||||
|
||||
buf := make([]byte, MagicChunksSize)
|
||||
size, err := f.Read(buf)
|
||||
if err != nil && err != io.EOF {
|
||||
return files, errors.Wrap(err, "failed to read magic number during last head chunk file repair")
|
||||
return files, fmt.Errorf("failed to read magic number during last head chunk file repair: %w", err)
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
return files, errors.Wrap(err, "close file during last head chunk file repair")
|
||||
return files, fmt.Errorf("close file during last head chunk file repair: %w", err)
|
||||
}
|
||||
|
||||
// We either don't have enough bytes for the magic number or the magic number is 0.
|
||||
|
@ -413,7 +414,7 @@ func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr erro
|
|||
if size < MagicChunksSize || binary.BigEndian.Uint32(buf) == 0 {
|
||||
// Corrupt file, hence remove it.
|
||||
if err := os.RemoveAll(files[lastFile]); err != nil {
|
||||
return files, errors.Wrap(err, "delete corrupted, empty head chunk file during last file repair")
|
||||
return files, fmt.Errorf("delete corrupted, empty head chunk file during last file repair: %w", err)
|
||||
}
|
||||
delete(files, lastFile)
|
||||
}
|
||||
|
@ -559,7 +560,7 @@ func (cdm *ChunkDiskMapper) cutAndExpectRef(chkRef ChunkDiskMapperRef) (err erro
|
|||
}
|
||||
|
||||
if expSeq, expOffset := chkRef.Unpack(); seq != expSeq || offset != expOffset {
|
||||
return errors.Errorf("expected newly cut file to have sequence:offset %d:%d, got %d:%d", expSeq, expOffset, seq, offset)
|
||||
return fmt.Errorf("expected newly cut file to have sequence:offset %d:%d, got %d:%d", expSeq, expOffset, seq, offset)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -701,13 +702,13 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
|
|||
return nil, &CorruptionErr{
|
||||
Dir: cdm.dir.Name(),
|
||||
FileIndex: -1,
|
||||
Err: errors.Errorf("head chunk file index %d more than current open file", sgmIndex),
|
||||
Err: fmt.Errorf("head chunk file index %d more than current open file", sgmIndex),
|
||||
}
|
||||
}
|
||||
return nil, &CorruptionErr{
|
||||
Dir: cdm.dir.Name(),
|
||||
FileIndex: sgmIndex,
|
||||
Err: errors.Errorf("head chunk file index %d does not exist on disk", sgmIndex),
|
||||
Err: fmt.Errorf("head chunk file index %d does not exist on disk", sgmIndex),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -715,7 +716,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
|
|||
return nil, &CorruptionErr{
|
||||
Dir: cdm.dir.Name(),
|
||||
FileIndex: sgmIndex,
|
||||
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, mmapFile.byteSlice.Len()),
|
||||
Err: fmt.Errorf("head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, mmapFile.byteSlice.Len()),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -734,7 +735,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
|
|||
return nil, &CorruptionErr{
|
||||
Dir: cdm.dir.Name(),
|
||||
FileIndex: sgmIndex,
|
||||
Err: errors.Errorf("reading chunk length failed with %d", n),
|
||||
Err: fmt.Errorf("reading chunk length failed with %d", n),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -744,7 +745,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
|
|||
return nil, &CorruptionErr{
|
||||
Dir: cdm.dir.Name(),
|
||||
FileIndex: sgmIndex,
|
||||
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v", chkDataEnd, mmapFile.byteSlice.Len()),
|
||||
Err: fmt.Errorf("head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v", chkDataEnd, mmapFile.byteSlice.Len()),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -761,7 +762,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
|
|||
return nil, &CorruptionErr{
|
||||
Dir: cdm.dir.Name(),
|
||||
FileIndex: sgmIndex,
|
||||
Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
|
||||
Err: fmt.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -829,7 +830,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
|
|||
return &CorruptionErr{
|
||||
Dir: cdm.dir.Name(),
|
||||
FileIndex: segID,
|
||||
Err: errors.Errorf("head chunk file has some unread data, but doesn't include enough bytes to read the chunk header"+
|
||||
Err: fmt.Errorf("head chunk file has some unread data, but doesn't include enough bytes to read the chunk header"+
|
||||
" - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID),
|
||||
}
|
||||
}
|
||||
|
@ -866,7 +867,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
|
|||
return &CorruptionErr{
|
||||
Dir: cdm.dir.Name(),
|
||||
FileIndex: segID,
|
||||
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID),
|
||||
Err: fmt.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -879,7 +880,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
|
|||
return &CorruptionErr{
|
||||
Dir: cdm.dir.Name(),
|
||||
FileIndex: segID,
|
||||
Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
|
||||
Err: fmt.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
|
||||
}
|
||||
}
|
||||
idx += CRCSize
|
||||
|
@ -905,7 +906,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
|
|||
return &CorruptionErr{
|
||||
Dir: cdm.dir.Name(),
|
||||
FileIndex: segID,
|
||||
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID),
|
||||
Err: fmt.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -998,10 +999,9 @@ func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) ([]int, error) {
|
|||
// DeleteCorrupted deletes all the head chunk files after the one which had the corruption
|
||||
// (including the corrupt file).
|
||||
func (cdm *ChunkDiskMapper) DeleteCorrupted(originalErr error) error {
|
||||
err := errors.Cause(originalErr) // So that we can pick up errors even if wrapped.
|
||||
cerr, ok := err.(*CorruptionErr)
|
||||
if !ok {
|
||||
return errors.Wrap(originalErr, "cannot handle error")
|
||||
var cerr *CorruptionErr
|
||||
if !errors.As(originalErr, &cerr) {
|
||||
return fmt.Errorf("cannot handle error: %w", originalErr)
|
||||
}
|
||||
|
||||
// Delete all the head chunk files following the corrupt head chunk file.
|
||||
|
|
Loading…
Reference in a new issue