tsdb: improve logs when encountering corruption (#7308)

* tsdb: improve logs when encountering corruption

Signed-off-by: Simon Pasquier <spasquie@redhat.com>

* Wrap corrupted block errors

Signed-off-by: Simon Pasquier <spasquie@redhat.com>

* Add file path to head chunks

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
This commit is contained in:
Simon Pasquier 2020-06-17 16:40:00 +02:00 committed by GitHub
parent 7b24bb3116
commit 2f12049371
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 16 additions and 8 deletions

View file

@ -190,23 +190,23 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
lastSeq := chkFileIndices[0]
for _, seq := range chkFileIndices[1:] {
if seq != lastSeq+1 {
return errors.Errorf("found unsequential head chunk files %d and %d", lastSeq, seq)
return errors.Errorf("found unsequential head chunk files %s (index: %d) and %s (index: %d)", files[lastSeq], lastSeq, files[seq], seq)
}
lastSeq = seq
}
for i, b := range cdm.mmappedChunkFiles {
if b.byteSlice.Len() < HeadChunkFileHeaderSize {
return errors.Wrapf(errInvalidSize, "invalid head chunk file header in file %d", i)
return errors.Wrapf(errInvalidSize, "%s: invalid head chunk file header", files[i])
}
// Verify magic number.
if m := binary.BigEndian.Uint32(b.byteSlice.Range(0, MagicChunksSize)); m != MagicHeadChunks {
return errors.Errorf("invalid magic number %x", m)
return errors.Errorf("%s: invalid magic number %x", files[i], m)
}
// Verify chunk format version.
if v := int(b.byteSlice.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
return errors.Errorf("invalid chunk format version %d", v)
return errors.Errorf("%s: invalid chunk format version %d", files[i], v)
}
cdm.size += int64(b.byteSlice.Len())

View file

@ -441,10 +441,14 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
if len(corrupted) > 0 {
for _, b := range loadable {
if err := b.Close(); err != nil {
level.Warn(db.logger).Log("msg", "Closing a block", err)
level.Warn(db.logger).Log("msg", "Closing block failed", "err", err, "block", b)
}
}
return nil, errors.Errorf("unexpected corrupted block:%v", corrupted)
var merr tsdb_errors.MultiError
for ulid, err := range corrupted {
merr.Add(errors.Wrapf(err, "corrupted block %s", ulid.String()))
}
return nil, merr.Err()
}
if len(loadable) == 0 {
@ -880,7 +884,11 @@ func (db *DB) reload() (err error) {
block.Close()
}
}
return fmt.Errorf("unexpected corrupted block:%v", corrupted)
var merr tsdb_errors.MultiError
for ulid, err := range corrupted {
merr.Add(errors.Wrapf(err, "corrupted block %s", ulid.String()))
}
return merr.Err()
}
// All deletable blocks should not be loaded.
@ -1054,7 +1062,7 @@ func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error {
for ulid, block := range blocks {
if block != nil {
if err := block.Close(); err != nil {
level.Warn(db.logger).Log("msg", "Closing block failed", "err", err)
level.Warn(db.logger).Log("msg", "Closing block failed", "err", err, "block", ulid)
}
}
if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil {