// Copyright 2020 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package chunks import ( "encoding/binary" "errors" "math/rand" "os" "strconv" "sync" "testing" "time" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/tsdb/chunkenc" ) var writeQueueSize int func TestMain(m *testing.M) { // Run all tests with the chunk write queue disabled. writeQueueSize = 0 exitVal := m.Run() if exitVal != 0 { os.Exit(exitVal) } // Re-run all tests with the chunk write queue size of 1e6. writeQueueSize = 1000000 exitVal = m.Run() os.Exit(exitVal) } func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) { hrw := createChunkDiskMapper(t, "") defer func() { require.NoError(t, hrw.Close()) }() expectedBytes := []byte{} nextChunkOffset := uint64(HeadChunkFileHeaderSize) chkCRC32 := newCRC32() type expectedDataType struct { seriesRef HeadSeriesRef chunkRef ChunkDiskMapperRef mint, maxt int64 numSamples uint16 chunk chunkenc.Chunk isOOO bool } expectedData := []expectedDataType{} var buf [MaxHeadChunkMetaSize]byte totalChunks := 0 var firstFileName string for hrw.curFileSequence < 3 || hrw.chkWriter.Buffered() == 0 { addChunks := func(numChunks int) { for i := 0; i < numChunks; i++ { seriesRef, chkRef, mint, maxt, chunk, isOOO := createChunk(t, totalChunks, hrw) totalChunks++ expectedData = append(expectedData, expectedDataType{ seriesRef: seriesRef, mint: mint, maxt: maxt, chunkRef: chkRef, chunk: chunk, numSamples: uint16(chunk.NumSamples()), isOOO: isOOO, }) if hrw.curFileSequence != 1 { // We are checking for bytes written only for the first file. continue } // Calculating expected bytes written on disk for first file. firstFileName = hrw.curFile.Name() require.Equal(t, newChunkDiskMapperRef(1, nextChunkOffset), chkRef) bytesWritten := 0 chkCRC32.Reset() binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(seriesRef)) bytesWritten += SeriesRefSize binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(mint)) bytesWritten += MintMaxtSize binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt)) bytesWritten += MintMaxtSize enc := chunk.Encoding() if isOOO { enc = hrw.ApplyOutOfOrderMask(enc) } buf[bytesWritten] = byte(enc) bytesWritten += ChunkEncodingSize n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes()))) bytesWritten += n expectedBytes = append(expectedBytes, buf[:bytesWritten]...) _, err := chkCRC32.Write(buf[:bytesWritten]) require.NoError(t, err) expectedBytes = append(expectedBytes, chunk.Bytes()...) _, err = chkCRC32.Write(chunk.Bytes()) require.NoError(t, err) expectedBytes = append(expectedBytes, chkCRC32.Sum(nil)...) // += seriesRef, mint, maxt, encoding, chunk data len, chunk data, CRC. nextChunkOffset += SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + uint64(n) + uint64(len(chunk.Bytes())) + CRCSize } } addChunks(100) hrw.CutNewFile() addChunks(10) // For chunks in in-memory buffer. } // Checking on-disk bytes for the first file. require.Len(t, hrw.mmappedChunkFiles, 3, "expected 3 mmapped files, got %d", len(hrw.mmappedChunkFiles)) require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers)) actualBytes, err := os.ReadFile(firstFileName) require.NoError(t, err) // Check header of the segment file. require.Equal(t, MagicHeadChunks, int(binary.BigEndian.Uint32(actualBytes[0:MagicChunksSize]))) require.Equal(t, chunksFormatV1, int(actualBytes[MagicChunksSize])) // Remaining chunk data. fileEnd := HeadChunkFileHeaderSize + len(expectedBytes) require.Equal(t, expectedBytes, actualBytes[HeadChunkFileHeaderSize:fileEnd]) // Testing reading of chunks. for _, exp := range expectedData { actChunk, err := hrw.Chunk(exp.chunkRef) require.NoError(t, err) require.Equal(t, exp.chunk.Bytes(), actChunk.Bytes()) } // Testing IterateAllChunks method. dir := hrw.dir.Name() require.NoError(t, hrw.Close()) hrw = createChunkDiskMapper(t, dir) idx := 0 require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding, isOOO bool) error { t.Helper() expData := expectedData[idx] require.Equal(t, expData.seriesRef, seriesRef) require.Equal(t, expData.chunkRef, chunkRef) require.Equal(t, expData.maxt, maxt) require.Equal(t, expData.maxt, maxt) require.Equal(t, expData.numSamples, numSamples) require.Equal(t, expData.isOOO, isOOO) actChunk, err := hrw.Chunk(expData.chunkRef) require.NoError(t, err) require.Equal(t, expData.chunk.Bytes(), actChunk.Bytes()) idx++ return nil })) require.Len(t, expectedData, idx) } // TestChunkDiskMapper_Truncate tests // * If truncation is happening properly based on the time passed. // * The active file is not deleted even if the passed time makes it eligible to be deleted. // * Non-empty current file leads to creation of another file after truncation. func TestChunkDiskMapper_Truncate(t *testing.T) { hrw := createChunkDiskMapper(t, "") defer func() { require.NoError(t, hrw.Close()) }() timeRange := 0 addChunk := func() { t.Helper() step := 100 mint, maxt := timeRange+1, timeRange+step-1 var err error awaitCb := make(chan struct{}) hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false, func(cbErr error) { err = cbErr close(awaitCb) }) <-awaitCb require.NoError(t, err) timeRange += step } verifyFiles := func(remainingFiles []int) { t.Helper() files, err := os.ReadDir(hrw.dir.Name()) require.NoError(t, err) require.Equal(t, len(remainingFiles), len(files), "files on disk") require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles") require.Equal(t, len(remainingFiles), len(hrw.closers), "closers") for _, i := range remainingFiles { _, ok := hrw.mmappedChunkFiles[i] require.True(t, ok) } } // Create segments 1 to 7. for i := 1; i <= 7; i++ { hrw.CutNewFile() addChunk() } verifyFiles([]int{1, 2, 3, 4, 5, 6, 7}) // Truncating files. require.NoError(t, hrw.Truncate(3)) // Add a chunk to trigger cutting of new file. addChunk() verifyFiles([]int{3, 4, 5, 6, 7, 8}) dir := hrw.dir.Name() require.NoError(t, hrw.Close()) // Restarted. hrw = createChunkDiskMapper(t, dir) verifyFiles([]int{3, 4, 5, 6, 7, 8}) // New file is created after restart even if last file was empty. addChunk() verifyFiles([]int{3, 4, 5, 6, 7, 8, 9}) // Truncating files after restart. require.NoError(t, hrw.Truncate(6)) verifyFiles([]int{6, 7, 8, 9}) // Truncating a second time without adding a chunk shouldn't create a new file. require.NoError(t, hrw.Truncate(6)) verifyFiles([]int{6, 7, 8, 9}) // Add a chunk to trigger cutting of new file. addChunk() verifyFiles([]int{6, 7, 8, 9, 10}) // Truncation by file number. require.NoError(t, hrw.Truncate(8)) verifyFiles([]int{8, 9, 10}) // Truncating till current time should not delete the current active file. require.NoError(t, hrw.Truncate(10)) // Add a chunk to trigger cutting of new file. addChunk() verifyFiles([]int{10, 11}) // One file is the previously active file and one currently created. } // TestChunkDiskMapper_Truncate_PreservesFileSequence tests that truncation doesn't poke // holes into the file sequence, even if there are empty files in between non-empty files. // This test exposes https://github.com/prometheus/prometheus/issues/7412 where the truncation // simply deleted all empty files instead of stopping once it encountered a non-empty file. func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) { hrw := createChunkDiskMapper(t, "") defer func() { require.NoError(t, hrw.Close()) }() timeRange := 0 addChunk := func() { t.Helper() awaitCb := make(chan struct{}) step := 100 mint, maxt := timeRange+1, timeRange+step-1 hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false, func(err error) { close(awaitCb) require.NoError(t, err) }) <-awaitCb timeRange += step } emptyFile := func() { t.Helper() _, _, err := hrw.cut() require.NoError(t, err) hrw.evtlPosMtx.Lock() hrw.evtlPos.toNewFile() hrw.evtlPosMtx.Unlock() } nonEmptyFile := func() { t.Helper() emptyFile() addChunk() } addChunk() // 1. Created with the first chunk. nonEmptyFile() // 2. nonEmptyFile() // 3. emptyFile() // 4. nonEmptyFile() // 5. emptyFile() // 6. verifyFiles := func(remainingFiles []int) { t.Helper() files, err := os.ReadDir(hrw.dir.Name()) require.NoError(t, err) require.Equal(t, len(remainingFiles), len(files), "files on disk") require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles") require.Equal(t, len(remainingFiles), len(hrw.closers), "closers") for _, i := range remainingFiles { _, ok := hrw.mmappedChunkFiles[i] require.True(t, ok, "remaining file %d not in hrw.mmappedChunkFiles", i) } } verifyFiles([]int{1, 2, 3, 4, 5, 6}) // Truncating files till 2. It should not delete anything after 3 (inclusive) // though files 4 and 6 are empty. require.NoError(t, hrw.Truncate(3)) verifyFiles([]int{3, 4, 5, 6}) // Add chunk, so file 6 is not empty anymore. addChunk() verifyFiles([]int{3, 4, 5, 6}) // Truncating till file 3 should also delete file 4, because it is empty. require.NoError(t, hrw.Truncate(5)) addChunk() verifyFiles([]int{5, 6, 7}) dir := hrw.dir.Name() require.NoError(t, hrw.Close()) // Restarting checks for unsequential files. hrw = createChunkDiskMapper(t, dir) verifyFiles([]int{5, 6, 7}) } func TestChunkDiskMapper_Truncate_WriteQueueRaceCondition(t *testing.T) { hrw := createChunkDiskMapper(t, "") t.Cleanup(func() { require.NoError(t, hrw.Close()) }) // This test should only run when the queue is enabled. if hrw.writeQueue == nil { t.Skip("This test should only run when the queue is enabled") } // Add an artificial delay in the writeChunk function to easily trigger the race condition. origWriteChunk := hrw.writeQueue.writeChunk hrw.writeQueue.writeChunk = func(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, ref ChunkDiskMapperRef, isOOO, cutFile bool) error { time.Sleep(100 * time.Millisecond) return origWriteChunk(seriesRef, mint, maxt, chk, ref, isOOO, cutFile) } wg := sync.WaitGroup{} wg.Add(2) // Write a chunk. Since the queue is enabled, the chunk will be written asynchronously (with the artificial delay). ref := hrw.WriteChunk(1, 0, 10, randomChunk(t), false, func(err error) { defer wg.Done() require.NoError(t, err) }) seq, _ := ref.Unpack() require.Equal(t, 1, seq) // Truncate, simulating that all chunks from segment files before 1 can be dropped. require.NoError(t, hrw.Truncate(1)) // Request to cut a new file when writing the next chunk. If there's a race condition, cutting a new file will // allow us to detect there's actually an issue with the sequence number (because it's checked when a new segment // file is created). hrw.CutNewFile() // Write another chunk. This will cut a new file. ref = hrw.WriteChunk(1, 0, 10, randomChunk(t), false, func(err error) { defer wg.Done() require.NoError(t, err) }) seq, _ = ref.Unpack() require.Equal(t, 2, seq) wg.Wait() } // TestHeadReadWriter_TruncateAfterFailedIterateChunks tests for // https://github.com/prometheus/prometheus/issues/7753 func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) { hrw := createChunkDiskMapper(t, "") defer func() { require.NoError(t, hrw.Close()) }() // Write a chunks to iterate on it later. var err error awaitCb := make(chan struct{}) hrw.WriteChunk(1, 0, 1000, randomChunk(t), false, func(cbErr error) { err = cbErr close(awaitCb) }) <-awaitCb require.NoError(t, err) dir := hrw.dir.Name() require.NoError(t, hrw.Close()) // Restarting to recreate https://github.com/prometheus/prometheus/issues/7753. hrw = createChunkDiskMapper(t, dir) // Forcefully failing IterateAllChunks. require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding, _ bool) error { return errors.New("random error") })) // Truncation call should not return error after IterateAllChunks fails. require.NoError(t, hrw.Truncate(2000)) } func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) { hrw := createChunkDiskMapper(t, "") timeRange := 0 addChunk := func() { t.Helper() step := 100 mint, maxt := timeRange+1, timeRange+step-1 var err error awaitCb := make(chan struct{}) hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false, func(cbErr error) { err = cbErr close(awaitCb) }) <-awaitCb require.NoError(t, err) timeRange += step } nonEmptyFile := func() { t.Helper() hrw.CutNewFile() addChunk() } addChunk() // 1. Created with the first chunk. nonEmptyFile() // 2. nonEmptyFile() // 3. require.Len(t, hrw.mmappedChunkFiles, 3) lastFile := 0 for idx := range hrw.mmappedChunkFiles { if idx > lastFile { lastFile = idx } } require.Equal(t, 3, lastFile) dir := hrw.dir.Name() require.NoError(t, hrw.Close()) writeCorruptLastFile := func(b []byte) { fname := segmentFile(dir, lastFile+1) f, err := os.OpenFile(fname, os.O_WRONLY|os.O_CREATE, 0o666) require.NoError(t, err) _, err = f.Write(b) require.NoError(t, err) require.NoError(t, f.Sync()) stat, err := f.Stat() require.NoError(t, err) require.Equal(t, int64(len(b)), stat.Size()) require.NoError(t, f.Close()) } checkRepair := func() { // Open chunk disk mapper again, corrupt file should be removed. hrw = createChunkDiskMapper(t, dir) // Removed from memory. require.Len(t, hrw.mmappedChunkFiles, 3) for idx := range hrw.mmappedChunkFiles { require.LessOrEqual(t, idx, lastFile, "file index is bigger than previous last file") } // Removed even from disk. files, err := os.ReadDir(dir) require.NoError(t, err) require.Len(t, files, 3) for _, fi := range files { seq, err := strconv.ParseUint(fi.Name(), 10, 64) require.NoError(t, err) require.LessOrEqual(t, seq, uint64(lastFile), "file index on disk is bigger than previous last file") } require.NoError(t, hrw.Close()) } // Write an empty last file mimicking an abrupt shutdown on file creation. writeCorruptLastFile(nil) // Removes empty last file. checkRepair() // Write another empty last file with 0 bytes. writeCorruptLastFile([]byte{0, 0, 0, 0, 0, 0, 0, 0}) // Removes the 0 filled last file. checkRepair() // Write another corrupt file with less than 4 bytes (size of magic number). writeCorruptLastFile([]byte{1, 2}) // Removes the partial file. checkRepair() // Check that it does not delete a valid last file. checkRepair() } func createChunkDiskMapper(t *testing.T, dir string) *ChunkDiskMapper { if dir == "" { dir = t.TempDir() } hrw, err := NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), DefaultWriteBufferSize, writeQueueSize) require.NoError(t, err) require.False(t, hrw.fileMaxtSet) require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding, _ bool) error { return nil })) require.True(t, hrw.fileMaxtSet) return hrw } func randomChunk(t *testing.T) chunkenc.Chunk { chunk := chunkenc.NewXORChunk() length := rand.Int() % 120 app, err := chunk.Appender() require.NoError(t, err) for i := 0; i < length; i++ { app.Append(rand.Int63(), rand.Float64()) } return chunk } func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk, isOOO bool) { var err error seriesRef = HeadSeriesRef(rand.Int63()) mint = int64((idx)*1000 + 1) maxt = int64((idx + 1) * 1000) chunk = randomChunk(t) awaitCb := make(chan struct{}) if rand.Intn(2) == 0 { isOOO = true } chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, isOOO, func(cbErr error) { require.NoError(t, err) close(awaitCb) }) <-awaitCb return }