mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-25 05:34:05 -08:00
Read repair empty last file in chunks_head (#8061)
* Read repair empty file in chunks_head Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor and introduce repairLastChunkFile Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Attempt windows test fix Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
This commit is contained in:
parent
3b08b2dabe
commit
2624d827fa
|
@ -167,6 +167,11 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
|
|||
return err
|
||||
}
|
||||
|
||||
files, err = repairLastChunkFile(files)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
chkFileIndices := make([]int, 0, len(files))
|
||||
for seq, fn := range files {
|
||||
f, err := fileutil.OpenMmapFile(fn)
|
||||
|
@ -226,9 +231,40 @@ func listChunkFiles(dir string) (map[int]string, error) {
|
|||
}
|
||||
res[int(seq)] = filepath.Join(dir, fi.Name())
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// repairLastChunkFile deletes the last file if it's empty.
|
||||
// Because we don't fsync when creating these file, we could end
|
||||
// up with an empty file at the end during an abrupt shutdown.
|
||||
func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr error) {
|
||||
lastFile := -1
|
||||
for seq := range files {
|
||||
if seq > lastFile {
|
||||
lastFile = seq
|
||||
}
|
||||
}
|
||||
|
||||
if lastFile <= 0 {
|
||||
return files, nil
|
||||
}
|
||||
|
||||
info, err := os.Stat(files[lastFile])
|
||||
if err != nil {
|
||||
return files, errors.Wrap(err, "file stat during last head chunk file repair")
|
||||
}
|
||||
if info.Size() == 0 {
|
||||
// Corrupt file, hence remove it.
|
||||
if err := os.RemoveAll(files[lastFile]); err != nil {
|
||||
return files, errors.Wrap(err, "delete corrupted, empty head chunk file during last file repair")
|
||||
}
|
||||
delete(files, lastFile)
|
||||
}
|
||||
|
||||
return files, nil
|
||||
}
|
||||
|
||||
// WriteChunk writes the chunk to the disk.
|
||||
// The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
|
||||
func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk chunkenc.Chunk) (chkRef uint64, err error) {
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"io/ioutil"
|
||||
"math/rand"
|
||||
"os"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
|
@ -363,6 +364,75 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
|
|||
testutil.Ok(t, hrw.Truncate(2000))
|
||||
}
|
||||
|
||||
func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
|
||||
hrw := testHeadReadWriter(t)
|
||||
defer func() {
|
||||
testutil.Ok(t, hrw.Close())
|
||||
}()
|
||||
|
||||
timeRange := 0
|
||||
addChunk := func() {
|
||||
step := 100
|
||||
mint, maxt := timeRange+1, timeRange+step-1
|
||||
_, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t))
|
||||
testutil.Ok(t, err)
|
||||
timeRange += step
|
||||
}
|
||||
nonEmptyFile := func() {
|
||||
testutil.Ok(t, hrw.CutNewFile())
|
||||
addChunk()
|
||||
}
|
||||
|
||||
addChunk() // 1. Created with the first chunk.
|
||||
nonEmptyFile() // 2.
|
||||
nonEmptyFile() // 3.
|
||||
|
||||
testutil.Equals(t, 3, len(hrw.mmappedChunkFiles))
|
||||
lastFile := 0
|
||||
for idx := range hrw.mmappedChunkFiles {
|
||||
if idx > lastFile {
|
||||
lastFile = idx
|
||||
}
|
||||
}
|
||||
testutil.Equals(t, 3, lastFile)
|
||||
dir := hrw.dir.Name()
|
||||
testutil.Ok(t, hrw.Close())
|
||||
|
||||
// Write an empty last file mimicking an abrupt shutdown on file creation.
|
||||
emptyFileName := segmentFile(dir, lastFile+1)
|
||||
f, err := os.OpenFile(emptyFileName, os.O_WRONLY|os.O_CREATE, 0666)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, f.Sync())
|
||||
stat, err := f.Stat()
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, int64(0), stat.Size())
|
||||
testutil.Ok(t, f.Close())
|
||||
|
||||
// Open chunk disk mapper again, corrupt file should be removed.
|
||||
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool())
|
||||
testutil.Ok(t, err)
|
||||
testutil.Assert(t, !hrw.fileMaxtSet, "")
|
||||
testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
|
||||
testutil.Assert(t, hrw.fileMaxtSet, "")
|
||||
|
||||
// Removed from memory.
|
||||
testutil.Equals(t, 3, len(hrw.mmappedChunkFiles))
|
||||
for idx := range hrw.mmappedChunkFiles {
|
||||
testutil.Assert(t, idx <= lastFile, "file index is bigger than previous last file")
|
||||
}
|
||||
|
||||
// Removed even from disk.
|
||||
files, err := ioutil.ReadDir(dir)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, 3, len(files))
|
||||
for _, fi := range files {
|
||||
seq, err := strconv.ParseUint(fi.Name(), 10, 64)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Assert(t, seq <= uint64(lastFile), "file index on disk is bigger than previous last file")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func testHeadReadWriter(t *testing.T) *ChunkDiskMapper {
|
||||
tmpdir, err := ioutil.TempDir("", "data")
|
||||
testutil.Ok(t, err)
|
||||
|
|
Loading…
Reference in a new issue