// Copyright 2020 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package chunks import ( "encoding/binary" "errors" "io/ioutil" "math/rand" "os" "strconv" "testing" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/tsdb/chunkenc" ) func TestOldChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) { hrw := testOldChunkDiskMapper(t) defer func() { require.NoError(t, hrw.Close()) }() expectedBytes := []byte{} nextChunkOffset := uint64(HeadChunkFileHeaderSize) chkCRC32 := newCRC32() type expectedDataType struct { seriesRef HeadSeriesRef chunkRef ChunkDiskMapperRef mint, maxt int64 numSamples uint16 chunk chunkenc.Chunk } expectedData := []expectedDataType{} var buf [MaxHeadChunkMetaSize]byte totalChunks := 0 var firstFileName string for hrw.curFileSequence < 3 || hrw.chkWriter.Buffered() == 0 { addChunks := func(numChunks int) { for i := 0; i < numChunks; i++ { seriesRef, chkRef, mint, maxt, chunk := createChunkForOld(t, totalChunks, hrw) totalChunks++ expectedData = append(expectedData, expectedDataType{ seriesRef: seriesRef, mint: mint, maxt: maxt, chunkRef: chkRef, chunk: chunk, numSamples: uint16(chunk.NumSamples()), }) if hrw.curFileSequence != 1 { // We are checking for bytes written only for the first file. continue } // Calculating expected bytes written on disk for first file. firstFileName = hrw.curFile.Name() require.Equal(t, newChunkDiskMapperRef(1, nextChunkOffset), chkRef) bytesWritten := 0 chkCRC32.Reset() binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(seriesRef)) bytesWritten += SeriesRefSize binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(mint)) bytesWritten += MintMaxtSize binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt)) bytesWritten += MintMaxtSize buf[bytesWritten] = byte(chunk.Encoding()) bytesWritten += ChunkEncodingSize n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes()))) bytesWritten += n expectedBytes = append(expectedBytes, buf[:bytesWritten]...) _, err := chkCRC32.Write(buf[:bytesWritten]) require.NoError(t, err) expectedBytes = append(expectedBytes, chunk.Bytes()...) _, err = chkCRC32.Write(chunk.Bytes()) require.NoError(t, err) expectedBytes = append(expectedBytes, chkCRC32.Sum(nil)...) // += seriesRef, mint, maxt, encoding, chunk data len, chunk data, CRC. nextChunkOffset += SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + uint64(n) + uint64(len(chunk.Bytes())) + CRCSize } } addChunks(100) hrw.CutNewFile() addChunks(10) // For chunks in in-memory buffer. } // Checking on-disk bytes for the first file. require.Equal(t, 3, len(hrw.mmappedChunkFiles), "expected 3 mmapped files, got %d", len(hrw.mmappedChunkFiles)) require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers)) actualBytes, err := ioutil.ReadFile(firstFileName) require.NoError(t, err) // Check header of the segment file. require.Equal(t, MagicHeadChunks, int(binary.BigEndian.Uint32(actualBytes[0:MagicChunksSize]))) require.Equal(t, chunksFormatV1, int(actualBytes[MagicChunksSize])) // Remaining chunk data. fileEnd := HeadChunkFileHeaderSize + len(expectedBytes) require.Equal(t, expectedBytes, actualBytes[HeadChunkFileHeaderSize:fileEnd]) // Testing reading of chunks. for _, exp := range expectedData { actChunk, err := hrw.Chunk(exp.chunkRef) require.NoError(t, err) require.Equal(t, exp.chunk.Bytes(), actChunk.Bytes()) } // Testing IterateAllChunks method. dir := hrw.dir.Name() require.NoError(t, hrw.Close()) hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) require.NoError(t, err) idx := 0 require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error { t.Helper() expData := expectedData[idx] require.Equal(t, expData.seriesRef, seriesRef) require.Equal(t, expData.chunkRef, chunkRef) require.Equal(t, expData.maxt, maxt) require.Equal(t, expData.maxt, maxt) require.Equal(t, expData.numSamples, numSamples) actChunk, err := hrw.Chunk(expData.chunkRef) require.NoError(t, err) require.Equal(t, expData.chunk.Bytes(), actChunk.Bytes()) idx++ return nil })) require.Equal(t, len(expectedData), idx) } func TestOldChunkDiskMapper_WriteUnsupportedChunk_Chunk_IterateChunks(t *testing.T) { hrw := testOldChunkDiskMapper(t) defer func() { require.NoError(t, hrw.Close()) }() ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, nil, hrw) // Checking on-disk bytes for the first file. require.Equal(t, 1, len(hrw.mmappedChunkFiles), "expected 1 mmapped file, got %d", len(hrw.mmappedChunkFiles)) require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers)) // Testing IterateAllChunks method. dir := hrw.dir.Name() require.NoError(t, hrw.Close()) hrw, err := NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) require.NoError(t, err) require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error { t.Helper() require.Equal(t, ucSeriesRef, seriesRef) require.Equal(t, ucChkRef, chunkRef) require.Equal(t, ucMint, mint) require.Equal(t, ucMaxt, maxt) require.Equal(t, uchunk.Encoding(), encoding) // Asserts that the encoding is EncUnsupportedXOR actChunk, err := hrw.Chunk(chunkRef) // The chunk encoding is unknown so Chunk() should fail but us the caller // are ok with that. Above we asserted that the encoding we expected was // EncUnsupportedXOR require.NotNil(t, err) require.Contains(t, err.Error(), "invalid chunk encoding \"\"") require.Nil(t, actChunk) return nil })) } // TestOldChunkDiskMapper_Truncate tests // * If truncation is happening properly based on the time passed. // * The active file is not deleted even if the passed time makes it eligible to be deleted. // * Empty current file does not lead to creation of another file after truncation. // * Non-empty current file leads to creation of another file after truncation. func TestOldChunkDiskMapper_Truncate(t *testing.T) { hrw := testOldChunkDiskMapper(t) defer func() { require.NoError(t, hrw.Close()) }() timeRange := 0 fileTimeStep := 100 var thirdFileMinT, sixthFileMinT int64 addChunk := func() int { mint := timeRange + 1 // Just after the new file cut. maxt := timeRange + fileTimeStep - 1 // Just before the next file. // Write a chunks to set maxt for the segment. _ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) { require.NoError(t, err) }) timeRange += fileTimeStep return mint } verifyFiles := func(remainingFiles []int) { t.Helper() files, err := ioutil.ReadDir(hrw.dir.Name()) require.NoError(t, err) require.Equal(t, len(remainingFiles), len(files), "files on disk") require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles") require.Equal(t, len(remainingFiles), len(hrw.closers), "closers") for _, i := range remainingFiles { _, ok := hrw.mmappedChunkFiles[i] require.Equal(t, true, ok) } } // Create segments 1 to 7. for i := 1; i <= 7; i++ { require.NoError(t, hrw.CutNewFile()) mint := int64(addChunk()) if i == 3 { thirdFileMinT = mint } else if i == 6 { sixthFileMinT = mint } } verifyFiles([]int{1, 2, 3, 4, 5, 6, 7}) // Truncating files. require.NoError(t, hrw.Truncate(thirdFileMinT)) verifyFiles([]int{3, 4, 5, 6, 7, 8}) dir := hrw.dir.Name() require.NoError(t, hrw.Close()) // Restarted. var err error hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) require.NoError(t, err) require.False(t, hrw.fileMaxtSet) require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { return nil })) require.True(t, hrw.fileMaxtSet) verifyFiles([]int{3, 4, 5, 6, 7, 8}) // New file is created after restart even if last file was empty. addChunk() verifyFiles([]int{3, 4, 5, 6, 7, 8, 9}) // Truncating files after restart. require.NoError(t, hrw.Truncate(sixthFileMinT)) verifyFiles([]int{6, 7, 8, 9, 10}) // As the last file was empty, this creates no new files. require.NoError(t, hrw.Truncate(sixthFileMinT+1)) verifyFiles([]int{6, 7, 8, 9, 10}) addChunk() // Truncating till current time should not delete the current active file. require.NoError(t, hrw.Truncate(int64(timeRange+(2*fileTimeStep)))) verifyFiles([]int{10, 11}) // One file is the previously active file and one currently created. } // TestOldChunkDiskMapper_Truncate_PreservesFileSequence tests that truncation doesn't poke // holes into the file sequence, even if there are empty files in between non-empty files. // This test exposes https://github.com/prometheus/prometheus/issues/7412 where the truncation // simply deleted all empty files instead of stopping once it encountered a non-empty file. func TestOldChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) { hrw := testOldChunkDiskMapper(t) defer func() { require.NoError(t, hrw.Close()) }() timeRange := 0 addChunk := func() { step := 100 mint, maxt := timeRange+1, timeRange+step-1 _ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) { require.NoError(t, err) }) timeRange += step } emptyFile := func() { require.NoError(t, hrw.CutNewFile()) } nonEmptyFile := func() { emptyFile() addChunk() } addChunk() // 1. Created with the first chunk. nonEmptyFile() // 2. nonEmptyFile() // 3. emptyFile() // 4. nonEmptyFile() // 5. emptyFile() // 6. verifyFiles := func(remainingFiles []int) { t.Helper() files, err := ioutil.ReadDir(hrw.dir.Name()) require.NoError(t, err) require.Equal(t, len(remainingFiles), len(files), "files on disk") require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles") require.Equal(t, len(remainingFiles), len(hrw.closers), "closers") for _, i := range remainingFiles { _, ok := hrw.mmappedChunkFiles[i] require.True(t, ok, "remaining file %d not in hrw.mmappedChunkFiles", i) } } verifyFiles([]int{1, 2, 3, 4, 5, 6}) // Truncating files till 2. It should not delete anything after 3 (inclusive) // though files 4 and 6 are empty. file2Maxt := hrw.mmappedChunkFiles[2].maxt require.NoError(t, hrw.Truncate(file2Maxt+1)) // As 6 was empty, it should not create another file. verifyFiles([]int{3, 4, 5, 6}) addChunk() // Truncate creates another file as 6 is not empty now. require.NoError(t, hrw.Truncate(file2Maxt+1)) verifyFiles([]int{3, 4, 5, 6, 7}) dir := hrw.dir.Name() require.NoError(t, hrw.Close()) // Restarting checks for unsequential files. var err error hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) require.NoError(t, err) verifyFiles([]int{3, 4, 5, 6, 7}) } // TestOldChunkDiskMapper_TruncateAfterFailedIterateChunks tests for // https://github.com/prometheus/prometheus/issues/7753 func TestOldChunkDiskMapper_TruncateAfterFailedIterateChunks(t *testing.T) { hrw := testOldChunkDiskMapper(t) defer func() { require.NoError(t, hrw.Close()) }() // Write a chunks to iterate on it later. _ = hrw.WriteChunk(1, 0, 1000, randomChunk(t), func(err error) { require.NoError(t, err) }) dir := hrw.dir.Name() require.NoError(t, hrw.Close()) // Restarting to recreate https://github.com/prometheus/prometheus/issues/7753. hrw, err := NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) require.NoError(t, err) // Forcefully failing IterateAllChunks. require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { return errors.New("random error") })) // Truncation call should not return error after IterateAllChunks fails. require.NoError(t, hrw.Truncate(2000)) } func TestOldChunkDiskMapper_ReadRepairOnEmptyLastFile(t *testing.T) { hrw := testOldChunkDiskMapper(t) defer func() { require.NoError(t, hrw.Close()) }() timeRange := 0 addChunk := func() { step := 100 mint, maxt := timeRange+1, timeRange+step-1 _ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) { require.NoError(t, err) }) timeRange += step } nonEmptyFile := func() { require.NoError(t, hrw.CutNewFile()) addChunk() } addChunk() // 1. Created with the first chunk. nonEmptyFile() // 2. nonEmptyFile() // 3. require.Equal(t, 3, len(hrw.mmappedChunkFiles)) lastFile := 0 for idx := range hrw.mmappedChunkFiles { if idx > lastFile { lastFile = idx } } require.Equal(t, 3, lastFile) dir := hrw.dir.Name() require.NoError(t, hrw.Close()) // Write an empty last file mimicking an abrupt shutdown on file creation. emptyFileName := segmentFile(dir, lastFile+1) f, err := os.OpenFile(emptyFileName, os.O_WRONLY|os.O_CREATE, 0o666) require.NoError(t, err) require.NoError(t, f.Sync()) stat, err := f.Stat() require.NoError(t, err) require.Equal(t, int64(0), stat.Size()) require.NoError(t, f.Close()) // Open chunk disk mapper again, corrupt file should be removed. hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) require.NoError(t, err) require.False(t, hrw.fileMaxtSet) require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { return nil })) require.True(t, hrw.fileMaxtSet) // Removed from memory. require.Equal(t, 3, len(hrw.mmappedChunkFiles)) for idx := range hrw.mmappedChunkFiles { require.LessOrEqual(t, idx, lastFile, "file index is bigger than previous last file") } // Removed even from disk. files, err := ioutil.ReadDir(dir) require.NoError(t, err) require.Equal(t, 3, len(files)) for _, fi := range files { seq, err := strconv.ParseUint(fi.Name(), 10, 64) require.NoError(t, err) require.LessOrEqual(t, seq, uint64(lastFile), "file index on disk is bigger than previous last file") } } func testOldChunkDiskMapper(t *testing.T) *OldChunkDiskMapper { tmpdir, err := ioutil.TempDir("", "data") require.NoError(t, err) t.Cleanup(func() { require.NoError(t, os.RemoveAll(tmpdir)) }) hrw, err := NewOldChunkDiskMapper(tmpdir, chunkenc.NewPool(), DefaultWriteBufferSize) require.NoError(t, err) require.False(t, hrw.fileMaxtSet) require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { return nil })) require.True(t, hrw.fileMaxtSet) return hrw } func createChunkForOld(t *testing.T, idx int, hrw *OldChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) { seriesRef = HeadSeriesRef(rand.Int63()) mint = int64((idx)*1000 + 1) maxt = int64((idx + 1) * 1000) chunk = randomChunk(t) chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(err error) { require.NoError(t, err) }) return }