Remove time based m-map file creation (#7314)

* Remove time based m-map file creation

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix review comments

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
This commit is contained in:
Ganesh Vernekar 2020-05-29 20:08:41 +05:30 committed by GitHub
parent c7d9516695
commit a1355eb7c7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 101 additions and 101 deletions

View file

@ -26,7 +26,6 @@ import (
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunkenc"
@ -50,12 +49,6 @@ var (
) )
const ( const (
// DefaultHeadChunkFileMaxTimeRange is the default head chunk file time range.
// Assuming a general scrape interval of 15s, a chunk with 120 samples would
// be cut every 30m, so anything <30m will cause lots of empty files. And keeping
// it exactly 30m also has a chance of having empty files as its near that border.
// Hence keeping it a little more than 30m, i.e. 40m.
DefaultHeadChunkFileMaxTimeRange = 40 * int64(time.Minute/time.Millisecond)
// MintMaxtSize is the size of the mint/maxt for head chunk file and chunks. // MintMaxtSize is the size of the mint/maxt for head chunk file and chunks.
MintMaxtSize = 8 MintMaxtSize = 8
// SeriesRefSize is the size of series reference on disk. // SeriesRefSize is the size of series reference on disk.
@ -63,7 +56,7 @@ const (
// HeadChunkFileHeaderSize is the total size of the header for the head chunk file. // HeadChunkFileHeaderSize is the total size of the header for the head chunk file.
HeadChunkFileHeaderSize = SegmentHeaderSize HeadChunkFileHeaderSize = SegmentHeaderSize
// MaxHeadChunkFileSize is the max size of a head chunk file. // MaxHeadChunkFileSize is the max size of a head chunk file.
MaxHeadChunkFileSize = 512 * 1024 * 1024 // 512 MiB. MaxHeadChunkFileSize = 128 * 1024 * 1024 // 128 MiB.
// CRCSize is the size of crc32 sum on disk. // CRCSize is the size of crc32 sum on disk.
CRCSize = 4 CRCSize = 4
// MaxHeadChunkMetaSize is the max size of an mmapped chunks minus the chunks data. // MaxHeadChunkMetaSize is the max size of an mmapped chunks minus the chunks data.
@ -90,15 +83,13 @@ type ChunkDiskMapper struct {
curFile *os.File // File being written to. curFile *os.File // File being written to.
curFileSequence int // Index of current open file being appended to. curFileSequence int // Index of current open file being appended to.
curFileMint int64 // Used to check for a chunk crossing the max file time range.
curFileMaxt int64 // Used for the size retention. curFileMaxt int64 // Used for the size retention.
curFileNumBytes int64 // Bytes written in current open file. curFileNumBytes int64 // Bytes written in current open file.
maxFileTime int64 // Max time range (curFileMaxt-curFileMint) for a file.
byteBuf [MaxHeadChunkMetaSize]byte // Buffer used to write the header of the chunk. byteBuf [MaxHeadChunkMetaSize]byte // Buffer used to write the header of the chunk.
chkWriter *bufio.Writer // Writer for the current open file. chkWriter *bufio.Writer // Writer for the current open file.
crc32 hash.Hash crc32 hash.Hash
writePathMtx sync.RWMutex writePathMtx sync.Mutex
/// Reader. /// Reader.
// The int key in the map is the file number on the disk. // The int key in the map is the file number on the disk.
@ -133,14 +124,6 @@ type mmappedChunkFile struct {
// NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper // NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper
// to set the maxt of all the file. // to set the maxt of all the file.
func NewChunkDiskMapper(dir string, pool chunkenc.Pool) (*ChunkDiskMapper, error) { func NewChunkDiskMapper(dir string, pool chunkenc.Pool) (*ChunkDiskMapper, error) {
return newChunkDiskMapper(dir, DefaultHeadChunkFileMaxTimeRange, pool)
}
func newChunkDiskMapper(dir string, maxFileDuration int64, pool chunkenc.Pool) (*ChunkDiskMapper, error) {
if maxFileDuration <= 0 {
maxFileDuration = DefaultHeadChunkFileMaxTimeRange
}
if err := os.MkdirAll(dir, 0777); err != nil { if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err return nil, err
} }
@ -151,7 +134,6 @@ func newChunkDiskMapper(dir string, maxFileDuration int64, pool chunkenc.Pool) (
m := &ChunkDiskMapper{ m := &ChunkDiskMapper{
dir: dirFile, dir: dirFile,
maxFileTime: maxFileDuration,
pool: pool, pool: pool,
crc32: newCRC32(), crc32: newCRC32(),
chunkBuffer: newChunkBuffer(), chunkBuffer: newChunkBuffer(),
@ -256,8 +238,8 @@ func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk c
return 0, ErrChunkDiskMapperClosed return 0, ErrChunkDiskMapperClosed
} }
if cdm.shouldCutNewFile(len(chk.Bytes()), maxt) { if cdm.shouldCutNewFile(len(chk.Bytes())) {
if err := cdm.cut(mint); err != nil { if err := cdm.cut(); err != nil {
return 0, err return 0, err
} }
} }
@ -301,9 +283,6 @@ func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk c
if maxt > cdm.curFileMaxt { if maxt > cdm.curFileMaxt {
cdm.curFileMaxt = maxt cdm.curFileMaxt = maxt
} }
if mint < cdm.curFileMint {
cdm.curFileMint = mint
}
cdm.chunkBuffer.put(chkRef, chk) cdm.chunkBuffer.put(chkRef, chk)
@ -325,13 +304,21 @@ func chunkRef(seq, offset uint64) (chunkRef uint64) {
// shouldCutNewFile decides the cutting of a new file based on time and size retention. // shouldCutNewFile decides the cutting of a new file based on time and size retention.
// Size retention: because depending on the system architecture, there is a limit on how big of a file we can m-map. // Size retention: because depending on the system architecture, there is a limit on how big of a file we can m-map.
// Time retention: so that we can delete old chunks with some time guarantee in low load environments. // Time retention: so that we can delete old chunks with some time guarantee in low load environments.
func (cdm *ChunkDiskMapper) shouldCutNewFile(chunkSize int, maxt int64) bool { func (cdm *ChunkDiskMapper) shouldCutNewFile(chunkSize int) bool {
return cdm.curFileNumBytes == 0 || // First head chunk file. return cdm.curFileNumBytes == 0 || // First head chunk file.
(maxt-cdm.curFileMint > cdm.maxFileTime && cdm.curFileNumBytes > HeadChunkFileHeaderSize) || // Time duration reached for the existing file.
cdm.curFileNumBytes+int64(chunkSize+MaxHeadChunkMetaSize) > MaxHeadChunkFileSize // Exceeds the max head chunk file size. cdm.curFileNumBytes+int64(chunkSize+MaxHeadChunkMetaSize) > MaxHeadChunkFileSize // Exceeds the max head chunk file size.
} }
func (cdm *ChunkDiskMapper) cut(mint int64) (returnErr error) { // CutNewFile creates a new m-mapped file.
func (cdm *ChunkDiskMapper) CutNewFile() (returnErr error) {
cdm.writePathMtx.Lock()
defer cdm.writePathMtx.Unlock()
return cdm.cut()
}
// cut creates a new m-mapped file. The write lock should be held before calling this.
func (cdm *ChunkDiskMapper) cut() (returnErr error) {
// Sync current tail to disk and close. // Sync current tail to disk and close.
if err := cdm.finalizeCurFile(); err != nil { if err := cdm.finalizeCurFile(); err != nil {
return err return err
@ -367,7 +354,6 @@ func (cdm *ChunkDiskMapper) cut(mint int64) (returnErr error) {
} }
cdm.curFileSequence = seq cdm.curFileSequence = seq
cdm.curFileMint = mint
cdm.curFile = newFile cdm.curFile = newFile
if cdm.chkWriter != nil { if cdm.chkWriter != nil {
cdm.chkWriter.Reset(newFile) cdm.chkWriter.Reset(newFile)
@ -690,7 +676,10 @@ func (cdm *ChunkDiskMapper) Truncate(mint int64) error {
} }
cdm.readPathMtx.RUnlock() cdm.readPathMtx.RUnlock()
return cdm.deleteFiles(removedFiles) var merr tsdb_errors.MultiError
merr.Add(cdm.CutNewFile())
merr.Add(cdm.deleteFiles(removedFiles))
return merr.Err()
} }
func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error { func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error {

View file

@ -19,7 +19,6 @@ import (
"math/rand" "math/rand"
"os" "os"
"testing" "testing"
"time"
"github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
@ -48,53 +47,58 @@ func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) {
totalChunks := 0 totalChunks := 0
var firstFileName string var firstFileName string
for hrw.curFileSequence < 3 || hrw.chkWriter.Buffered() == 0 { for hrw.curFileSequence < 3 || hrw.chkWriter.Buffered() == 0 {
for i := 0; i < 100; i++ { addChunks := func(numChunks int) {
seriesRef, chkRef, mint, maxt, chunk := createChunk(t, totalChunks, hrw) for i := 0; i < numChunks; i++ {
totalChunks++ seriesRef, chkRef, mint, maxt, chunk := createChunk(t, totalChunks, hrw)
expectedData = append(expectedData, expectedDataType{ totalChunks++
seriesRef: seriesRef, expectedData = append(expectedData, expectedDataType{
mint: mint, seriesRef: seriesRef,
maxt: maxt, mint: mint,
chunkRef: chkRef, maxt: maxt,
chunk: chunk, chunkRef: chkRef,
numSamples: uint16(chunk.NumSamples()), chunk: chunk,
}) numSamples: uint16(chunk.NumSamples()),
})
if hrw.curFileSequence != 1 { if hrw.curFileSequence != 1 {
// We are checking for bytes written only for the first file. // We are checking for bytes written only for the first file.
continue continue
}
// Calculating expected bytes written on disk for first file.
firstFileName = hrw.curFile.Name()
testutil.Equals(t, chunkRef(1, nextChunkOffset), chkRef)
bytesWritten := 0
chkCRC32.Reset()
binary.BigEndian.PutUint64(buf[bytesWritten:], seriesRef)
bytesWritten += SeriesRefSize
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(mint))
bytesWritten += MintMaxtSize
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt))
bytesWritten += MintMaxtSize
buf[bytesWritten] = byte(chunk.Encoding())
bytesWritten += ChunkEncodingSize
n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes())))
bytesWritten += n
expectedBytes = append(expectedBytes, buf[:bytesWritten]...)
_, err := chkCRC32.Write(buf[:bytesWritten])
testutil.Ok(t, err)
expectedBytes = append(expectedBytes, chunk.Bytes()...)
_, err = chkCRC32.Write(chunk.Bytes())
testutil.Ok(t, err)
expectedBytes = append(expectedBytes, chkCRC32.Sum(nil)...)
// += seriesRef, mint, maxt, encoding, chunk data len, chunk data, CRC.
nextChunkOffset += SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + uint64(n) + uint64(len(chunk.Bytes())) + CRCSize
} }
// Calculating expected bytes written on disk for first file.
firstFileName = hrw.curFile.Name()
testutil.Equals(t, chunkRef(1, nextChunkOffset), chkRef)
bytesWritten := 0
chkCRC32.Reset()
binary.BigEndian.PutUint64(buf[bytesWritten:], seriesRef)
bytesWritten += SeriesRefSize
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(mint))
bytesWritten += MintMaxtSize
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt))
bytesWritten += MintMaxtSize
buf[bytesWritten] = byte(chunk.Encoding())
bytesWritten += ChunkEncodingSize
n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes())))
bytesWritten += n
expectedBytes = append(expectedBytes, buf[:bytesWritten]...)
_, err := chkCRC32.Write(buf[:bytesWritten])
testutil.Ok(t, err)
expectedBytes = append(expectedBytes, chunk.Bytes()...)
_, err = chkCRC32.Write(chunk.Bytes())
testutil.Ok(t, err)
expectedBytes = append(expectedBytes, chkCRC32.Sum(nil)...)
// += seriesRef, mint, maxt, encoding, chunk data len, chunk data, CRC.
nextChunkOffset += SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + uint64(n) + uint64(len(chunk.Bytes())) + CRCSize
} }
addChunks(100)
hrw.CutNewFile()
addChunks(10) // For chunks in in-memory buffer.
} }
// Checking on-disk bytes for the first file. // Checking on-disk bytes for the first file.
@ -165,12 +169,12 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
timeRange := 0 timeRange := 0
fileTimeStep := 100 fileTimeStep := 100
totalFiles, after1stTruncation, after2ndTruncation := 7, 5, 3 totalFiles := 7
startIndexAfter1stTruncation, startIndexAfter2ndTruncation := 3, 6
filesDeletedAfter1stTruncation, filesDeletedAfter2ndTruncation := 2, 5
var timeToTruncate, timeToTruncateAfterRestart int64 var timeToTruncate, timeToTruncateAfterRestart int64
cutFile := func(i int) { addChunk := func() int {
testutil.Ok(t, hrw.cut(int64(timeRange)))
mint := timeRange + 1 // Just after the the new file cut. mint := timeRange + 1 // Just after the the new file cut.
maxt := timeRange + fileTimeStep - 1 // Just before the next file. maxt := timeRange + fileTimeStep - 1 // Just before the next file.
@ -178,15 +182,21 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
_, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t)) _, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t))
testutil.Ok(t, err) testutil.Ok(t, err)
if i == totalFiles-after1stTruncation+1 { timeRange += fileTimeStep
// Truncate the segment files before the 5th segment.
return mint
}
cutFile := func(i int) {
testutil.Ok(t, hrw.CutNewFile())
mint := addChunk()
if i == startIndexAfter1stTruncation {
timeToTruncate = int64(mint) timeToTruncate = int64(mint)
} else if i == totalFiles-after2ndTruncation+1 { } else if i == startIndexAfter2ndTruncation {
// Truncate the segment files before the 3rd segment after restart.
timeToTruncateAfterRestart = int64(mint) timeToTruncateAfterRestart = int64(mint)
} }
timeRange += fileTimeStep
} }
// Cut segments. // Cut segments.
@ -195,8 +205,8 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
} }
// Verifying the the remaining files. // Verifying the the remaining files.
verifyRemainingFiles := func(remainingFiles int) { verifyRemainingFiles := func(remainingFiles, startIndex int) {
t.Helper() //t.Helper()
files, err := ioutil.ReadDir(hrw.dir.Name()) files, err := ioutil.ReadDir(hrw.dir.Name())
testutil.Ok(t, err) testutil.Ok(t, err)
@ -206,7 +216,7 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
for i := 1; i <= totalFiles; i++ { for i := 1; i <= totalFiles; i++ {
_, ok := hrw.mmappedChunkFiles[i] _, ok := hrw.mmappedChunkFiles[i]
if i < totalFiles-remainingFiles+1 { if i < startIndex {
testutil.Equals(t, false, ok) testutil.Equals(t, false, ok)
} else { } else {
testutil.Equals(t, true, ok) testutil.Equals(t, true, ok)
@ -215,11 +225,13 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
} }
// Verify the number of segments. // Verify the number of segments.
verifyRemainingFiles(totalFiles) verifyRemainingFiles(totalFiles, 1)
// Truncating files. // Truncating files.
testutil.Ok(t, hrw.Truncate(timeToTruncate)) testutil.Ok(t, hrw.Truncate(timeToTruncate))
verifyRemainingFiles(after1stTruncation) totalFiles++ // Truncation creates a new file.
verifyRemainingFiles(totalFiles-filesDeletedAfter1stTruncation, startIndexAfter1stTruncation)
addChunk() // Add a chunk so that new file is not truncated.
dir := hrw.dir.Name() dir := hrw.dir.Name()
testutil.Ok(t, hrw.Close()) testutil.Ok(t, hrw.Close())
@ -235,14 +247,12 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
// Truncating files after restart. // Truncating files after restart.
testutil.Ok(t, hrw.Truncate(timeToTruncateAfterRestart)) testutil.Ok(t, hrw.Truncate(timeToTruncateAfterRestart))
verifyRemainingFiles(after2ndTruncation) totalFiles++ // Truncation creates a new file.
verifyRemainingFiles(totalFiles-filesDeletedAfter2ndTruncation, startIndexAfter2ndTruncation)
// Add another file to have an active file.
totalFiles++
cutFile(totalFiles)
// Truncating till current time should not delete the current active file. // Truncating till current time should not delete the current active file.
testutil.Ok(t, hrw.Truncate(time.Now().UnixNano()/1e6)) testutil.Ok(t, hrw.Truncate(int64(timeRange+fileTimeStep)))
verifyRemainingFiles(1) verifyRemainingFiles(2, totalFiles) // One file was the active file and one was newly created.
} }
func testHeadReadWriter(t *testing.T) (hrw *ChunkDiskMapper, close func()) { func testHeadReadWriter(t *testing.T) (hrw *ChunkDiskMapper, close func()) {

View file

@ -1300,7 +1300,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
testutil.Ok(t, os.RemoveAll(dir)) testutil.Ok(t, os.RemoveAll(dir))
}() }()
const chunkRange = chunks.DefaultHeadChunkFileMaxTimeRange // to hold 4 chunks per segment. const chunkRange = 1000
walDir := filepath.Join(dir, "wal") walDir := filepath.Join(dir, "wal")
// Fill the chunk segments and corrupt it. // Fill the chunk segments and corrupt it.
@ -1317,19 +1317,20 @@ func TestHeadReadWriterRepair(t *testing.T) {
testutil.Assert(t, created, "series was not created") testutil.Assert(t, created, "series was not created")
for i := 0; i < 7; i++ { for i := 0; i < 7; i++ {
ok, chunkCreated := s.append(int64(i*int(chunkRange)), float64(i*int(chunkRange)), 0, h.chunkDiskMapper) ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, h.chunkDiskMapper)
testutil.Assert(t, ok, "series append failed") testutil.Assert(t, ok, "series append failed")
testutil.Assert(t, chunkCreated, "chunk was not created") testutil.Assert(t, chunkCreated, "chunk was not created")
ok, chunkCreated = s.append(int64(i*int(chunkRange))+chunkRange-1, float64(i*int(chunkRange)), 0, h.chunkDiskMapper) ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper)
testutil.Assert(t, ok, "series append failed") testutil.Assert(t, ok, "series append failed")
testutil.Assert(t, !chunkCreated, "chunk was created") testutil.Assert(t, !chunkCreated, "chunk was created")
testutil.Ok(t, h.chunkDiskMapper.CutNewFile())
} }
testutil.Ok(t, h.Close()) testutil.Ok(t, h.Close())
// Verify that there are 6 segment files. // Verify that there are 7 segment files.
files, err := ioutil.ReadDir(mmappedChunksDir(dir)) files, err := ioutil.ReadDir(mmappedChunksDir(dir))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 6, len(files)) testutil.Equals(t, 7, len(files))
// Corrupt the 4th file by writing a random byte to series ref. // Corrupt the 4th file by writing a random byte to series ref.
f, err := os.OpenFile(filepath.Join(mmappedChunksDir(dir), files[3].Name()), os.O_WRONLY, 0666) f, err := os.OpenFile(filepath.Join(mmappedChunksDir(dir), files[3].Name()), os.O_WRONLY, 0666)