From bd50b04fedcfe73c2110d20599ac8a18d56cda6c Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Mon, 18 Apr 2022 17:35:45 +0000 Subject: [PATCH] remove old chunk disk mapper initialization and replace it with the upstream one Signed-off-by: Mauro Stettler --- tsdb/chunks/head_chunks.go | 6 +- tsdb/chunks/head_chunks_test.go | 23 +- tsdb/chunks/old_head_chunks.go | 716 ---------------------------- tsdb/chunks/old_head_chunks_test.go | 487 ------------------- tsdb/head.go | 38 +- tsdb/head_append.go | 6 +- tsdb/head_read.go | 10 +- tsdb/head_test.go | 2 +- 8 files changed, 28 insertions(+), 1260 deletions(-) delete mode 100644 tsdb/chunks/old_head_chunks.go delete mode 100644 tsdb/chunks/old_head_chunks_test.go diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index 1bb7e14cb..297abc522 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -484,12 +484,11 @@ func (cdm *ChunkDiskMapper) writeChunk(seriesRef HeadSeriesRef, mint, maxt int64 } // CutNewFile makes that a new file will be created the next time a chunk is written. -func (cdm *ChunkDiskMapper) CutNewFile() error { +func (cdm *ChunkDiskMapper) CutNewFile() { cdm.evtlPosMtx.Lock() defer cdm.evtlPosMtx.Unlock() cdm.evtlPos.cutFileOnNextChunk() - return nil } func (cdm *ChunkDiskMapper) IsQueueEmpty() bool { @@ -800,6 +799,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu if seriesRef == 0 && mint == 0 && maxt == 0 { break } + // Encoding. chkEnc := chunkenc.Encoding(mmapFile.byteSlice.Range(idx, idx+ChunkEncodingSize)[0]) idx += ChunkEncodingSize @@ -893,7 +893,7 @@ func (cdm *ChunkDiskMapper) Truncate(mint int64) error { // There is a known race condition here because between the check of curFileSize() and the call to CutNewFile() // a new file could already be cut, this is acceptable because it will simply result in an empty file which // won't do any harm. - errs.Add(cdm.CutNewFile()) + cdm.CutNewFile() } pendingDeletes, err := cdm.deleteFiles(removedFiles) errs.Add(err) diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go index 2e650ab14..34c1ed9b9 100644 --- a/tsdb/chunks/head_chunks_test.go +++ b/tsdb/chunks/head_chunks_test.go @@ -116,7 +116,7 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) { } } addChunks(100) - require.NoError(t, hrw.CutNewFile()) + hrw.CutNewFile() addChunks(10) // For chunks in in-memory buffer. } @@ -174,7 +174,7 @@ func TestChunkDiskMapper_WriteUnsupportedChunk_Chunk_IterateChunks(t *testing.T) require.NoError(t, hrw.Close()) }() - ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, hrw, nil) + ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, hrw) // Checking on-disk bytes for the first file. require.Equal(t, 1, len(hrw.mmappedChunkFiles), "expected 1 mmapped file, got %d", len(hrw.mmappedChunkFiles)) @@ -254,7 +254,7 @@ func TestChunkDiskMapper_Truncate(t *testing.T) { // Create segments 1 to 7. for i := 1; i <= 7; i++ { - require.NoError(t, hrw.CutNewFile()) + hrw.CutNewFile() mint := int64(addChunk()) if i == 3 { thirdFileMinT = mint @@ -453,7 +453,7 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) { nonEmptyFile := func() { t.Helper() - require.NoError(t, hrw.CutNewFile()) + hrw.CutNewFile() addChunk() } @@ -555,24 +555,17 @@ func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSer return } -func writeUnsupportedChunk(t *testing.T, idx int, hrw *ChunkDiskMapper, hrwOld *OldChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) { +func writeUnsupportedChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) { var err error seriesRef = HeadSeriesRef(rand.Int63()) mint = int64((idx)*1000 + 1) maxt = int64((idx + 1) * 1000) chunk = randomUnsupportedChunk(t) awaitCb := make(chan struct{}) - if hrw != nil { - chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) { - require.NoError(t, err) - close(awaitCb) - }) - } else { - chunkRef = hrwOld.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) { - require.NoError(t, err) - }) + chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) { + require.NoError(t, err) close(awaitCb) - } + }) <-awaitCb return } diff --git a/tsdb/chunks/old_head_chunks.go b/tsdb/chunks/old_head_chunks.go deleted file mode 100644 index 5301fd1ac..000000000 --- a/tsdb/chunks/old_head_chunks.go +++ /dev/null @@ -1,716 +0,0 @@ -// Copyright 2020 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package chunks - -import ( - "bufio" - "bytes" - "encoding/binary" - "hash" - "io" - "os" - "sort" - "sync" - - "github.com/pkg/errors" - "go.uber.org/atomic" - - "github.com/prometheus/prometheus/tsdb/chunkenc" - tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" - "github.com/prometheus/prometheus/tsdb/fileutil" -) - -// OldChunkDiskMapper is for writing the Head block chunks to the disk -// and access chunks via mmapped file. -type OldChunkDiskMapper struct { - curFileNumBytes atomic.Int64 // Bytes written in current open file. - - /// Writer. - dir *os.File - writeBufferSize int - - curFile *os.File // File being written to. - curFileSequence int // Index of current open file being appended to. - curFileMaxt int64 // Used for the size retention. - - byteBuf [MaxHeadChunkMetaSize]byte // Buffer used to write the header of the chunk. - chkWriter *bufio.Writer // Writer for the current open file. - crc32 hash.Hash - writePathMtx sync.Mutex - - /// Reader. - // The int key in the map is the file number on the disk. - mmappedChunkFiles map[int]*mmappedChunkFile // Contains the m-mapped files for each chunk file mapped with its index. - closers map[int]io.Closer // Closers for resources behind the byte slices. - readPathMtx sync.RWMutex // Mutex used to protect the above 2 maps. - pool chunkenc.Pool // This is used when fetching a chunk from the disk to allocate a chunk. - - // Writer and Reader. - // We flush chunks to disk in batches. Hence, we store them in this buffer - // from which chunks are served till they are flushed and are ready for m-mapping. - chunkBuffer *chunkBuffer - - // Whether the maxt field is set for all mmapped chunk files tracked within the mmappedChunkFiles map. - // This is done after iterating through all the chunks in those files using the IterateAllChunks method. - fileMaxtSet bool - - closed bool -} - -// NewOldChunkDiskMapper returns a new ChunkDiskMapper against the given directory -// using the default head chunk file duration. -// NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper -// to set the maxt of all the file. -func NewOldChunkDiskMapper(dir string, pool chunkenc.Pool, writeBufferSize int) (*OldChunkDiskMapper, error) { - // Validate write buffer size. - if writeBufferSize < MinWriteBufferSize || writeBufferSize > MaxWriteBufferSize { - return nil, errors.Errorf("ChunkDiskMapper write buffer size should be between %d and %d (actual: %d)", MinWriteBufferSize, MaxWriteBufferSize, writeBufferSize) - } - if writeBufferSize%1024 != 0 { - return nil, errors.Errorf("ChunkDiskMapper write buffer size should be a multiple of 1024 (actual: %d)", writeBufferSize) - } - - if err := os.MkdirAll(dir, 0o777); err != nil { - return nil, err - } - dirFile, err := fileutil.OpenDir(dir) - if err != nil { - return nil, err - } - - m := &OldChunkDiskMapper{ - dir: dirFile, - pool: pool, - writeBufferSize: writeBufferSize, - crc32: newCRC32(), - chunkBuffer: newChunkBuffer(), - } - - if m.pool == nil { - m.pool = chunkenc.NewPool() - } - - return m, m.openMMapFiles() -} - -// openMMapFiles opens all files within dir for mmapping. -func (cdm *OldChunkDiskMapper) openMMapFiles() (returnErr error) { - cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{} - cdm.closers = map[int]io.Closer{} - defer func() { - if returnErr != nil { - returnErr = tsdb_errors.NewMulti(returnErr, closeAllFromMap(cdm.closers)).Err() - - cdm.mmappedChunkFiles = nil - cdm.closers = nil - } - }() - - files, err := listChunkFiles(cdm.dir.Name()) - if err != nil { - return err - } - - files, err = repairLastChunkFile(files) - if err != nil { - return err - } - - chkFileIndices := make([]int, 0, len(files)) - for seq, fn := range files { - f, err := fileutil.OpenMmapFile(fn) - if err != nil { - return errors.Wrapf(err, "mmap files, file: %s", fn) - } - cdm.closers[seq] = f - cdm.mmappedChunkFiles[seq] = &mmappedChunkFile{byteSlice: realByteSlice(f.Bytes())} - chkFileIndices = append(chkFileIndices, seq) - } - - // Check for gaps in the files. - sort.Ints(chkFileIndices) - if len(chkFileIndices) == 0 { - return nil - } - lastSeq := chkFileIndices[0] - for _, seq := range chkFileIndices[1:] { - if seq != lastSeq+1 { - return errors.Errorf("found unsequential head chunk files %s (index: %d) and %s (index: %d)", files[lastSeq], lastSeq, files[seq], seq) - } - lastSeq = seq - } - - for i, b := range cdm.mmappedChunkFiles { - if b.byteSlice.Len() < HeadChunkFileHeaderSize { - return errors.Wrapf(errInvalidSize, "%s: invalid head chunk file header", files[i]) - } - // Verify magic number. - if m := binary.BigEndian.Uint32(b.byteSlice.Range(0, MagicChunksSize)); m != MagicHeadChunks { - return errors.Errorf("%s: invalid magic number %x", files[i], m) - } - - // Verify chunk format version. - if v := int(b.byteSlice.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 { - return errors.Errorf("%s: invalid chunk format version %d", files[i], v) - } - } - - return nil -} - -// WriteChunk writes the chunk to the disk. -// The returned chunk ref is the reference from where the chunk encoding starts for the chunk. -func (cdm *OldChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) { - chkRef, err := func() (ChunkDiskMapperRef, error) { - cdm.writePathMtx.Lock() - defer cdm.writePathMtx.Unlock() - - if cdm.closed { - return 0, ErrChunkDiskMapperClosed - } - - if cdm.shouldCutNewFile(len(chk.Bytes())) { - if err := cdm.cut(); err != nil { - return 0, err - } - } - - // if len(chk.Bytes())+MaxHeadChunkMetaSize >= writeBufferSize, it means that chunk >= the buffer size; - // so no need to flush here, as we have to flush at the end (to not keep partial chunks in buffer). - if len(chk.Bytes())+MaxHeadChunkMetaSize < cdm.writeBufferSize && cdm.chkWriter.Available() < MaxHeadChunkMetaSize+len(chk.Bytes()) { - if err := cdm.flushBuffer(); err != nil { - return 0, err - } - } - - cdm.crc32.Reset() - bytesWritten := 0 - - chkRef = newChunkDiskMapperRef(uint64(cdm.curFileSequence), uint64(cdm.curFileSize())) - - binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(seriesRef)) - bytesWritten += SeriesRefSize - binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(mint)) - bytesWritten += MintMaxtSize - binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(maxt)) - bytesWritten += MintMaxtSize - cdm.byteBuf[bytesWritten] = byte(chk.Encoding()) - bytesWritten += ChunkEncodingSize - n := binary.PutUvarint(cdm.byteBuf[bytesWritten:], uint64(len(chk.Bytes()))) - bytesWritten += n - - if err := cdm.writeAndAppendToCRC32(cdm.byteBuf[:bytesWritten]); err != nil { - return 0, err - } - if err := cdm.writeAndAppendToCRC32(chk.Bytes()); err != nil { - return 0, err - } - if err := cdm.writeCRC32(); err != nil { - return 0, err - } - - if maxt > cdm.curFileMaxt { - cdm.curFileMaxt = maxt - } - - cdm.chunkBuffer.put(chkRef, chk) - - if len(chk.Bytes())+MaxHeadChunkMetaSize >= cdm.writeBufferSize { - // The chunk was bigger than the buffer itself. - // Flushing to not keep partial chunks in buffer. - if err := cdm.flushBuffer(); err != nil { - return 0, err - } - } - - return chkRef, nil - }() - - if err != nil && callback != nil { - callback(err) - } - - return chkRef -} - -// shouldCutNewFile returns whether a new file should be cut, based on time and size retention. -// Size retention: because depending on the system architecture, there is a limit on how big of a file we can m-map. -// Time retention: so that we can delete old chunks with some time guarantee in low load environments. -func (cdm *OldChunkDiskMapper) shouldCutNewFile(chunkSize int) bool { - return cdm.curFileSize() == 0 || // First head chunk file. - cdm.curFileSize()+int64(chunkSize+MaxHeadChunkMetaSize) > MaxHeadChunkFileSize // Exceeds the max head chunk file size. -} - -// CutNewFile creates a new m-mapped file. -func (cdm *OldChunkDiskMapper) CutNewFile() (returnErr error) { - cdm.writePathMtx.Lock() - defer cdm.writePathMtx.Unlock() - - return cdm.cut() -} - -// cut creates a new m-mapped file. The write lock should be held before calling this. -func (cdm *OldChunkDiskMapper) cut() (returnErr error) { - // Sync current tail to disk and close. - if err := cdm.finalizeCurFile(); err != nil { - return err - } - - n, newFile, seq, err := cutSegmentFile(cdm.dir, MagicHeadChunks, headChunksFormatV1, HeadChunkFilePreallocationSize) - if err != nil { - return err - } - defer func() { - // The file should not be closed if there is no error, - // its kept open in the ChunkDiskMapper. - if returnErr != nil { - returnErr = tsdb_errors.NewMulti(returnErr, newFile.Close()).Err() - } - }() - - cdm.curFileNumBytes.Store(int64(n)) - - if cdm.curFile != nil { - cdm.readPathMtx.Lock() - cdm.mmappedChunkFiles[cdm.curFileSequence].maxt = cdm.curFileMaxt - cdm.readPathMtx.Unlock() - } - - mmapFile, err := fileutil.OpenMmapFileWithSize(newFile.Name(), MaxHeadChunkFileSize) - if err != nil { - return err - } - - cdm.readPathMtx.Lock() - cdm.curFileSequence = seq - cdm.curFile = newFile - if cdm.chkWriter != nil { - cdm.chkWriter.Reset(newFile) - } else { - cdm.chkWriter = bufio.NewWriterSize(newFile, cdm.writeBufferSize) - } - - cdm.closers[cdm.curFileSequence] = mmapFile - cdm.mmappedChunkFiles[cdm.curFileSequence] = &mmappedChunkFile{byteSlice: realByteSlice(mmapFile.Bytes())} - cdm.readPathMtx.Unlock() - - cdm.curFileMaxt = 0 - - return nil -} - -// finalizeCurFile writes all pending data to the current tail file, -// truncates its size, and closes it. -func (cdm *OldChunkDiskMapper) finalizeCurFile() error { - if cdm.curFile == nil { - return nil - } - - if err := cdm.flushBuffer(); err != nil { - return err - } - - if err := cdm.curFile.Sync(); err != nil { - return err - } - - return cdm.curFile.Close() -} - -func (cdm *OldChunkDiskMapper) write(b []byte) error { - n, err := cdm.chkWriter.Write(b) - cdm.curFileNumBytes.Add(int64(n)) - return err -} - -func (cdm *OldChunkDiskMapper) writeAndAppendToCRC32(b []byte) error { - if err := cdm.write(b); err != nil { - return err - } - _, err := cdm.crc32.Write(b) - return err -} - -func (cdm *OldChunkDiskMapper) writeCRC32() error { - return cdm.write(cdm.crc32.Sum(cdm.byteBuf[:0])) -} - -// flushBuffer flushes the current in-memory chunks. -// Assumes that writePathMtx is _write_ locked before calling this method. -func (cdm *OldChunkDiskMapper) flushBuffer() error { - if err := cdm.chkWriter.Flush(); err != nil { - return err - } - cdm.chunkBuffer.clear() - return nil -} - -// Chunk returns a chunk from a given reference. -func (cdm *OldChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error) { - cdm.readPathMtx.RLock() - // We hold this read lock for the entire duration because if Close() - // is called, the data in the byte slice will get corrupted as the mmapped - // file will be closed. - defer cdm.readPathMtx.RUnlock() - - if cdm.closed { - return nil, ErrChunkDiskMapperClosed - } - - sgmIndex, chkStart := ref.Unpack() - // We skip the series ref and the mint/maxt beforehand. - chkStart += SeriesRefSize + (2 * MintMaxtSize) - chkCRC32 := newCRC32() - - // If it is the current open file, then the chunks can be in the buffer too. - if sgmIndex == cdm.curFileSequence { - chunk := cdm.chunkBuffer.get(ref) - if chunk != nil { - return chunk, nil - } - } - - mmapFile, ok := cdm.mmappedChunkFiles[sgmIndex] - if !ok { - if sgmIndex > cdm.curFileSequence { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: -1, - Err: errors.Errorf("head chunk file index %d more than current open file", sgmIndex), - } - } - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: errors.New("head chunk file index %d does not exist on disk"), - } - } - - if chkStart+MaxChunkLengthFieldSize > mmapFile.byteSlice.Len() { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, mmapFile.byteSlice.Len()), - } - } - - // Encoding. - chkEnc := mmapFile.byteSlice.Range(chkStart, chkStart+ChunkEncodingSize)[0] - - // Data length. - // With the minimum chunk length this should never cause us reading - // over the end of the slice. - chkDataLenStart := chkStart + ChunkEncodingSize - c := mmapFile.byteSlice.Range(chkDataLenStart, chkDataLenStart+MaxChunkLengthFieldSize) - chkDataLen, n := binary.Uvarint(c) - if n <= 0 { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: errors.Errorf("reading chunk length failed with %d", n), - } - } - - // Verify the chunk data end. - chkDataEnd := chkDataLenStart + n + int(chkDataLen) - if chkDataEnd > mmapFile.byteSlice.Len() { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v", chkDataEnd, mmapFile.byteSlice.Len()), - } - } - - // Check the CRC. - sum := mmapFile.byteSlice.Range(chkDataEnd, chkDataEnd+CRCSize) - if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(chkStart-(SeriesRefSize+2*MintMaxtSize), chkDataEnd)); err != nil { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: err, - } - } - if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act), - } - } - - // The chunk data itself. - chkData := mmapFile.byteSlice.Range(chkDataEnd-int(chkDataLen), chkDataEnd) - - // Make a copy of the chunk data to prevent a panic occurring because the returned - // chunk data slice references an mmap-ed file which could be closed after the - // function returns but while the chunk is still in use. - chkDataCopy := make([]byte, len(chkData)) - copy(chkDataCopy, chkData) - - chk, err := cdm.pool.Get(chunkenc.Encoding(chkEnc), chkDataCopy) - if err != nil { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: err, - } - } - return chk, nil -} - -// IterateAllChunks iterates all mmappedChunkFiles (in order of head chunk file name/number) and all the chunks within it -// and runs the provided function with information about each chunk. It returns on the first error encountered. -// NOTE: This method needs to be called at least once after creating ChunkDiskMapper -// to set the maxt of all the file. -func (cdm *OldChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error) (err error) { - cdm.writePathMtx.Lock() - defer cdm.writePathMtx.Unlock() - - defer func() { - cdm.fileMaxtSet = true - }() - - chkCRC32 := newCRC32() - - // Iterate files in ascending order. - segIDs := make([]int, 0, len(cdm.mmappedChunkFiles)) - for seg := range cdm.mmappedChunkFiles { - segIDs = append(segIDs, seg) - } - sort.Ints(segIDs) - for _, segID := range segIDs { - mmapFile := cdm.mmappedChunkFiles[segID] - fileEnd := mmapFile.byteSlice.Len() - if segID == cdm.curFileSequence { - fileEnd = int(cdm.curFileSize()) - } - idx := HeadChunkFileHeaderSize - for idx < fileEnd { - if fileEnd-idx < MaxHeadChunkMetaSize { - // Check for all 0s which marks the end of the file. - allZeros := true - for _, b := range mmapFile.byteSlice.Range(idx, fileEnd) { - if b != byte(0) { - allZeros = false - break - } - } - if allZeros { - // End of segment chunk file content. - break - } - return &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: segID, - Err: errors.Errorf("head chunk file has some unread data, but doesn't include enough bytes to read the chunk header"+ - " - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID), - } - } - chkCRC32.Reset() - chunkRef := newChunkDiskMapperRef(uint64(segID), uint64(idx)) - - startIdx := idx - seriesRef := HeadSeriesRef(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+SeriesRefSize))) - idx += SeriesRefSize - mint := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize))) - idx += MintMaxtSize - maxt := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize))) - idx += MintMaxtSize - - // We preallocate file to help with m-mapping (especially windows systems). - // As series ref always starts from 1, we assume it being 0 to be the end of the actual file data. - // We are not considering possible file corruption that can cause it to be 0. - // Additionally we are checking mint and maxt just to be sure. - if seriesRef == 0 && mint == 0 && maxt == 0 { - break - } - - // Encoding. - chkEnc := chunkenc.Encoding(mmapFile.byteSlice.Range(idx, idx+ChunkEncodingSize)[0]) - idx += ChunkEncodingSize - dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize)) - idx += n - - numSamples := binary.BigEndian.Uint16(mmapFile.byteSlice.Range(idx, idx+2)) - idx += int(dataLen) // Skip the data. - - // In the beginning we only checked for the chunk meta size. - // Now that we have added the chunk data length, we check for sufficient bytes again. - if idx+CRCSize > fileEnd { - return &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: segID, - Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID), - } - } - - // Check CRC. - sum := mmapFile.byteSlice.Range(idx, idx+CRCSize) - if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(startIdx, idx)); err != nil { - return err - } - if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) { - return &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: segID, - Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act), - } - } - idx += CRCSize - - if maxt > mmapFile.maxt { - mmapFile.maxt = maxt - } - - if err := f(seriesRef, chunkRef, mint, maxt, numSamples, chkEnc); err != nil { - if cerr, ok := err.(*CorruptionErr); ok { - cerr.Dir = cdm.dir.Name() - cerr.FileIndex = segID - return cerr - } - return err - } - } - - if idx > fileEnd { - // It should be equal to the slice length. - return &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: segID, - Err: errors.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID), - } - } - } - - return nil -} - -// Truncate deletes the head chunk files which are strictly below the mint. -// mint should be in milliseconds. -func (cdm *OldChunkDiskMapper) Truncate(mint int64) error { - if !cdm.fileMaxtSet { - return errors.New("maxt of the files are not set") - } - cdm.readPathMtx.RLock() - - // Sort the file indices, else if files deletion fails in between, - // it can lead to unsequential files as the map is not sorted. - chkFileIndices := make([]int, 0, len(cdm.mmappedChunkFiles)) - for seq := range cdm.mmappedChunkFiles { - chkFileIndices = append(chkFileIndices, seq) - } - sort.Ints(chkFileIndices) - - var removedFiles []int - for _, seq := range chkFileIndices { - if seq == cdm.curFileSequence || cdm.mmappedChunkFiles[seq].maxt >= mint { - break - } - if cdm.mmappedChunkFiles[seq].maxt < mint { - removedFiles = append(removedFiles, seq) - } - } - cdm.readPathMtx.RUnlock() - - errs := tsdb_errors.NewMulti() - // Cut a new file only if the current file has some chunks. - if cdm.curFileSize() > HeadChunkFileHeaderSize { - errs.Add(cdm.CutNewFile()) - } - errs.Add(cdm.deleteFiles(removedFiles)) - return errs.Err() -} - -func (cdm *OldChunkDiskMapper) deleteFiles(removedFiles []int) error { - cdm.readPathMtx.Lock() - for _, seq := range removedFiles { - if err := cdm.closers[seq].Close(); err != nil { - cdm.readPathMtx.Unlock() - return err - } - delete(cdm.mmappedChunkFiles, seq) - delete(cdm.closers, seq) - } - cdm.readPathMtx.Unlock() - - // We actually delete the files separately to not block the readPathMtx for long. - for _, seq := range removedFiles { - if err := os.Remove(segmentFile(cdm.dir.Name(), seq)); err != nil { - return err - } - } - - return nil -} - -// DeleteCorrupted deletes all the head chunk files after the one which had the corruption -// (including the corrupt file). -func (cdm *OldChunkDiskMapper) DeleteCorrupted(originalErr error) error { - err := errors.Cause(originalErr) // So that we can pick up errors even if wrapped. - cerr, ok := err.(*CorruptionErr) - if !ok { - return errors.Wrap(originalErr, "cannot handle error") - } - - // Delete all the head chunk files following the corrupt head chunk file. - segs := []int{} - cdm.readPathMtx.RLock() - for seg := range cdm.mmappedChunkFiles { - if seg >= cerr.FileIndex { - segs = append(segs, seg) - } - } - cdm.readPathMtx.RUnlock() - - return cdm.deleteFiles(segs) -} - -// Size returns the size of the chunk files. -func (cdm *OldChunkDiskMapper) Size() (int64, error) { - return fileutil.DirSize(cdm.dir.Name()) -} - -func (cdm *OldChunkDiskMapper) curFileSize() int64 { - return cdm.curFileNumBytes.Load() -} - -// Close closes all the open files in ChunkDiskMapper. -// It is not longer safe to access chunks from this struct after calling Close. -func (cdm *OldChunkDiskMapper) Close() error { - // 'WriteChunk' locks writePathMtx first and then readPathMtx for cutting head chunk file. - // The lock order should not be reversed here else it can cause deadlocks. - cdm.writePathMtx.Lock() - defer cdm.writePathMtx.Unlock() - cdm.readPathMtx.Lock() - defer cdm.readPathMtx.Unlock() - - if cdm.closed { - return nil - } - cdm.closed = true - - errs := tsdb_errors.NewMulti( - closeAllFromMap(cdm.closers), - cdm.finalizeCurFile(), - cdm.dir.Close(), - ) - cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{} - cdm.closers = map[int]io.Closer{} - - return errs.Err() -} - -func (cdm *OldChunkDiskMapper) IsQueueEmpty() bool { - return true // there is no queue -} diff --git a/tsdb/chunks/old_head_chunks_test.go b/tsdb/chunks/old_head_chunks_test.go deleted file mode 100644 index d81ef3df2..000000000 --- a/tsdb/chunks/old_head_chunks_test.go +++ /dev/null @@ -1,487 +0,0 @@ -// Copyright 2020 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package chunks - -import ( - "encoding/binary" - "errors" - "io/ioutil" - "math/rand" - "os" - "strconv" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/prometheus/prometheus/tsdb/chunkenc" -) - -func TestOldChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) { - hrw := testOldChunkDiskMapper(t) - defer func() { - require.NoError(t, hrw.Close()) - }() - - expectedBytes := []byte{} - nextChunkOffset := uint64(HeadChunkFileHeaderSize) - chkCRC32 := newCRC32() - - type expectedDataType struct { - seriesRef HeadSeriesRef - chunkRef ChunkDiskMapperRef - mint, maxt int64 - numSamples uint16 - chunk chunkenc.Chunk - } - expectedData := []expectedDataType{} - - var buf [MaxHeadChunkMetaSize]byte - totalChunks := 0 - var firstFileName string - for hrw.curFileSequence < 3 || hrw.chkWriter.Buffered() == 0 { - addChunks := func(numChunks int) { - for i := 0; i < numChunks; i++ { - seriesRef, chkRef, mint, maxt, chunk := createChunkForOld(t, totalChunks, hrw) - totalChunks++ - expectedData = append(expectedData, expectedDataType{ - seriesRef: seriesRef, - mint: mint, - maxt: maxt, - chunkRef: chkRef, - chunk: chunk, - numSamples: uint16(chunk.NumSamples()), - }) - - if hrw.curFileSequence != 1 { - // We are checking for bytes written only for the first file. - continue - } - - // Calculating expected bytes written on disk for first file. - firstFileName = hrw.curFile.Name() - require.Equal(t, newChunkDiskMapperRef(1, nextChunkOffset), chkRef) - - bytesWritten := 0 - chkCRC32.Reset() - - binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(seriesRef)) - bytesWritten += SeriesRefSize - binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(mint)) - bytesWritten += MintMaxtSize - binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt)) - bytesWritten += MintMaxtSize - buf[bytesWritten] = byte(chunk.Encoding()) - bytesWritten += ChunkEncodingSize - n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes()))) - bytesWritten += n - - expectedBytes = append(expectedBytes, buf[:bytesWritten]...) - _, err := chkCRC32.Write(buf[:bytesWritten]) - require.NoError(t, err) - expectedBytes = append(expectedBytes, chunk.Bytes()...) - _, err = chkCRC32.Write(chunk.Bytes()) - require.NoError(t, err) - - expectedBytes = append(expectedBytes, chkCRC32.Sum(nil)...) - - // += seriesRef, mint, maxt, encoding, chunk data len, chunk data, CRC. - nextChunkOffset += SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + uint64(n) + uint64(len(chunk.Bytes())) + CRCSize - } - } - addChunks(100) - hrw.CutNewFile() - addChunks(10) // For chunks in in-memory buffer. - } - - // Checking on-disk bytes for the first file. - require.Equal(t, 3, len(hrw.mmappedChunkFiles), "expected 3 mmapped files, got %d", len(hrw.mmappedChunkFiles)) - require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers)) - - actualBytes, err := ioutil.ReadFile(firstFileName) - require.NoError(t, err) - - // Check header of the segment file. - require.Equal(t, MagicHeadChunks, int(binary.BigEndian.Uint32(actualBytes[0:MagicChunksSize]))) - require.Equal(t, chunksFormatV1, int(actualBytes[MagicChunksSize])) - - // Remaining chunk data. - fileEnd := HeadChunkFileHeaderSize + len(expectedBytes) - require.Equal(t, expectedBytes, actualBytes[HeadChunkFileHeaderSize:fileEnd]) - - // Testing reading of chunks. - for _, exp := range expectedData { - actChunk, err := hrw.Chunk(exp.chunkRef) - require.NoError(t, err) - require.Equal(t, exp.chunk.Bytes(), actChunk.Bytes()) - } - - // Testing IterateAllChunks method. - dir := hrw.dir.Name() - require.NoError(t, hrw.Close()) - hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - - idx := 0 - require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error { - t.Helper() - - expData := expectedData[idx] - require.Equal(t, expData.seriesRef, seriesRef) - require.Equal(t, expData.chunkRef, chunkRef) - require.Equal(t, expData.maxt, maxt) - require.Equal(t, expData.maxt, maxt) - require.Equal(t, expData.numSamples, numSamples) - - actChunk, err := hrw.Chunk(expData.chunkRef) - require.NoError(t, err) - require.Equal(t, expData.chunk.Bytes(), actChunk.Bytes()) - - idx++ - return nil - })) - require.Equal(t, len(expectedData), idx) -} - -func TestOldChunkDiskMapper_WriteUnsupportedChunk_Chunk_IterateChunks(t *testing.T) { - hrw := testOldChunkDiskMapper(t) - defer func() { - require.NoError(t, hrw.Close()) - }() - - ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, nil, hrw) - - // Checking on-disk bytes for the first file. - require.Equal(t, 1, len(hrw.mmappedChunkFiles), "expected 1 mmapped file, got %d", len(hrw.mmappedChunkFiles)) - require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers)) - - // Testing IterateAllChunks method. - dir := hrw.dir.Name() - require.NoError(t, hrw.Close()) - hrw, err := NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - - require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error { - t.Helper() - - require.Equal(t, ucSeriesRef, seriesRef) - require.Equal(t, ucChkRef, chunkRef) - require.Equal(t, ucMint, mint) - require.Equal(t, ucMaxt, maxt) - require.Equal(t, uchunk.Encoding(), encoding) // Asserts that the encoding is EncUnsupportedXOR - - actChunk, err := hrw.Chunk(chunkRef) - // The chunk encoding is unknown so Chunk() should fail but us the caller - // are ok with that. Above we asserted that the encoding we expected was - // EncUnsupportedXOR - require.NotNil(t, err) - require.Contains(t, err.Error(), "invalid chunk encoding \"\"") - require.Nil(t, actChunk) - - return nil - })) -} - -// TestOldChunkDiskMapper_Truncate tests -// * If truncation is happening properly based on the time passed. -// * The active file is not deleted even if the passed time makes it eligible to be deleted. -// * Empty current file does not lead to creation of another file after truncation. -// * Non-empty current file leads to creation of another file after truncation. -func TestOldChunkDiskMapper_Truncate(t *testing.T) { - hrw := testOldChunkDiskMapper(t) - defer func() { - require.NoError(t, hrw.Close()) - }() - - timeRange := 0 - fileTimeStep := 100 - var thirdFileMinT, sixthFileMinT int64 - - addChunk := func() int { - mint := timeRange + 1 // Just after the new file cut. - maxt := timeRange + fileTimeStep - 1 // Just before the next file. - - // Write a chunks to set maxt for the segment. - _ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) { - require.NoError(t, err) - }) - - timeRange += fileTimeStep - - return mint - } - - verifyFiles := func(remainingFiles []int) { - t.Helper() - - files, err := ioutil.ReadDir(hrw.dir.Name()) - require.NoError(t, err) - require.Equal(t, len(remainingFiles), len(files), "files on disk") - require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles") - require.Equal(t, len(remainingFiles), len(hrw.closers), "closers") - - for _, i := range remainingFiles { - _, ok := hrw.mmappedChunkFiles[i] - require.Equal(t, true, ok) - } - } - - // Create segments 1 to 7. - for i := 1; i <= 7; i++ { - require.NoError(t, hrw.CutNewFile()) - mint := int64(addChunk()) - if i == 3 { - thirdFileMinT = mint - } else if i == 6 { - sixthFileMinT = mint - } - } - verifyFiles([]int{1, 2, 3, 4, 5, 6, 7}) - - // Truncating files. - require.NoError(t, hrw.Truncate(thirdFileMinT)) - verifyFiles([]int{3, 4, 5, 6, 7, 8}) - - dir := hrw.dir.Name() - require.NoError(t, hrw.Close()) - - // Restarted. - var err error - hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - - require.False(t, hrw.fileMaxtSet) - require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { - return nil - })) - require.True(t, hrw.fileMaxtSet) - - verifyFiles([]int{3, 4, 5, 6, 7, 8}) - // New file is created after restart even if last file was empty. - addChunk() - verifyFiles([]int{3, 4, 5, 6, 7, 8, 9}) - - // Truncating files after restart. - require.NoError(t, hrw.Truncate(sixthFileMinT)) - verifyFiles([]int{6, 7, 8, 9, 10}) - - // As the last file was empty, this creates no new files. - require.NoError(t, hrw.Truncate(sixthFileMinT+1)) - verifyFiles([]int{6, 7, 8, 9, 10}) - addChunk() - - // Truncating till current time should not delete the current active file. - require.NoError(t, hrw.Truncate(int64(timeRange+(2*fileTimeStep)))) - verifyFiles([]int{10, 11}) // One file is the previously active file and one currently created. -} - -// TestOldChunkDiskMapper_Truncate_PreservesFileSequence tests that truncation doesn't poke -// holes into the file sequence, even if there are empty files in between non-empty files. -// This test exposes https://github.com/prometheus/prometheus/issues/7412 where the truncation -// simply deleted all empty files instead of stopping once it encountered a non-empty file. -func TestOldChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) { - hrw := testOldChunkDiskMapper(t) - defer func() { - require.NoError(t, hrw.Close()) - }() - - timeRange := 0 - addChunk := func() { - step := 100 - mint, maxt := timeRange+1, timeRange+step-1 - _ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) { - require.NoError(t, err) - }) - timeRange += step - } - emptyFile := func() { - require.NoError(t, hrw.CutNewFile()) - } - nonEmptyFile := func() { - emptyFile() - addChunk() - } - - addChunk() // 1. Created with the first chunk. - nonEmptyFile() // 2. - nonEmptyFile() // 3. - emptyFile() // 4. - nonEmptyFile() // 5. - emptyFile() // 6. - - verifyFiles := func(remainingFiles []int) { - t.Helper() - - files, err := ioutil.ReadDir(hrw.dir.Name()) - require.NoError(t, err) - require.Equal(t, len(remainingFiles), len(files), "files on disk") - require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles") - require.Equal(t, len(remainingFiles), len(hrw.closers), "closers") - - for _, i := range remainingFiles { - _, ok := hrw.mmappedChunkFiles[i] - require.True(t, ok, "remaining file %d not in hrw.mmappedChunkFiles", i) - } - } - - verifyFiles([]int{1, 2, 3, 4, 5, 6}) - - // Truncating files till 2. It should not delete anything after 3 (inclusive) - // though files 4 and 6 are empty. - file2Maxt := hrw.mmappedChunkFiles[2].maxt - require.NoError(t, hrw.Truncate(file2Maxt+1)) - // As 6 was empty, it should not create another file. - verifyFiles([]int{3, 4, 5, 6}) - - addChunk() - // Truncate creates another file as 6 is not empty now. - require.NoError(t, hrw.Truncate(file2Maxt+1)) - verifyFiles([]int{3, 4, 5, 6, 7}) - - dir := hrw.dir.Name() - require.NoError(t, hrw.Close()) - - // Restarting checks for unsequential files. - var err error - hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - verifyFiles([]int{3, 4, 5, 6, 7}) -} - -// TestOldChunkDiskMapper_TruncateAfterFailedIterateChunks tests for -// https://github.com/prometheus/prometheus/issues/7753 -func TestOldChunkDiskMapper_TruncateAfterFailedIterateChunks(t *testing.T) { - hrw := testOldChunkDiskMapper(t) - defer func() { - require.NoError(t, hrw.Close()) - }() - - // Write a chunks to iterate on it later. - _ = hrw.WriteChunk(1, 0, 1000, randomChunk(t), func(err error) { - require.NoError(t, err) - }) - - dir := hrw.dir.Name() - require.NoError(t, hrw.Close()) - - // Restarting to recreate https://github.com/prometheus/prometheus/issues/7753. - hrw, err := NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - - // Forcefully failing IterateAllChunks. - require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { - return errors.New("random error") - })) - - // Truncation call should not return error after IterateAllChunks fails. - require.NoError(t, hrw.Truncate(2000)) -} - -func TestOldChunkDiskMapper_ReadRepairOnEmptyLastFile(t *testing.T) { - hrw := testOldChunkDiskMapper(t) - defer func() { - require.NoError(t, hrw.Close()) - }() - - timeRange := 0 - addChunk := func() { - step := 100 - mint, maxt := timeRange+1, timeRange+step-1 - _ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) { - require.NoError(t, err) - }) - timeRange += step - } - nonEmptyFile := func() { - require.NoError(t, hrw.CutNewFile()) - addChunk() - } - - addChunk() // 1. Created with the first chunk. - nonEmptyFile() // 2. - nonEmptyFile() // 3. - - require.Equal(t, 3, len(hrw.mmappedChunkFiles)) - lastFile := 0 - for idx := range hrw.mmappedChunkFiles { - if idx > lastFile { - lastFile = idx - } - } - require.Equal(t, 3, lastFile) - dir := hrw.dir.Name() - require.NoError(t, hrw.Close()) - - // Write an empty last file mimicking an abrupt shutdown on file creation. - emptyFileName := segmentFile(dir, lastFile+1) - f, err := os.OpenFile(emptyFileName, os.O_WRONLY|os.O_CREATE, 0o666) - require.NoError(t, err) - require.NoError(t, f.Sync()) - stat, err := f.Stat() - require.NoError(t, err) - require.Equal(t, int64(0), stat.Size()) - require.NoError(t, f.Close()) - - // Open chunk disk mapper again, corrupt file should be removed. - hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - require.False(t, hrw.fileMaxtSet) - require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { - return nil - })) - require.True(t, hrw.fileMaxtSet) - - // Removed from memory. - require.Equal(t, 3, len(hrw.mmappedChunkFiles)) - for idx := range hrw.mmappedChunkFiles { - require.LessOrEqual(t, idx, lastFile, "file index is bigger than previous last file") - } - - // Removed even from disk. - files, err := ioutil.ReadDir(dir) - require.NoError(t, err) - require.Equal(t, 3, len(files)) - for _, fi := range files { - seq, err := strconv.ParseUint(fi.Name(), 10, 64) - require.NoError(t, err) - require.LessOrEqual(t, seq, uint64(lastFile), "file index on disk is bigger than previous last file") - } -} - -func testOldChunkDiskMapper(t *testing.T) *OldChunkDiskMapper { - tmpdir, err := ioutil.TempDir("", "data") - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, os.RemoveAll(tmpdir)) - }) - - hrw, err := NewOldChunkDiskMapper(tmpdir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - require.False(t, hrw.fileMaxtSet) - require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { - return nil - })) - require.True(t, hrw.fileMaxtSet) - return hrw -} - -func createChunkForOld(t *testing.T, idx int, hrw *OldChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) { - seriesRef = HeadSeriesRef(rand.Int63()) - mint = int64((idx)*1000 + 1) - maxt = int64((idx + 1) * 1000) - chunk = randomChunk(t) - chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(err error) { - require.NoError(t, err) - }) - return -} diff --git a/tsdb/head.go b/tsdb/head.go index c8b4ee85e..87824a581 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -57,20 +57,6 @@ var ( defaultIsolationDisabled = false ) -// chunkDiskMapper is a temporary interface while we transition from -// 0 size queue to queue based chunk disk mapper. -type chunkDiskMapper interface { - CutNewFile() (returnErr error) - IterateAllChunks(f func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error) (err error) - Truncate(mint int64) error - DeleteCorrupted(originalErr error) error - Size() (int64, error) - Close() error - Chunk(ref chunks.ChunkDiskMapperRef) (chunkenc.Chunk, error) - WriteChunk(seriesRef chunks.HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef chunks.ChunkDiskMapperRef) - IsQueueEmpty() bool -} - // Head handles reads and writes of time series data within a time window. type Head struct { chunkRange atomic.Int64 @@ -111,7 +97,7 @@ type Head struct { lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching. // chunkDiskMapper is used to write and read Head chunks to/from disk. - chunkDiskMapper chunkDiskMapper + chunkDiskMapper *chunks.ChunkDiskMapper chunkSnapshotMtx sync.Mutex @@ -229,21 +215,13 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti opts.ChunkPool = chunkenc.NewPool() } - if opts.ChunkWriteQueueSize > 0 { - h.chunkDiskMapper, err = chunks.NewChunkDiskMapper( - r, - mmappedChunksDir(opts.ChunkDirRoot), - opts.ChunkPool, - opts.ChunkWriteBufferSize, - opts.ChunkWriteQueueSize, - ) - } else { - h.chunkDiskMapper, err = chunks.NewOldChunkDiskMapper( - mmappedChunksDir(opts.ChunkDirRoot), - opts.ChunkPool, - opts.ChunkWriteBufferSize, - ) - } + h.chunkDiskMapper, err = chunks.NewChunkDiskMapper( + r, + mmappedChunksDir(opts.ChunkDirRoot), + opts.ChunkPool, + opts.ChunkWriteBufferSize, + opts.ChunkWriteQueueSize, + ) if err != nil { return nil, err } diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 3232ed61e..efef15560 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -489,7 +489,7 @@ func (a *headAppender) Commit() (err error) { // the appendID for isolation. (The appendID can be zero, which results in no // isolation for this append.) // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. -func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper chunkDiskMapper) (delta int64, sampleInOrder, chunkCreated bool) { +func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (delta int64, sampleInOrder, chunkCreated bool) { // Based on Gorilla white papers this offers near-optimal compression ratio // so anything bigger that this has diminishing returns and increases // the time range within which we have to decompress all samples. @@ -587,7 +587,7 @@ func addJitterToChunkEndTime(seriesHash uint64, chunkMinTime, nextAt, maxNextAt return min(maxNextAt, nextAt+chunkDurationVariance-(chunkDurationMaxVariance/2)) } -func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper chunkDiskMapper) *memChunk { +func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk { s.mmapCurrentHeadChunk(chunkDiskMapper) s.headChunk = &memChunk{ @@ -608,7 +608,7 @@ func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper chunkDiskMapper) return s.headChunk } -func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper chunkDiskMapper) { +func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) { if s.headChunk == nil { // There is no head chunk, so nothing to m-map here. return diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 0209a6a15..2d37f2a7e 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -329,7 +329,7 @@ func (h *headChunkReader) Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) { // chunk returns the chunk for the HeadChunkID from memory or by m-mapping it from the disk. // If garbageCollect is true, it means that the returned *memChunk // (and not the chunkenc.Chunk inside it) can be garbage collected after its usage. -func (s *memSeries) chunk(id chunks.HeadChunkID, cdm chunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) { +func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) { // ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are // incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index. // The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix @@ -344,7 +344,7 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, cdm chunkDiskMapper) (chunk *me } return s.headChunk, false, nil } - chk, err := cdm.Chunk(s.mmappedChunks[ix].ref) + chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref) if err != nil { if _, ok := err.(*chunks.CorruptionErr); ok { panic(err) @@ -363,7 +363,7 @@ type safeChunk struct { s *memSeries cid chunks.HeadChunkID isoState *isolationState - chunkDiskMapper chunkDiskMapper + chunkDiskMapper *chunks.ChunkDiskMapper } func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { @@ -375,8 +375,8 @@ func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { // iterator returns a chunk iterator for the requested chunkID, or a NopIterator if the requested ID is out of range. // It is unsafe to call this concurrently with s.append(...) without holding the series lock. -func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, cdm chunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator { - c, garbageCollect, err := s.chunk(id, cdm) +func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator { + c, garbageCollect, err := s.chunk(id, chunkDiskMapper) // TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a // series's chunk, which got then garbage collected before it got // accessed. We must ensure to not garbage collect as long as any diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 997eec2a6..0e9c9c028 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1634,7 +1634,7 @@ func TestHeadReadWriterRepair(t *testing.T) { _, ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunk was created") - require.NoError(t, h.chunkDiskMapper.CutNewFile()) + h.chunkDiskMapper.CutNewFile() } require.NoError(t, h.Close())