mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-25 05:34:05 -08:00
MultiError: Refactored MultiError for more concise and safe usage. (#8066)
* MultiError: Refactored MultiError for more concise and safe usage. * Less lines * Goland IDE was marking every usage of old MultiError "potential nil" error * It was easy to forgot using Err() when error was returned, now it's safely assured on compile time. NOTE: Potentially I would rename package to merrors. (: In different PR. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Addressed review comments. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Addressed comments. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fix after rebase. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
parent
28ca0965f0
commit
3d8826a3d4
|
@ -41,8 +41,6 @@ import (
|
||||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
var merr tsdb_errors.MultiError
|
|
||||||
|
|
||||||
const timeDelta = 30000
|
const timeDelta = 30000
|
||||||
|
|
||||||
type writeBenchmark struct {
|
type writeBenchmark struct {
|
||||||
|
@ -349,9 +347,7 @@ func listBlocks(path string, humanReadable bool) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
merr.Add(err)
|
err = tsdb_errors.NewMulti(err, db.Close()).Err()
|
||||||
merr.Add(db.Close())
|
|
||||||
err = merr.Err()
|
|
||||||
}()
|
}()
|
||||||
blocks, err := db.Blocks()
|
blocks, err := db.Blocks()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -429,9 +425,7 @@ func analyzeBlock(path, blockID string, limit int) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
merr.Add(err)
|
err = tsdb_errors.NewMulti(err, db.Close()).Err()
|
||||||
merr.Add(db.Close())
|
|
||||||
err = merr.Err()
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
meta := block.Meta()
|
meta := block.Meta()
|
||||||
|
@ -579,9 +573,7 @@ func dumpSamples(path string, mint, maxt int64) (err error) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
merr.Add(err)
|
err = tsdb_errors.NewMulti(err, db.Close()).Err()
|
||||||
merr.Add(db.Close())
|
|
||||||
err = merr.Err()
|
|
||||||
}()
|
}()
|
||||||
q, err := db.Querier(context.TODO(), mint, maxt)
|
q, err := db.Querier(context.TODO(), mint, maxt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -605,11 +597,7 @@ func dumpSamples(path string, mint, maxt int64) (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if ws := ss.Warnings(); len(ws) > 0 {
|
if ws := ss.Warnings(); len(ws) > 0 {
|
||||||
var merr tsdb_errors.MultiError
|
return tsdb_errors.NewMulti(ws...).Err()
|
||||||
for _, w := range ws {
|
|
||||||
merr.Add(w)
|
|
||||||
}
|
|
||||||
return merr.Err()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if ss.Err() != nil {
|
if ss.Err() != nil {
|
||||||
|
|
|
@ -80,8 +80,7 @@ func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error)
|
||||||
querier, err := storage.Querier(ctx, mint, maxt)
|
querier, err := storage.Querier(ctx, mint, maxt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Close already open Queriers, append potential errors to returned error.
|
// Close already open Queriers, append potential errors to returned error.
|
||||||
errs := tsdb_errors.MultiError{err}
|
errs := tsdb_errors.NewMulti(err, primary.Close())
|
||||||
errs.Add(primary.Close())
|
|
||||||
for _, q := range secondaries {
|
for _, q := range secondaries {
|
||||||
errs.Add(q.Close())
|
errs.Add(q.Close())
|
||||||
}
|
}
|
||||||
|
@ -103,8 +102,7 @@ func (f *fanout) ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQueri
|
||||||
querier, err := storage.ChunkQuerier(ctx, mint, maxt)
|
querier, err := storage.ChunkQuerier(ctx, mint, maxt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Close already open Queriers, append potential errors to returned error.
|
// Close already open Queriers, append potential errors to returned error.
|
||||||
errs := tsdb_errors.MultiError{err}
|
errs := tsdb_errors.NewMulti(err, primary.Close())
|
||||||
errs.Add(primary.Close())
|
|
||||||
for _, q := range secondaries {
|
for _, q := range secondaries {
|
||||||
errs.Add(q.Close())
|
errs.Add(q.Close())
|
||||||
}
|
}
|
||||||
|
@ -130,8 +128,7 @@ func (f *fanout) Appender(ctx context.Context) Appender {
|
||||||
|
|
||||||
// Close closes the storage and all its underlying resources.
|
// Close closes the storage and all its underlying resources.
|
||||||
func (f *fanout) Close() error {
|
func (f *fanout) Close() error {
|
||||||
errs := tsdb_errors.MultiError{}
|
errs := tsdb_errors.NewMulti(f.primary.Close())
|
||||||
errs.Add(f.primary.Close())
|
|
||||||
for _, s := range f.secondaries {
|
for _, s := range f.secondaries {
|
||||||
errs.Add(s.Close())
|
errs.Add(s.Close())
|
||||||
}
|
}
|
||||||
|
|
|
@ -248,7 +248,7 @@ func (q *mergeGenericQuerier) LabelNames() ([]string, Warnings, error) {
|
||||||
|
|
||||||
// Close releases the resources of the generic querier.
|
// Close releases the resources of the generic querier.
|
||||||
func (q *mergeGenericQuerier) Close() error {
|
func (q *mergeGenericQuerier) Close() error {
|
||||||
errs := tsdb_errors.MultiError{}
|
errs := tsdb_errors.NewMulti()
|
||||||
for _, querier := range q.queriers {
|
for _, querier := range q.queriers {
|
||||||
if err := querier.Close(); err != nil {
|
if err := querier.Close(); err != nil {
|
||||||
errs.Add(err)
|
errs.Add(err)
|
||||||
|
@ -534,11 +534,9 @@ func (c *chainSampleIterator) Next() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *chainSampleIterator) Err() error {
|
func (c *chainSampleIterator) Err() error {
|
||||||
var errs tsdb_errors.MultiError
|
errs := tsdb_errors.NewMulti()
|
||||||
for _, iter := range c.iterators {
|
for _, iter := range c.iterators {
|
||||||
if err := iter.Err(); err != nil {
|
errs.Add(iter.Err())
|
||||||
errs.Add(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return errs.Err()
|
return errs.Err()
|
||||||
}
|
}
|
||||||
|
@ -681,11 +679,9 @@ func (c *compactChunkIterator) Next() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *compactChunkIterator) Err() error {
|
func (c *compactChunkIterator) Err() error {
|
||||||
var errs tsdb_errors.MultiError
|
errs := tsdb_errors.NewMulti()
|
||||||
for _, iter := range c.iterators {
|
for _, iter := range c.iterators {
|
||||||
if err := iter.Err(); err != nil {
|
errs.Add(iter.Err())
|
||||||
errs.Add(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
errs.Add(c.err)
|
errs.Add(c.err)
|
||||||
return errs.Err()
|
return errs.Err()
|
||||||
|
|
|
@ -226,19 +226,14 @@ func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) (int64, error
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var merr tsdb_errors.MultiError
|
|
||||||
n, err := f.Write(jsonMeta)
|
n, err := f.Write(jsonMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
merr.Add(err)
|
return 0, tsdb_errors.NewMulti(err, f.Close()).Err()
|
||||||
merr.Add(f.Close())
|
|
||||||
return 0, merr.Err()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Force the kernel to persist the file on disk to avoid data loss if the host crashes.
|
// Force the kernel to persist the file on disk to avoid data loss if the host crashes.
|
||||||
if err := f.Sync(); err != nil {
|
if err := f.Sync(); err != nil {
|
||||||
merr.Add(err)
|
return 0, tsdb_errors.NewMulti(err, f.Close()).Err()
|
||||||
merr.Add(f.Close())
|
|
||||||
return 0, merr.Err()
|
|
||||||
}
|
}
|
||||||
if err := f.Close(); err != nil {
|
if err := f.Close(); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
|
@ -280,10 +275,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er
|
||||||
var closers []io.Closer
|
var closers []io.Closer
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var merr tsdb_errors.MultiError
|
err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(closers)).Err()
|
||||||
merr.Add(err)
|
|
||||||
merr.Add(closeAll(closers))
|
|
||||||
err = merr.Err()
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
meta, sizeMeta, err := readMetaFile(dir)
|
meta, sizeMeta, err := readMetaFile(dir)
|
||||||
|
@ -333,13 +325,11 @@ func (pb *Block) Close() error {
|
||||||
|
|
||||||
pb.pendingReaders.Wait()
|
pb.pendingReaders.Wait()
|
||||||
|
|
||||||
var merr tsdb_errors.MultiError
|
return tsdb_errors.NewMulti(
|
||||||
|
pb.chunkr.Close(),
|
||||||
merr.Add(pb.chunkr.Close())
|
pb.indexr.Close(),
|
||||||
merr.Add(pb.indexr.Close())
|
pb.tombstones.Close(),
|
||||||
merr.Add(pb.tombstones.Close())
|
).Err()
|
||||||
|
|
||||||
return merr.Err()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pb *Block) String() string {
|
func (pb *Block) String() string {
|
||||||
|
|
|
@ -230,14 +230,13 @@ func cutSegmentFile(dirFile *os.File, magicNumber uint32, chunksFormat byte, all
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if returnErr != nil {
|
if returnErr != nil {
|
||||||
var merr tsdb_errors.MultiError
|
errs := tsdb_errors.NewMulti(returnErr)
|
||||||
merr.Add(returnErr)
|
|
||||||
if f != nil {
|
if f != nil {
|
||||||
merr.Add(f.Close())
|
errs.Add(f.Close())
|
||||||
}
|
}
|
||||||
// Calling RemoveAll on a non-existent file does not return error.
|
// Calling RemoveAll on a non-existent file does not return error.
|
||||||
merr.Add(os.RemoveAll(ptmp))
|
errs.Add(os.RemoveAll(ptmp))
|
||||||
returnErr = merr.Err()
|
returnErr = errs.Err()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
if allocSize > 0 {
|
if allocSize > 0 {
|
||||||
|
@ -463,16 +462,16 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
bs []ByteSlice
|
bs []ByteSlice
|
||||||
cs []io.Closer
|
cs []io.Closer
|
||||||
merr tsdb_errors.MultiError
|
|
||||||
)
|
)
|
||||||
for _, fn := range files {
|
for _, fn := range files {
|
||||||
f, err := fileutil.OpenMmapFile(fn)
|
f, err := fileutil.OpenMmapFile(fn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
merr.Add(errors.Wrap(err, "mmap files"))
|
return nil, tsdb_errors.NewMulti(
|
||||||
merr.Add(closeAll(cs))
|
errors.Wrap(err, "mmap files"),
|
||||||
return nil, merr
|
tsdb_errors.CloseAll(cs),
|
||||||
|
).Err()
|
||||||
}
|
}
|
||||||
cs = append(cs, f)
|
cs = append(cs, f)
|
||||||
bs = append(bs, realByteSlice(f.Bytes()))
|
bs = append(bs, realByteSlice(f.Bytes()))
|
||||||
|
@ -480,15 +479,16 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
|
||||||
|
|
||||||
reader, err := newReader(bs, cs, pool)
|
reader, err := newReader(bs, cs, pool)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
merr.Add(err)
|
return nil, tsdb_errors.NewMulti(
|
||||||
merr.Add(closeAll(cs))
|
err,
|
||||||
return nil, merr
|
tsdb_errors.CloseAll(cs),
|
||||||
|
).Err()
|
||||||
}
|
}
|
||||||
return reader, nil
|
return reader, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Reader) Close() error {
|
func (s *Reader) Close() error {
|
||||||
return closeAll(s.cs)
|
return tsdb_errors.CloseAll(s.cs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Size returns the size of the chunks.
|
// Size returns the size of the chunks.
|
||||||
|
@ -588,12 +588,3 @@ func sequenceFiles(dir string) ([]string, error) {
|
||||||
}
|
}
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func closeAll(cs []io.Closer) error {
|
|
||||||
var merr tsdb_errors.MultiError
|
|
||||||
|
|
||||||
for _, c := range cs {
|
|
||||||
merr.Add(c.Close())
|
|
||||||
}
|
|
||||||
return merr.Err()
|
|
||||||
}
|
|
||||||
|
|
|
@ -153,10 +153,7 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
|
||||||
cdm.closers = map[int]io.Closer{}
|
cdm.closers = map[int]io.Closer{}
|
||||||
defer func() {
|
defer func() {
|
||||||
if returnErr != nil {
|
if returnErr != nil {
|
||||||
var merr tsdb_errors.MultiError
|
returnErr = tsdb_errors.NewMulti(returnErr, closeAllFromMap(cdm.closers)).Err()
|
||||||
merr.Add(returnErr)
|
|
||||||
merr.Add(closeAllFromMap(cdm.closers))
|
|
||||||
returnErr = merr.Err()
|
|
||||||
|
|
||||||
cdm.mmappedChunkFiles = nil
|
cdm.mmappedChunkFiles = nil
|
||||||
cdm.closers = nil
|
cdm.closers = nil
|
||||||
|
@ -370,10 +367,7 @@ func (cdm *ChunkDiskMapper) cut() (returnErr error) {
|
||||||
// The file should not be closed if there is no error,
|
// The file should not be closed if there is no error,
|
||||||
// its kept open in the ChunkDiskMapper.
|
// its kept open in the ChunkDiskMapper.
|
||||||
if returnErr != nil {
|
if returnErr != nil {
|
||||||
var merr tsdb_errors.MultiError
|
returnErr = tsdb_errors.NewMulti(returnErr, newFile.Close()).Err()
|
||||||
merr.Add(returnErr)
|
|
||||||
merr.Add(newFile.Close())
|
|
||||||
returnErr = merr.Err()
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -717,13 +711,13 @@ func (cdm *ChunkDiskMapper) Truncate(mint int64) error {
|
||||||
}
|
}
|
||||||
cdm.readPathMtx.RUnlock()
|
cdm.readPathMtx.RUnlock()
|
||||||
|
|
||||||
var merr tsdb_errors.MultiError
|
errs := tsdb_errors.NewMulti()
|
||||||
// Cut a new file only if the current file has some chunks.
|
// Cut a new file only if the current file has some chunks.
|
||||||
if cdm.curFileSize() > HeadChunkFileHeaderSize {
|
if cdm.curFileSize() > HeadChunkFileHeaderSize {
|
||||||
merr.Add(cdm.CutNewFile())
|
errs.Add(cdm.CutNewFile())
|
||||||
}
|
}
|
||||||
merr.Add(cdm.deleteFiles(removedFiles))
|
errs.Add(cdm.deleteFiles(removedFiles))
|
||||||
return merr.Err()
|
return errs.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error {
|
func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error {
|
||||||
|
@ -795,23 +789,23 @@ func (cdm *ChunkDiskMapper) Close() error {
|
||||||
}
|
}
|
||||||
cdm.closed = true
|
cdm.closed = true
|
||||||
|
|
||||||
var merr tsdb_errors.MultiError
|
errs := tsdb_errors.NewMulti(
|
||||||
merr.Add(closeAllFromMap(cdm.closers))
|
closeAllFromMap(cdm.closers),
|
||||||
merr.Add(cdm.finalizeCurFile())
|
cdm.finalizeCurFile(),
|
||||||
merr.Add(cdm.dir.Close())
|
cdm.dir.Close(),
|
||||||
|
)
|
||||||
cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
|
cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
|
||||||
cdm.closers = map[int]io.Closer{}
|
cdm.closers = map[int]io.Closer{}
|
||||||
|
|
||||||
return merr.Err()
|
return errs.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func closeAllFromMap(cs map[int]io.Closer) error {
|
func closeAllFromMap(cs map[int]io.Closer) error {
|
||||||
var merr tsdb_errors.MultiError
|
errs := tsdb_errors.NewMulti()
|
||||||
for _, c := range cs {
|
for _, c := range cs {
|
||||||
merr.Add(c.Close())
|
errs.Add(c.Close())
|
||||||
}
|
}
|
||||||
return merr.Err()
|
return errs.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
const inBufferShards = 128 // 128 is a randomly chosen number.
|
const inBufferShards = 128 // 128 is a randomly chosen number.
|
||||||
|
|
|
@ -451,17 +451,16 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
|
||||||
return uid, nil
|
return uid, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var merr tsdb_errors.MultiError
|
errs := tsdb_errors.NewMulti(err)
|
||||||
merr.Add(err)
|
|
||||||
if err != context.Canceled {
|
if err != context.Canceled {
|
||||||
for _, b := range bs {
|
for _, b := range bs {
|
||||||
if err := b.setCompactionFailed(); err != nil {
|
if err := b.setCompactionFailed(); err != nil {
|
||||||
merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
|
errs.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return uid, merr
|
return uid, errs.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
|
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
|
||||||
|
@ -534,10 +533,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
|
||||||
tmp := dir + tmpForCreationBlockDirSuffix
|
tmp := dir + tmpForCreationBlockDirSuffix
|
||||||
var closers []io.Closer
|
var closers []io.Closer
|
||||||
defer func(t time.Time) {
|
defer func(t time.Time) {
|
||||||
var merr tsdb_errors.MultiError
|
err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(closers)).Err()
|
||||||
merr.Add(err)
|
|
||||||
merr.Add(closeAll(closers))
|
|
||||||
err = merr.Err()
|
|
||||||
|
|
||||||
// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
|
// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
|
||||||
if err := os.RemoveAll(tmp); err != nil {
|
if err := os.RemoveAll(tmp); err != nil {
|
||||||
|
@ -594,13 +590,13 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
|
||||||
// though these are covered under defer. This is because in Windows,
|
// though these are covered under defer. This is because in Windows,
|
||||||
// you cannot delete these unless they are closed and the defer is to
|
// you cannot delete these unless they are closed and the defer is to
|
||||||
// make sure they are closed if the function exits due to an error above.
|
// make sure they are closed if the function exits due to an error above.
|
||||||
var merr tsdb_errors.MultiError
|
errs := tsdb_errors.NewMulti()
|
||||||
for _, w := range closers {
|
for _, w := range closers {
|
||||||
merr.Add(w.Close())
|
errs.Add(w.Close())
|
||||||
}
|
}
|
||||||
closers = closers[:0] // Avoid closing the writers twice in the defer.
|
closers = closers[:0] // Avoid closing the writers twice in the defer.
|
||||||
if merr.Err() != nil {
|
if errs.Err() != nil {
|
||||||
return merr.Err()
|
return errs.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Populated block is empty, so exit early.
|
// Populated block is empty, so exit early.
|
||||||
|
@ -660,12 +656,11 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
||||||
overlapping bool
|
overlapping bool
|
||||||
)
|
)
|
||||||
defer func() {
|
defer func() {
|
||||||
var merr tsdb_errors.MultiError
|
errs := tsdb_errors.NewMulti(err)
|
||||||
merr.Add(err)
|
if cerr := tsdb_errors.CloseAll(closers); cerr != nil {
|
||||||
if cerr := closeAll(closers); cerr != nil {
|
errs.Add(errors.Wrap(cerr, "close"))
|
||||||
merr.Add(errors.Wrap(cerr, "close"))
|
|
||||||
}
|
}
|
||||||
err = merr.Err()
|
err = errs.Err()
|
||||||
c.metrics.populatingBlocks.Set(0)
|
c.metrics.populatingBlocks.Set(0)
|
||||||
}()
|
}()
|
||||||
c.metrics.populatingBlocks.Set(1)
|
c.metrics.populatingBlocks.Set(1)
|
||||||
|
|
72
tsdb/db.go
72
tsdb/db.go
|
@ -333,10 +333,10 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
var merr tsdb_errors.MultiError
|
returnErr = tsdb_errors.NewMulti(
|
||||||
merr.Add(returnErr)
|
returnErr,
|
||||||
merr.Add(errors.Wrap(head.Close(), "closing Head"))
|
errors.Wrap(head.Close(), "closing Head"),
|
||||||
returnErr = merr.Err()
|
).Err()
|
||||||
}()
|
}()
|
||||||
// Set the min valid time for the ingested wal samples
|
// Set the min valid time for the ingested wal samples
|
||||||
// to be no lower than the maxt of the last block.
|
// to be no lower than the maxt of the last block.
|
||||||
|
@ -466,11 +466,11 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
|
||||||
level.Warn(db.logger).Log("msg", "Closing block failed", "err", err, "block", b)
|
level.Warn(db.logger).Log("msg", "Closing block failed", "err", err, "block", b)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var merr tsdb_errors.MultiError
|
errs := tsdb_errors.NewMulti()
|
||||||
for ulid, err := range corrupted {
|
for ulid, err := range corrupted {
|
||||||
merr.Add(errors.Wrapf(err, "corrupted block %s", ulid.String()))
|
errs.Add(errors.Wrapf(err, "corrupted block %s", ulid.String()))
|
||||||
}
|
}
|
||||||
return nil, merr.Err()
|
return nil, errs.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(loadable) == 0 {
|
if len(loadable) == 0 {
|
||||||
|
@ -514,12 +514,7 @@ func (db *DBReadOnly) Close() error {
|
||||||
}
|
}
|
||||||
close(db.closed)
|
close(db.closed)
|
||||||
|
|
||||||
var merr tsdb_errors.MultiError
|
return tsdb_errors.CloseAll(db.closers)
|
||||||
|
|
||||||
for _, b := range db.closers {
|
|
||||||
merr.Add(b.Close())
|
|
||||||
}
|
|
||||||
return merr.Err()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open returns a new DB in the given directory. If options are empty, DefaultOptions will be used.
|
// Open returns a new DB in the given directory. If options are empty, DefaultOptions will be used.
|
||||||
|
@ -602,11 +597,10 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
||||||
|
|
||||||
close(db.donec) // DB is never run if it was an error, so close this channel here.
|
close(db.donec) // DB is never run if it was an error, so close this channel here.
|
||||||
|
|
||||||
var merr tsdb_errors.MultiError
|
returnedErr = tsdb_errors.NewMulti(
|
||||||
merr.Add(returnedErr)
|
returnedErr,
|
||||||
merr.Add(errors.Wrap(db.Close(), "close DB after failed startup"))
|
errors.Wrap(db.Close(), "close DB after failed startup"),
|
||||||
|
).Err()
|
||||||
returnedErr = merr.Err()
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if db.blocksToDelete == nil {
|
if db.blocksToDelete == nil {
|
||||||
|
@ -798,10 +792,10 @@ func (db *DB) Compact() (returnErr error) {
|
||||||
|
|
||||||
lastBlockMaxt := int64(math.MinInt64)
|
lastBlockMaxt := int64(math.MinInt64)
|
||||||
defer func() {
|
defer func() {
|
||||||
var merr tsdb_errors.MultiError
|
returnErr = tsdb_errors.NewMulti(
|
||||||
merr.Add(returnErr)
|
returnErr,
|
||||||
merr.Add(errors.Wrap(db.head.truncateWAL(lastBlockMaxt), "WAL truncation in Compact defer"))
|
errors.Wrap(db.head.truncateWAL(lastBlockMaxt), "WAL truncation in Compact defer"),
|
||||||
returnErr = merr.Err()
|
).Err()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Check whether we have pending head blocks that are ready to be persisted.
|
// Check whether we have pending head blocks that are ready to be persisted.
|
||||||
|
@ -867,10 +861,10 @@ func (db *DB) compactHead(head *RangeHead) error {
|
||||||
runtime.GC()
|
runtime.GC()
|
||||||
if err := db.reloadBlocks(); err != nil {
|
if err := db.reloadBlocks(); err != nil {
|
||||||
if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil {
|
if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil {
|
||||||
var merr tsdb_errors.MultiError
|
return tsdb_errors.NewMulti(
|
||||||
merr.Add(errors.Wrap(err, "reloadBlocks blocks"))
|
errors.Wrap(err, "reloadBlocks blocks"),
|
||||||
merr.Add(errors.Wrapf(errRemoveAll, "delete persisted head block after failed db reloadBlocks:%s", uid))
|
errors.Wrapf(errRemoveAll, "delete persisted head block after failed db reloadBlocks:%s", uid),
|
||||||
return merr.Err()
|
).Err()
|
||||||
}
|
}
|
||||||
return errors.Wrap(err, "reloadBlocks blocks")
|
return errors.Wrap(err, "reloadBlocks blocks")
|
||||||
}
|
}
|
||||||
|
@ -984,11 +978,11 @@ func (db *DB) reloadBlocks() (err error) {
|
||||||
block.Close()
|
block.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var merr tsdb_errors.MultiError
|
errs := tsdb_errors.NewMulti()
|
||||||
for ulid, err := range corrupted {
|
for ulid, err := range corrupted {
|
||||||
merr.Add(errors.Wrapf(err, "corrupted block %s", ulid.String()))
|
errs.Add(errors.Wrapf(err, "corrupted block %s", ulid.String()))
|
||||||
}
|
}
|
||||||
return merr.Err()
|
return errs.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -1343,17 +1337,14 @@ func (db *DB) Close() error {
|
||||||
g.Go(pb.Close)
|
g.Go(pb.Close)
|
||||||
}
|
}
|
||||||
|
|
||||||
var merr tsdb_errors.MultiError
|
errs := tsdb_errors.NewMulti(g.Wait())
|
||||||
|
|
||||||
merr.Add(g.Wait())
|
|
||||||
|
|
||||||
if db.lockf != nil {
|
if db.lockf != nil {
|
||||||
merr.Add(db.lockf.Release())
|
errs.Add(db.lockf.Release())
|
||||||
}
|
}
|
||||||
if db.head != nil {
|
if db.head != nil {
|
||||||
merr.Add(db.head.Close())
|
errs.Add(db.head.Close())
|
||||||
}
|
}
|
||||||
return merr.Err()
|
return errs.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// DisableCompactions disables auto compactions.
|
// DisableCompactions disables auto compactions.
|
||||||
|
@ -1618,15 +1609,6 @@ func nextSequenceFile(dir string) (string, int, error) {
|
||||||
return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), int(i + 1), nil
|
return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), int(i + 1), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func closeAll(cs []io.Closer) error {
|
|
||||||
var merr tsdb_errors.MultiError
|
|
||||||
|
|
||||||
for _, c := range cs {
|
|
||||||
merr.Add(c.Close())
|
|
||||||
}
|
|
||||||
return merr.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
func exponential(d, min, max time.Duration) time.Duration {
|
func exponential(d, min, max time.Duration) time.Duration {
|
||||||
d *= 2
|
d *= 2
|
||||||
if d < min {
|
if d < min {
|
||||||
|
|
|
@ -17,21 +17,59 @@ package errors
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
)
|
)
|
||||||
|
|
||||||
// The MultiError type implements the error interface, and contains the
|
// multiError type allows combining multiple errors into one.
|
||||||
// Errors used to construct it.
|
type multiError []error
|
||||||
type MultiError []error
|
|
||||||
|
|
||||||
// Returns a concatenated string of the contained errors
|
// NewMulti returns multiError with provided errors added if not nil.
|
||||||
func (es MultiError) Error() string {
|
func NewMulti(errs ...error) multiError { // nolint:golint
|
||||||
|
m := multiError{}
|
||||||
|
m.Add(errs...)
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add adds single or many errors to the error list. Each error is added only if not nil.
|
||||||
|
// If the error is a nonNilMultiError type, the errors inside nonNilMultiError are added to the main multiError.
|
||||||
|
func (es *multiError) Add(errs ...error) {
|
||||||
|
for _, err := range errs {
|
||||||
|
if err == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if merr, ok := err.(nonNilMultiError); ok {
|
||||||
|
*es = append(*es, merr.errs...)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
*es = append(*es, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Err returns the error list as an error or nil if it is empty.
|
||||||
|
func (es multiError) Err() error {
|
||||||
|
if len(es) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return nonNilMultiError{errs: es}
|
||||||
|
}
|
||||||
|
|
||||||
|
// nonNilMultiError implements the error interface, and it represents
|
||||||
|
// multiError with at least one error inside it.
|
||||||
|
// This type is needed to make sure that nil is returned when no error is combined in multiError for err != nil
|
||||||
|
// check to work.
|
||||||
|
type nonNilMultiError struct {
|
||||||
|
errs multiError
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error returns a concatenated string of the contained errors.
|
||||||
|
func (es nonNilMultiError) Error() string {
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
|
|
||||||
if len(es) > 1 {
|
if len(es.errs) > 1 {
|
||||||
fmt.Fprintf(&buf, "%d errors: ", len(es))
|
fmt.Fprintf(&buf, "%d errors: ", len(es.errs))
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, err := range es {
|
for i, err := range es.errs {
|
||||||
if i != 0 {
|
if i != 0 {
|
||||||
buf.WriteString("; ")
|
buf.WriteString("; ")
|
||||||
}
|
}
|
||||||
|
@ -41,22 +79,11 @@ func (es MultiError) Error() string {
|
||||||
return buf.String()
|
return buf.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add adds the error to the error list if it is not nil.
|
// CloseAll closes all given closers while recording error in MultiError.
|
||||||
func (es *MultiError) Add(err error) {
|
func CloseAll(cs []io.Closer) error {
|
||||||
if err == nil {
|
errs := NewMulti()
|
||||||
return
|
for _, c := range cs {
|
||||||
}
|
errs.Add(c.Close())
|
||||||
if merr, ok := err.(MultiError); ok {
|
|
||||||
*es = append(*es, merr...)
|
|
||||||
} else {
|
|
||||||
*es = append(*es, err)
|
|
||||||
}
|
}
|
||||||
}
|
return errs.Err()
|
||||||
|
|
||||||
// Err returns the error list as an error or nil if it is empty.
|
|
||||||
func (es MultiError) Err() error {
|
|
||||||
if len(es) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return es
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1465,12 +1465,11 @@ func (h *Head) Close() error {
|
||||||
h.closedMtx.Lock()
|
h.closedMtx.Lock()
|
||||||
defer h.closedMtx.Unlock()
|
defer h.closedMtx.Unlock()
|
||||||
h.closed = true
|
h.closed = true
|
||||||
var merr tsdb_errors.MultiError
|
errs := tsdb_errors.NewMulti(h.chunkDiskMapper.Close())
|
||||||
merr.Add(h.chunkDiskMapper.Close())
|
|
||||||
if h.wal != nil {
|
if h.wal != nil {
|
||||||
merr.Add(h.wal.Close())
|
errs.Add(h.wal.Close())
|
||||||
}
|
}
|
||||||
return merr.Err()
|
return errs.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
type headChunkReader struct {
|
type headChunkReader struct {
|
||||||
|
|
|
@ -1075,10 +1075,10 @@ func NewFileReader(path string) (*Reader, error) {
|
||||||
}
|
}
|
||||||
r, err := newReader(realByteSlice(f.Bytes()), f)
|
r, err := newReader(realByteSlice(f.Bytes()), f)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var merr tsdb_errors.MultiError
|
return nil, tsdb_errors.NewMulti(
|
||||||
merr.Add(err)
|
err,
|
||||||
merr.Add(f.Close())
|
f.Close(),
|
||||||
return nil, merr
|
).Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
return r, nil
|
return r, nil
|
||||||
|
|
|
@ -97,12 +97,14 @@ func (q *blockBaseQuerier) Close() error {
|
||||||
if q.closed {
|
if q.closed {
|
||||||
return errors.New("block querier already closed")
|
return errors.New("block querier already closed")
|
||||||
}
|
}
|
||||||
var merr tsdb_errors.MultiError
|
|
||||||
merr.Add(q.index.Close())
|
errs := tsdb_errors.NewMulti(
|
||||||
merr.Add(q.chunks.Close())
|
q.index.Close(),
|
||||||
merr.Add(q.tombstones.Close())
|
q.chunks.Close(),
|
||||||
|
q.tombstones.Close(),
|
||||||
|
)
|
||||||
q.closed = true
|
q.closed = true
|
||||||
return merr.Err()
|
return errs.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
type blockQuerier struct {
|
type blockQuerier struct {
|
||||||
|
|
|
@ -83,18 +83,18 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
|
||||||
return errors.Wrapf(err, "copy content of index to index.repaired for block dir: %v", d)
|
return errors.Wrapf(err, "copy content of index to index.repaired for block dir: %v", d)
|
||||||
}
|
}
|
||||||
|
|
||||||
var merr tsdb_errors.MultiError
|
|
||||||
|
|
||||||
// Set the 5th byte to 2 to indicate the correct file format version.
|
// Set the 5th byte to 2 to indicate the correct file format version.
|
||||||
if _, err := repl.WriteAt([]byte{2}, 4); err != nil {
|
if _, err := repl.WriteAt([]byte{2}, 4); err != nil {
|
||||||
merr.Add(errors.Wrap(err, "rewrite of index.repaired"))
|
return tsdb_errors.NewMulti(
|
||||||
merr.Add(errors.Wrap(repl.Close(), "close"))
|
errors.Wrapf(err, "rewrite of index.repaired for block dir: %v", d),
|
||||||
return errors.Wrapf(merr.Err(), "block dir: %v", d)
|
errors.Wrap(repl.Close(), "close"),
|
||||||
|
).Err()
|
||||||
}
|
}
|
||||||
if err := repl.Sync(); err != nil {
|
if err := repl.Sync(); err != nil {
|
||||||
merr.Add(errors.Wrap(err, "sync of index.repaired"))
|
return tsdb_errors.NewMulti(
|
||||||
merr.Add(errors.Wrap(repl.Close(), "close"))
|
errors.Wrapf(err, "sync of index.repaired for block dir: %v", d),
|
||||||
return errors.Wrapf(merr.Err(), "block dir: %v", d)
|
errors.Wrap(repl.Close(), "close"),
|
||||||
|
).Err()
|
||||||
}
|
}
|
||||||
if err := repl.Close(); err != nil {
|
if err := repl.Close(); err != nil {
|
||||||
return errors.Wrapf(repl.Close(), "close repaired index for block dir: %v", d)
|
return errors.Wrapf(repl.Close(), "close repaired index for block dir: %v", d)
|
||||||
|
|
|
@ -126,10 +126,8 @@ func WriteFile(logger log.Logger, dir string, tr Reader) (int64, error) {
|
||||||
}
|
}
|
||||||
size += n
|
size += n
|
||||||
|
|
||||||
var merr tsdb_errors.MultiError
|
if err := f.Sync(); err != nil {
|
||||||
if merr.Add(f.Sync()); merr.Err() != nil {
|
return 0, tsdb_errors.NewMulti(err, f.Close()).Err()
|
||||||
merr.Add(f.Close())
|
|
||||||
return 0, merr.Err()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = f.Close(); err != nil {
|
if err = f.Close(); err != nil {
|
||||||
|
|
|
@ -68,14 +68,12 @@ func DeleteCheckpoints(dir string, maxIndex int) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var errs tsdb_errors.MultiError
|
errs := tsdb_errors.NewMulti()
|
||||||
for _, checkpoint := range checkpoints {
|
for _, checkpoint := range checkpoints {
|
||||||
if checkpoint.index >= maxIndex {
|
if checkpoint.index >= maxIndex {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if err := os.RemoveAll(filepath.Join(dir, checkpoint.name)); err != nil {
|
errs.Add(os.RemoveAll(filepath.Join(dir, checkpoint.name)))
|
||||||
errs.Add(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return errs.Err()
|
return errs.Err()
|
||||||
}
|
}
|
||||||
|
|
|
@ -280,11 +280,7 @@ func (m *multiReadCloser) Read(p []byte) (n int, err error) {
|
||||||
return m.reader.Read(p)
|
return m.reader.Read(p)
|
||||||
}
|
}
|
||||||
func (m *multiReadCloser) Close() error {
|
func (m *multiReadCloser) Close() error {
|
||||||
var merr tsdb_errors.MultiError
|
return tsdb_errors.NewMulti(tsdb_errors.CloseAll(m.closers)).Err()
|
||||||
for _, closer := range m.closers {
|
|
||||||
merr.Add(closer.Close())
|
|
||||||
}
|
|
||||||
return merr.Err()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func allSegments(dir string) (io.ReadCloser, error) {
|
func allSegments(dir string) (io.ReadCloser, error) {
|
||||||
|
|
Loading…
Reference in a new issue