diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index 9d39fc986..486d498d8 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -26,7 +26,6 @@ import ( "strconv" "sync" "sync/atomic" - "time" "github.com/pkg/errors" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -50,12 +49,6 @@ var ( ) const ( - // DefaultHeadChunkFileMaxTimeRange is the default head chunk file time range. - // Assuming a general scrape interval of 15s, a chunk with 120 samples would - // be cut every 30m, so anything <30m will cause lots of empty files. And keeping - // it exactly 30m also has a chance of having empty files as its near that border. - // Hence keeping it a little more than 30m, i.e. 40m. - DefaultHeadChunkFileMaxTimeRange = 40 * int64(time.Minute/time.Millisecond) // MintMaxtSize is the size of the mint/maxt for head chunk file and chunks. MintMaxtSize = 8 // SeriesRefSize is the size of series reference on disk. @@ -63,7 +56,7 @@ const ( // HeadChunkFileHeaderSize is the total size of the header for the head chunk file. HeadChunkFileHeaderSize = SegmentHeaderSize // MaxHeadChunkFileSize is the max size of a head chunk file. - MaxHeadChunkFileSize = 512 * 1024 * 1024 // 512 MiB. + MaxHeadChunkFileSize = 128 * 1024 * 1024 // 128 MiB. // CRCSize is the size of crc32 sum on disk. CRCSize = 4 // MaxHeadChunkMetaSize is the max size of an mmapped chunks minus the chunks data. @@ -90,15 +83,13 @@ type ChunkDiskMapper struct { curFile *os.File // File being written to. curFileSequence int // Index of current open file being appended to. - curFileMint int64 // Used to check for a chunk crossing the max file time range. curFileMaxt int64 // Used for the size retention. curFileNumBytes int64 // Bytes written in current open file. - maxFileTime int64 // Max time range (curFileMaxt-curFileMint) for a file. byteBuf [MaxHeadChunkMetaSize]byte // Buffer used to write the header of the chunk. chkWriter *bufio.Writer // Writer for the current open file. crc32 hash.Hash - writePathMtx sync.RWMutex + writePathMtx sync.Mutex /// Reader. // The int key in the map is the file number on the disk. @@ -133,14 +124,6 @@ type mmappedChunkFile struct { // NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper // to set the maxt of all the file. func NewChunkDiskMapper(dir string, pool chunkenc.Pool) (*ChunkDiskMapper, error) { - return newChunkDiskMapper(dir, DefaultHeadChunkFileMaxTimeRange, pool) -} - -func newChunkDiskMapper(dir string, maxFileDuration int64, pool chunkenc.Pool) (*ChunkDiskMapper, error) { - if maxFileDuration <= 0 { - maxFileDuration = DefaultHeadChunkFileMaxTimeRange - } - if err := os.MkdirAll(dir, 0777); err != nil { return nil, err } @@ -151,7 +134,6 @@ func newChunkDiskMapper(dir string, maxFileDuration int64, pool chunkenc.Pool) ( m := &ChunkDiskMapper{ dir: dirFile, - maxFileTime: maxFileDuration, pool: pool, crc32: newCRC32(), chunkBuffer: newChunkBuffer(), @@ -256,8 +238,8 @@ func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk c return 0, ErrChunkDiskMapperClosed } - if cdm.shouldCutNewFile(len(chk.Bytes()), maxt) { - if err := cdm.cut(mint); err != nil { + if cdm.shouldCutNewFile(len(chk.Bytes())) { + if err := cdm.cut(); err != nil { return 0, err } } @@ -301,9 +283,6 @@ func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk c if maxt > cdm.curFileMaxt { cdm.curFileMaxt = maxt } - if mint < cdm.curFileMint { - cdm.curFileMint = mint - } cdm.chunkBuffer.put(chkRef, chk) @@ -325,13 +304,21 @@ func chunkRef(seq, offset uint64) (chunkRef uint64) { // shouldCutNewFile decides the cutting of a new file based on time and size retention. // Size retention: because depending on the system architecture, there is a limit on how big of a file we can m-map. // Time retention: so that we can delete old chunks with some time guarantee in low load environments. -func (cdm *ChunkDiskMapper) shouldCutNewFile(chunkSize int, maxt int64) bool { +func (cdm *ChunkDiskMapper) shouldCutNewFile(chunkSize int) bool { return cdm.curFileNumBytes == 0 || // First head chunk file. - (maxt-cdm.curFileMint > cdm.maxFileTime && cdm.curFileNumBytes > HeadChunkFileHeaderSize) || // Time duration reached for the existing file. cdm.curFileNumBytes+int64(chunkSize+MaxHeadChunkMetaSize) > MaxHeadChunkFileSize // Exceeds the max head chunk file size. } -func (cdm *ChunkDiskMapper) cut(mint int64) (returnErr error) { +// CutNewFile creates a new m-mapped file. +func (cdm *ChunkDiskMapper) CutNewFile() (returnErr error) { + cdm.writePathMtx.Lock() + defer cdm.writePathMtx.Unlock() + + return cdm.cut() +} + +// cut creates a new m-mapped file. The write lock should be held before calling this. +func (cdm *ChunkDiskMapper) cut() (returnErr error) { // Sync current tail to disk and close. if err := cdm.finalizeCurFile(); err != nil { return err @@ -367,7 +354,6 @@ func (cdm *ChunkDiskMapper) cut(mint int64) (returnErr error) { } cdm.curFileSequence = seq - cdm.curFileMint = mint cdm.curFile = newFile if cdm.chkWriter != nil { cdm.chkWriter.Reset(newFile) @@ -690,7 +676,10 @@ func (cdm *ChunkDiskMapper) Truncate(mint int64) error { } cdm.readPathMtx.RUnlock() - return cdm.deleteFiles(removedFiles) + var merr tsdb_errors.MultiError + merr.Add(cdm.CutNewFile()) + merr.Add(cdm.deleteFiles(removedFiles)) + return merr.Err() } func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error { diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go index 073a16955..18982b13e 100644 --- a/tsdb/chunks/head_chunks_test.go +++ b/tsdb/chunks/head_chunks_test.go @@ -19,7 +19,6 @@ import ( "math/rand" "os" "testing" - "time" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/util/testutil" @@ -48,53 +47,58 @@ func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) { totalChunks := 0 var firstFileName string for hrw.curFileSequence < 3 || hrw.chkWriter.Buffered() == 0 { - for i := 0; i < 100; i++ { - seriesRef, chkRef, mint, maxt, chunk := createChunk(t, totalChunks, hrw) - totalChunks++ - expectedData = append(expectedData, expectedDataType{ - seriesRef: seriesRef, - mint: mint, - maxt: maxt, - chunkRef: chkRef, - chunk: chunk, - numSamples: uint16(chunk.NumSamples()), - }) + addChunks := func(numChunks int) { + for i := 0; i < numChunks; i++ { + seriesRef, chkRef, mint, maxt, chunk := createChunk(t, totalChunks, hrw) + totalChunks++ + expectedData = append(expectedData, expectedDataType{ + seriesRef: seriesRef, + mint: mint, + maxt: maxt, + chunkRef: chkRef, + chunk: chunk, + numSamples: uint16(chunk.NumSamples()), + }) - if hrw.curFileSequence != 1 { - // We are checking for bytes written only for the first file. - continue + if hrw.curFileSequence != 1 { + // We are checking for bytes written only for the first file. + continue + } + + // Calculating expected bytes written on disk for first file. + firstFileName = hrw.curFile.Name() + testutil.Equals(t, chunkRef(1, nextChunkOffset), chkRef) + + bytesWritten := 0 + chkCRC32.Reset() + + binary.BigEndian.PutUint64(buf[bytesWritten:], seriesRef) + bytesWritten += SeriesRefSize + binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(mint)) + bytesWritten += MintMaxtSize + binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt)) + bytesWritten += MintMaxtSize + buf[bytesWritten] = byte(chunk.Encoding()) + bytesWritten += ChunkEncodingSize + n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes()))) + bytesWritten += n + + expectedBytes = append(expectedBytes, buf[:bytesWritten]...) + _, err := chkCRC32.Write(buf[:bytesWritten]) + testutil.Ok(t, err) + expectedBytes = append(expectedBytes, chunk.Bytes()...) + _, err = chkCRC32.Write(chunk.Bytes()) + testutil.Ok(t, err) + + expectedBytes = append(expectedBytes, chkCRC32.Sum(nil)...) + + // += seriesRef, mint, maxt, encoding, chunk data len, chunk data, CRC. + nextChunkOffset += SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + uint64(n) + uint64(len(chunk.Bytes())) + CRCSize } - - // Calculating expected bytes written on disk for first file. - firstFileName = hrw.curFile.Name() - testutil.Equals(t, chunkRef(1, nextChunkOffset), chkRef) - - bytesWritten := 0 - chkCRC32.Reset() - - binary.BigEndian.PutUint64(buf[bytesWritten:], seriesRef) - bytesWritten += SeriesRefSize - binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(mint)) - bytesWritten += MintMaxtSize - binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt)) - bytesWritten += MintMaxtSize - buf[bytesWritten] = byte(chunk.Encoding()) - bytesWritten += ChunkEncodingSize - n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes()))) - bytesWritten += n - - expectedBytes = append(expectedBytes, buf[:bytesWritten]...) - _, err := chkCRC32.Write(buf[:bytesWritten]) - testutil.Ok(t, err) - expectedBytes = append(expectedBytes, chunk.Bytes()...) - _, err = chkCRC32.Write(chunk.Bytes()) - testutil.Ok(t, err) - - expectedBytes = append(expectedBytes, chkCRC32.Sum(nil)...) - - // += seriesRef, mint, maxt, encoding, chunk data len, chunk data, CRC. - nextChunkOffset += SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + uint64(n) + uint64(len(chunk.Bytes())) + CRCSize } + addChunks(100) + hrw.CutNewFile() + addChunks(10) // For chunks in in-memory buffer. } // Checking on-disk bytes for the first file. @@ -165,12 +169,12 @@ func TestHeadReadWriter_Truncate(t *testing.T) { timeRange := 0 fileTimeStep := 100 - totalFiles, after1stTruncation, after2ndTruncation := 7, 5, 3 + totalFiles := 7 + startIndexAfter1stTruncation, startIndexAfter2ndTruncation := 3, 6 + filesDeletedAfter1stTruncation, filesDeletedAfter2ndTruncation := 2, 5 var timeToTruncate, timeToTruncateAfterRestart int64 - cutFile := func(i int) { - testutil.Ok(t, hrw.cut(int64(timeRange))) - + addChunk := func() int { mint := timeRange + 1 // Just after the the new file cut. maxt := timeRange + fileTimeStep - 1 // Just before the next file. @@ -178,15 +182,21 @@ func TestHeadReadWriter_Truncate(t *testing.T) { _, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t)) testutil.Ok(t, err) - if i == totalFiles-after1stTruncation+1 { - // Truncate the segment files before the 5th segment. + timeRange += fileTimeStep + + return mint + } + + cutFile := func(i int) { + testutil.Ok(t, hrw.CutNewFile()) + + mint := addChunk() + + if i == startIndexAfter1stTruncation { timeToTruncate = int64(mint) - } else if i == totalFiles-after2ndTruncation+1 { - // Truncate the segment files before the 3rd segment after restart. + } else if i == startIndexAfter2ndTruncation { timeToTruncateAfterRestart = int64(mint) } - - timeRange += fileTimeStep } // Cut segments. @@ -195,8 +205,8 @@ func TestHeadReadWriter_Truncate(t *testing.T) { } // Verifying the the remaining files. - verifyRemainingFiles := func(remainingFiles int) { - t.Helper() + verifyRemainingFiles := func(remainingFiles, startIndex int) { + //t.Helper() files, err := ioutil.ReadDir(hrw.dir.Name()) testutil.Ok(t, err) @@ -206,7 +216,7 @@ func TestHeadReadWriter_Truncate(t *testing.T) { for i := 1; i <= totalFiles; i++ { _, ok := hrw.mmappedChunkFiles[i] - if i < totalFiles-remainingFiles+1 { + if i < startIndex { testutil.Equals(t, false, ok) } else { testutil.Equals(t, true, ok) @@ -215,11 +225,13 @@ func TestHeadReadWriter_Truncate(t *testing.T) { } // Verify the number of segments. - verifyRemainingFiles(totalFiles) + verifyRemainingFiles(totalFiles, 1) // Truncating files. testutil.Ok(t, hrw.Truncate(timeToTruncate)) - verifyRemainingFiles(after1stTruncation) + totalFiles++ // Truncation creates a new file. + verifyRemainingFiles(totalFiles-filesDeletedAfter1stTruncation, startIndexAfter1stTruncation) + addChunk() // Add a chunk so that new file is not truncated. dir := hrw.dir.Name() testutil.Ok(t, hrw.Close()) @@ -235,14 +247,12 @@ func TestHeadReadWriter_Truncate(t *testing.T) { // Truncating files after restart. testutil.Ok(t, hrw.Truncate(timeToTruncateAfterRestart)) - verifyRemainingFiles(after2ndTruncation) + totalFiles++ // Truncation creates a new file. + verifyRemainingFiles(totalFiles-filesDeletedAfter2ndTruncation, startIndexAfter2ndTruncation) - // Add another file to have an active file. - totalFiles++ - cutFile(totalFiles) // Truncating till current time should not delete the current active file. - testutil.Ok(t, hrw.Truncate(time.Now().UnixNano()/1e6)) - verifyRemainingFiles(1) + testutil.Ok(t, hrw.Truncate(int64(timeRange+fileTimeStep))) + verifyRemainingFiles(2, totalFiles) // One file was the active file and one was newly created. } func testHeadReadWriter(t *testing.T) (hrw *ChunkDiskMapper, close func()) { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index dd51f3620..b97932a30 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1300,7 +1300,7 @@ func TestHeadReadWriterRepair(t *testing.T) { testutil.Ok(t, os.RemoveAll(dir)) }() - const chunkRange = chunks.DefaultHeadChunkFileMaxTimeRange // to hold 4 chunks per segment. + const chunkRange = 1000 walDir := filepath.Join(dir, "wal") // Fill the chunk segments and corrupt it. @@ -1317,19 +1317,20 @@ func TestHeadReadWriterRepair(t *testing.T) { testutil.Assert(t, created, "series was not created") for i := 0; i < 7; i++ { - ok, chunkCreated := s.append(int64(i*int(chunkRange)), float64(i*int(chunkRange)), 0, h.chunkDiskMapper) + ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, h.chunkDiskMapper) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, chunkCreated, "chunk was not created") - ok, chunkCreated = s.append(int64(i*int(chunkRange))+chunkRange-1, float64(i*int(chunkRange)), 0, h.chunkDiskMapper) + ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, !chunkCreated, "chunk was created") + testutil.Ok(t, h.chunkDiskMapper.CutNewFile()) } testutil.Ok(t, h.Close()) - // Verify that there are 6 segment files. + // Verify that there are 7 segment files. files, err := ioutil.ReadDir(mmappedChunksDir(dir)) testutil.Ok(t, err) - testutil.Equals(t, 6, len(files)) + testutil.Equals(t, 7, len(files)) // Corrupt the 4th file by writing a random byte to series ref. f, err := os.OpenFile(filepath.Join(mmappedChunksDir(dir), files[3].Name()), os.O_WRONLY, 0666)