Fix 'chunks.HeadReadWriter: maxt of the files are not set' error (#7856)

* Fix chunks.HeadReadWriter: maxt of the files are not set

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
This commit is contained in:
Ganesh Vernekar 2020-08-26 17:59:18 +00:00 committed by GitHub
parent 7c0d5ae4e7
commit c806262206
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 15 deletions

View file

@ -539,9 +539,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64,
defer cdm.writePathMtx.Unlock() defer cdm.writePathMtx.Unlock()
defer func() { defer func() {
if err == nil {
cdm.fileMaxtSet = true cdm.fileMaxtSet = true
}
}() }()
chkCRC32 := newCRC32() chkCRC32 := newCRC32()

View file

@ -15,6 +15,7 @@ package chunks
import ( import (
"encoding/binary" "encoding/binary"
"errors"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
"os" "os"
@ -25,10 +26,9 @@ import (
) )
func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) { func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) {
hrw, close := testHeadReadWriter(t) hrw := testHeadReadWriter(t)
defer func() { defer func() {
testutil.Ok(t, hrw.Close()) testutil.Ok(t, hrw.Close())
close()
}() }()
expectedBytes := []byte{} expectedBytes := []byte{}
@ -162,10 +162,9 @@ func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) {
// * Empty current file does not lead to creation of another file after truncation. // * Empty current file does not lead to creation of another file after truncation.
// * Non-empty current file leads to creation of another file after truncation. // * Non-empty current file leads to creation of another file after truncation.
func TestHeadReadWriter_Truncate(t *testing.T) { func TestHeadReadWriter_Truncate(t *testing.T) {
hrw, close := testHeadReadWriter(t) hrw := testHeadReadWriter(t)
defer func() { defer func() {
testutil.Ok(t, hrw.Close()) testutil.Ok(t, hrw.Close())
close()
}() }()
timeRange := 0 timeRange := 0
@ -268,10 +267,9 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
// the truncation used to check all the files for deletion and end up // the truncation used to check all the files for deletion and end up
// deleting empty files in between and breaking the sequence. // deleting empty files in between and breaking the sequence.
func TestHeadReadWriter_Truncate_NoUnsequentialFiles(t *testing.T) { func TestHeadReadWriter_Truncate_NoUnsequentialFiles(t *testing.T) {
hrw, close := testHeadReadWriter(t) hrw := testHeadReadWriter(t)
defer func() { defer func() {
testutil.Ok(t, hrw.Close()) testutil.Ok(t, hrw.Close())
close()
}() }()
timeRange := 0 timeRange := 0
@ -337,17 +335,47 @@ func TestHeadReadWriter_Truncate_NoUnsequentialFiles(t *testing.T) {
verifyFiles([]int{3, 4, 5, 6, 7}) verifyFiles([]int{3, 4, 5, 6, 7})
} }
func testHeadReadWriter(t *testing.T) (hrw *ChunkDiskMapper, close func()) { // TestHeadReadWriter_TruncateAfterIterateChunksError tests for
// https://github.com/prometheus/prometheus/issues/7753
func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
hrw := testHeadReadWriter(t)
defer func() {
testutil.Ok(t, hrw.Close())
}()
// Write a chunks to iterate on it later.
_, err := hrw.WriteChunk(1, 0, 1000, randomChunk(t))
testutil.Ok(t, err)
dir := hrw.dir.Name()
testutil.Ok(t, hrw.Close())
// Restarting to recreate https://github.com/prometheus/prometheus/issues/7753.
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool())
testutil.Ok(t, err)
// Forcefully failing IterateAllChunks.
testutil.NotOk(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error {
return errors.New("random error")
}))
// Truncation call should not return error after IterateAllChunks fails.
testutil.Ok(t, hrw.Truncate(2000))
}
func testHeadReadWriter(t *testing.T) *ChunkDiskMapper {
tmpdir, err := ioutil.TempDir("", "data") tmpdir, err := ioutil.TempDir("", "data")
testutil.Ok(t, err) testutil.Ok(t, err)
hrw, err = NewChunkDiskMapper(tmpdir, chunkenc.NewPool()) t.Cleanup(func() {
testutil.Ok(t, os.RemoveAll(tmpdir))
})
hrw, err := NewChunkDiskMapper(tmpdir, chunkenc.NewPool())
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Assert(t, !hrw.fileMaxtSet, "") testutil.Assert(t, !hrw.fileMaxtSet, "")
testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil })) testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
testutil.Assert(t, hrw.fileMaxtSet, "") testutil.Assert(t, hrw.fileMaxtSet, "")
return hrw, func() { return hrw
testutil.Ok(t, os.RemoveAll(tmpdir))
}
} }
func randomChunk(t *testing.T) chunkenc.Chunk { func randomChunk(t *testing.T) chunkenc.Chunk {

View file

@ -652,7 +652,7 @@ func (h *Head) Init(minValidTime int64) error {
} }
// If this fails, data will be recovered from WAL. // If this fails, data will be recovered from WAL.
// Hence we wont lose any data (given WAL is not corrupt). // Hence we wont lose any data (given WAL is not corrupt).
h.removeCorruptedMmappedChunks(err) mmappedChunks = h.removeCorruptedMmappedChunks(err)
} }
level.Info(h.logger).Log("msg", "On-disk memory mappable chunks replay completed", "duration", time.Since(start).String()) level.Info(h.logger).Log("msg", "On-disk memory mappable chunks replay completed", "duration", time.Since(start).String())