From 48fae12b8992c9746227fb41982e27fb3af46b73 Mon Sep 17 00:00:00 2001
From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
Date: Thu, 18 Jun 2020 19:24:58 +0530
Subject: [PATCH] Fix unsequential m-map files (#7414)

* Fix unsequential m-map files

Signed-off-by: Ganesh Vernekar

* Fix review comments

Signed-off-by: Ganesh Vernekar
---
 tsdb/chunks/head_chunks.go      |   9 ++-
 tsdb/chunks/head_chunks_test.go | 119 +++++++++++++++++++++++++++-----
 2 files changed, 108 insertions(+), 20 deletions(-)

diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go
index a7edbe982..69e1d1598 100644
--- a/tsdb/chunks/head_chunks.go
+++ b/tsdb/chunks/head_chunks.go
@@ -670,8 +670,8 @@ func (cdm *ChunkDiskMapper) Truncate(mint int64) error {
 
 	var removedFiles []int
 	for _, seq := range chkFileIndices {
-		if seq == cdm.curFileSequence {
-			continue
+		if seq == cdm.curFileSequence || cdm.mmappedChunkFiles[seq].maxt >= mint {
+			break
 		}
 		if cdm.mmappedChunkFiles[seq].maxt < mint {
 			removedFiles = append(removedFiles, seq)
@@ -680,7 +680,10 @@ func (cdm *ChunkDiskMapper) Truncate(mint int64) error {
 	cdm.readPathMtx.RUnlock()
 
 	var merr tsdb_errors.MultiError
-	merr.Add(cdm.CutNewFile())
+	// Cut a new file only if the current file has some chunks.
+	if cdm.curFileNumBytes > HeadChunkFileHeaderSize {
+		merr.Add(cdm.CutNewFile())
+	}
 	merr.Add(cdm.deleteFiles(removedFiles))
 	return merr.Err()
 }
diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go
index 18982b13e..288e02b7d 100644
--- a/tsdb/chunks/head_chunks_test.go
+++ b/tsdb/chunks/head_chunks_test.go
@@ -156,6 +156,11 @@ func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) {
 }
 
+// TestHeadReadWriter_Truncate tests
+// * If truncation is happening properly based on the time passed.
+// * The active file is not deleted even if the passed time makes it eligible to be deleted.
+// * Empty current file does not lead to creation of another file after truncation.
+// * Non-empty current file leads to creation of another file after truncation.
 func TestHeadReadWriter_Truncate(t *testing.T) {
 	hrw, close := testHeadReadWriter(t)
 	defer func() {
@@ -163,10 +168,6 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
 		close()
 	}()
 
-	testutil.Assert(t, !hrw.fileMaxtSet, "")
-	testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
-	testutil.Assert(t, hrw.fileMaxtSet, "")
-
 	timeRange := 0
 	fileTimeStep := 100
 	totalFiles := 7
@@ -204,15 +205,15 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
 		cutFile(i)
 	}
 
-	// Verifying the the remaining files.
-	verifyRemainingFiles := func(remainingFiles, startIndex int) {
-		//t.Helper()
+	// Verifying the files.
+	verifyFiles := func(remainingFiles, startIndex int) {
+		t.Helper()
 
 		files, err := ioutil.ReadDir(hrw.dir.Name())
 		testutil.Ok(t, err)
-		testutil.Equals(t, remainingFiles, len(files))
-		testutil.Equals(t, remainingFiles, len(hrw.mmappedChunkFiles))
-		testutil.Equals(t, remainingFiles, len(hrw.closers))
+		testutil.Equals(t, remainingFiles, len(files), "files on disk")
+		testutil.Equals(t, remainingFiles, len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles")
+		testutil.Equals(t, remainingFiles, len(hrw.closers), "closers")
 
 		for i := 1; i <= totalFiles; i++ {
 			_, ok := hrw.mmappedChunkFiles[i]
@@ -225,12 +226,12 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
 	}
 
 	// Verify the number of segments.
-	verifyRemainingFiles(totalFiles, 1)
+	verifyFiles(totalFiles, 1)
 
 	// Truncating files.
 	testutil.Ok(t, hrw.Truncate(timeToTruncate))
-	totalFiles++ // Truncation creates a new file.
-	verifyRemainingFiles(totalFiles-filesDeletedAfter1stTruncation, startIndexAfter1stTruncation)
+	totalFiles++ // Truncation creates a new file as the last file is not empty.
+	verifyFiles(totalFiles-filesDeletedAfter1stTruncation, startIndexAfter1stTruncation)
 
 	addChunk() // Add a chunk so that new file is not truncated.
 
 	dir := hrw.dir.Name()
@@ -245,14 +246,95 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
 	testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
 	testutil.Assert(t, hrw.fileMaxtSet, "")
 
-	// Truncating files after restart.
+	// Truncating files after restart. As the last file was empty, this creates no new files.
 	testutil.Ok(t, hrw.Truncate(timeToTruncateAfterRestart))
-	totalFiles++ // Truncation creates a new file.
-	verifyRemainingFiles(totalFiles-filesDeletedAfter2ndTruncation, startIndexAfter2ndTruncation)
+	verifyFiles(totalFiles-filesDeletedAfter2ndTruncation, startIndexAfter2ndTruncation)
+
+	// First chunk after restart creates a new file.
+	addChunk()
+	totalFiles++
 
 	// Truncating till current time should not delete the current active file.
 	testutil.Ok(t, hrw.Truncate(int64(timeRange+fileTimeStep)))
-	verifyRemainingFiles(2, totalFiles) // One file was the active file and one was newly created.
+	verifyFiles(2, totalFiles) // One file is the active file and one was newly created.
+}
+
+// TestHeadReadWriter_Truncate_NoUnsequentialFiles tests
+// that truncation leaves no unsequential files on disk, mainly under the following case:
+// * There is an empty file in between the sequence while the truncation
+//   deletes files only up to a sequence before that (i.e. stops deleting
+//   after it has found a file that is not deletable).
+// This tests https://github.com/prometheus/prometheus/issues/7412 where
+// the truncation used to check all the files for deletion and ended up
+// deleting empty files in between and breaking the sequence.
+func TestHeadReadWriter_Truncate_NoUnsequentialFiles(t *testing.T) {
+	hrw, close := testHeadReadWriter(t)
+	defer func() {
+		testutil.Ok(t, hrw.Close())
+		close()
+	}()
+
+	timeRange := 0
+	addChunk := func() {
+		step := 100
+		mint, maxt := timeRange+1, timeRange+step-1
+		_, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t))
+		testutil.Ok(t, err)
+		timeRange += step
+	}
+	emptyFile := func() {
+		testutil.Ok(t, hrw.CutNewFile())
+	}
+	nonEmptyFile := func() {
+		emptyFile()
+		addChunk()
+	}
+
+	addChunk()     // 1. Created with the first chunk.
+	nonEmptyFile() // 2.
+	nonEmptyFile() // 3.
+	emptyFile()    // 4.
+	nonEmptyFile() // 5.
+	emptyFile()    // 6.
+
+	// Verifying the files.
+	verifyFiles := func(remainingFiles []int) {
+		t.Helper()
+
+		files, err := ioutil.ReadDir(hrw.dir.Name())
+		testutil.Ok(t, err)
+		testutil.Equals(t, len(remainingFiles), len(files), "files on disk")
+		testutil.Equals(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles")
+		testutil.Equals(t, len(remainingFiles), len(hrw.closers), "closers")
+
+		for _, i := range remainingFiles {
+			_, ok := hrw.mmappedChunkFiles[i]
+			testutil.Equals(t, true, ok)
+		}
+	}
+
+	verifyFiles([]int{1, 2, 3, 4, 5, 6})
+
+	// Truncating files till 2. It should not delete anything after 3 (inclusive)
+	// though files 4 and 6 are empty.
+	file2Maxt := hrw.mmappedChunkFiles[2].maxt
+	testutil.Ok(t, hrw.Truncate(file2Maxt+1))
+	// As 6 was empty, it should not create another file.
+	verifyFiles([]int{3, 4, 5, 6})
+
+	addChunk()
+	// Truncate creates another file as 6 is not empty now.
+	testutil.Ok(t, hrw.Truncate(file2Maxt+1))
+	verifyFiles([]int{3, 4, 5, 6, 7})
+
+	dir := hrw.dir.Name()
+	testutil.Ok(t, hrw.Close())
+
+	// Restarting checks for unsequential files.
+	var err error
+	hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool())
+	testutil.Ok(t, err)
+	verifyFiles([]int{3, 4, 5, 6, 7})
 }
 
 func testHeadReadWriter(t *testing.T) (hrw *ChunkDiskMapper, close func()) {
@@ -260,6 +342,9 @@ func testHeadReadWriter(t *testing.T) (hrw *ChunkDiskMapper, close func()) {
 	testutil.Ok(t, err)
 	hrw, err = NewChunkDiskMapper(tmpdir, chunkenc.NewPool())
 	testutil.Ok(t, err)
+	testutil.Assert(t, !hrw.fileMaxtSet, "")
+	testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
+	testutil.Assert(t, hrw.fileMaxtSet, "")
 	return hrw, func() {
 		testutil.Ok(t, os.RemoveAll(tmpdir))
 	}
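The ordering rule behind the fix can be read directly from the Truncate hunk above: walk the m-mapped files in ascending sequence order and stop at the first file that must be kept (the active file, or any file whose maxt is still >= the truncation mint), instead of checking every file independently. The snippet below is a minimal, self-contained sketch of that rule only; chunkFile and filesToTruncate are simplified stand-ins invented for illustration and are not the real tsdb/chunks API.

package main

import "fmt"

// chunkFile is a simplified stand-in for an m-mapped head chunk file:
// only its sequence number and the maximum timestamp it covers matter here.
type chunkFile struct {
	seq  int
	maxt int64
}

// filesToTruncate mirrors the fixed Truncate loop: files are inspected in
// ascending sequence order and deletion stops at the first file that has to
// stay (the currently active file or any file whose maxt >= mint). Stopping
// early keeps the remaining sequence numbers contiguous even if a later file
// happens to be empty.
func filesToTruncate(files []chunkFile, curSeq int, mint int64) []int {
	var removed []int
	for _, f := range files { // files must be sorted by seq.
		if f.seq == curSeq || f.maxt >= mint {
			break // Everything from here on must be kept to avoid gaps.
		}
		removed = append(removed, f.seq)
	}
	return removed
}

func main() {
	files := []chunkFile{
		{seq: 1, maxt: 100}, {seq: 2, maxt: 200},
		{seq: 3, maxt: 300}, {seq: 4, maxt: 0}, // 4 stands in for an empty file.
		{seq: 5, maxt: 500},
	}
	// Truncating everything before t=250: only files 1 and 2 are removed, even
	// though file 4 is "empty", because deleting 4 would leave a hole between
	// 3 and 5 (the unsequential-files problem from issue 7412).
	fmt.Println(filesToTruncate(files, 5, 250)) // [1 2]
}

The other half of the patch, cutting a new head chunk file only when curFileNumBytes exceeds HeadChunkFileHeaderSize, is what keeps repeated truncations from stacking up empty files; the sketch above deliberately leaves that part out.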