diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go index 4fe40e9b25..984e7a0811 100644 --- a/tsdb/chunks/head_chunks_test.go +++ b/tsdb/chunks/head_chunks_test.go @@ -176,7 +176,7 @@ func TestChunkDiskMapper_WriteUnsupportedChunk_Chunk_IterateChunks(t *testing.T) require.NoError(t, hrw.Close()) }() - ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, hrw, nil) + ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, hrw) // Checking on-disk bytes for the first file. require.Equal(t, 1, len(hrw.mmappedChunkFiles), "expected 1 mmapped file, got %d", len(hrw.mmappedChunkFiles)) @@ -554,24 +554,17 @@ func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSer return } -func writeUnsupportedChunk(t *testing.T, idx int, hrw *ChunkDiskMapper, hrwOld *OldChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) { +func writeUnsupportedChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) { var err error seriesRef = HeadSeriesRef(rand.Int63()) mint = int64((idx)*1000 + 1) maxt = int64((idx + 1) * 1000) chunk = randomUnsupportedChunk(t) awaitCb := make(chan struct{}) - if hrw != nil { - chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) { - require.NoError(t, err) - close(awaitCb) - }) - } else { - chunkRef = hrwOld.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) { - require.NoError(t, err) - }) + chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) { + require.NoError(t, err) close(awaitCb) - } + }) <-awaitCb return } diff --git a/tsdb/chunks/old_head_chunks.go b/tsdb/chunks/old_head_chunks.go deleted file mode 100644 index 1722ddd9a3..0000000000 --- a/tsdb/chunks/old_head_chunks.go +++ /dev/null @@ -1,713 +0,0 @@ -// Copyright 2020 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package chunks - -import ( - "bufio" - "bytes" - "encoding/binary" - "hash" - "io" - "os" - "sort" - "sync" - - "github.com/pkg/errors" - "go.uber.org/atomic" - - "github.com/prometheus/prometheus/tsdb/chunkenc" - tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" - "github.com/prometheus/prometheus/tsdb/fileutil" -) - -// OldChunkDiskMapper is for writing the Head block chunks to the disk -// and access chunks via mmapped file. -type OldChunkDiskMapper struct { - curFileNumBytes atomic.Int64 // Bytes written in current open file. - - /// Writer. - dir *os.File - writeBufferSize int - - curFile *os.File // File being written to. - curFileSequence int // Index of current open file being appended to. - curFileMaxt int64 // Used for the size retention. - - byteBuf [MaxHeadChunkMetaSize]byte // Buffer used to write the header of the chunk. - chkWriter *bufio.Writer // Writer for the current open file. - crc32 hash.Hash - writePathMtx sync.Mutex - - /// Reader. 
- // The int key in the map is the file number on the disk. - mmappedChunkFiles map[int]*mmappedChunkFile // Contains the m-mapped files for each chunk file mapped with its index. - closers map[int]io.Closer // Closers for resources behind the byte slices. - readPathMtx sync.RWMutex // Mutex used to protect the above 2 maps. - pool chunkenc.Pool // This is used when fetching a chunk from the disk to allocate a chunk. - - // Writer and Reader. - // We flush chunks to disk in batches. Hence, we store them in this buffer - // from which chunks are served till they are flushed and are ready for m-mapping. - chunkBuffer *chunkBuffer - - // Whether the maxt field is set for all mmapped chunk files tracked within the mmappedChunkFiles map. - // This is done after iterating through all the chunks in those files using the IterateAllChunks method. - fileMaxtSet bool - - closed bool -} - -// NewOldChunkDiskMapper returns a new ChunkDiskMapper against the given directory -// using the default head chunk file duration. -// NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper -// to set the maxt of all the file. -func NewOldChunkDiskMapper(dir string, pool chunkenc.Pool, writeBufferSize int) (*OldChunkDiskMapper, error) { - // Validate write buffer size. - if writeBufferSize < MinWriteBufferSize || writeBufferSize > MaxWriteBufferSize { - return nil, errors.Errorf("ChunkDiskMapper write buffer size should be between %d and %d (actual: %d)", MinWriteBufferSize, MaxWriteBufferSize, writeBufferSize) - } - if writeBufferSize%1024 != 0 { - return nil, errors.Errorf("ChunkDiskMapper write buffer size should be a multiple of 1024 (actual: %d)", writeBufferSize) - } - - if err := os.MkdirAll(dir, 0o777); err != nil { - return nil, err - } - dirFile, err := fileutil.OpenDir(dir) - if err != nil { - return nil, err - } - - m := &OldChunkDiskMapper{ - dir: dirFile, - pool: pool, - writeBufferSize: writeBufferSize, - crc32: newCRC32(), - chunkBuffer: newChunkBuffer(), - } - - if m.pool == nil { - m.pool = chunkenc.NewPool() - } - - return m, m.openMMapFiles() -} - -// openMMapFiles opens all files within dir for mmapping. -func (cdm *OldChunkDiskMapper) openMMapFiles() (returnErr error) { - cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{} - cdm.closers = map[int]io.Closer{} - defer func() { - if returnErr != nil { - returnErr = tsdb_errors.NewMulti(returnErr, closeAllFromMap(cdm.closers)).Err() - - cdm.mmappedChunkFiles = nil - cdm.closers = nil - } - }() - - files, err := listChunkFiles(cdm.dir.Name()) - if err != nil { - return err - } - - files, err = repairLastChunkFile(files) - if err != nil { - return err - } - - chkFileIndices := make([]int, 0, len(files)) - for seq, fn := range files { - f, err := fileutil.OpenMmapFile(fn) - if err != nil { - return errors.Wrapf(err, "mmap files, file: %s", fn) - } - cdm.closers[seq] = f - cdm.mmappedChunkFiles[seq] = &mmappedChunkFile{byteSlice: realByteSlice(f.Bytes())} - chkFileIndices = append(chkFileIndices, seq) - } - - // Check for gaps in the files. 
- sort.Ints(chkFileIndices) - if len(chkFileIndices) == 0 { - return nil - } - lastSeq := chkFileIndices[0] - for _, seq := range chkFileIndices[1:] { - if seq != lastSeq+1 { - return errors.Errorf("found unsequential head chunk files %s (index: %d) and %s (index: %d)", files[lastSeq], lastSeq, files[seq], seq) - } - lastSeq = seq - } - - for i, b := range cdm.mmappedChunkFiles { - if b.byteSlice.Len() < HeadChunkFileHeaderSize { - return errors.Wrapf(errInvalidSize, "%s: invalid head chunk file header", files[i]) - } - // Verify magic number. - if m := binary.BigEndian.Uint32(b.byteSlice.Range(0, MagicChunksSize)); m != MagicHeadChunks { - return errors.Errorf("%s: invalid magic number %x", files[i], m) - } - - // Verify chunk format version. - if v := int(b.byteSlice.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 { - return errors.Errorf("%s: invalid chunk format version %d", files[i], v) - } - } - - return nil -} - -// WriteChunk writes the chunk to the disk. -// The returned chunk ref is the reference from where the chunk encoding starts for the chunk. -func (cdm *OldChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) { - chkRef, err := func() (ChunkDiskMapperRef, error) { - cdm.writePathMtx.Lock() - defer cdm.writePathMtx.Unlock() - - if cdm.closed { - return 0, ErrChunkDiskMapperClosed - } - - if cdm.shouldCutNewFile(len(chk.Bytes())) { - if err := cdm.cut(); err != nil { - return 0, err - } - } - - // if len(chk.Bytes())+MaxHeadChunkMetaSize >= writeBufferSize, it means that chunk >= the buffer size; - // so no need to flush here, as we have to flush at the end (to not keep partial chunks in buffer). - if len(chk.Bytes())+MaxHeadChunkMetaSize < cdm.writeBufferSize && cdm.chkWriter.Available() < MaxHeadChunkMetaSize+len(chk.Bytes()) { - if err := cdm.flushBuffer(); err != nil { - return 0, err - } - } - - cdm.crc32.Reset() - bytesWritten := 0 - - chkRef = newChunkDiskMapperRef(uint64(cdm.curFileSequence), uint64(cdm.curFileSize())) - - binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(seriesRef)) - bytesWritten += SeriesRefSize - binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(mint)) - bytesWritten += MintMaxtSize - binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(maxt)) - bytesWritten += MintMaxtSize - cdm.byteBuf[bytesWritten] = byte(chk.Encoding()) - bytesWritten += ChunkEncodingSize - n := binary.PutUvarint(cdm.byteBuf[bytesWritten:], uint64(len(chk.Bytes()))) - bytesWritten += n - - if err := cdm.writeAndAppendToCRC32(cdm.byteBuf[:bytesWritten]); err != nil { - return 0, err - } - if err := cdm.writeAndAppendToCRC32(chk.Bytes()); err != nil { - return 0, err - } - if err := cdm.writeCRC32(); err != nil { - return 0, err - } - - if maxt > cdm.curFileMaxt { - cdm.curFileMaxt = maxt - } - - cdm.chunkBuffer.put(chkRef, chk) - - if len(chk.Bytes())+MaxHeadChunkMetaSize >= cdm.writeBufferSize { - // The chunk was bigger than the buffer itself. - // Flushing to not keep partial chunks in buffer. - if err := cdm.flushBuffer(); err != nil { - return 0, err - } - } - - return chkRef, nil - }() - - if err != nil && callback != nil { - callback(err) - } - - return chkRef -} - -// shouldCutNewFile returns whether a new file should be cut, based on time and size retention. -// Size retention: because depending on the system architecture, there is a limit on how big of a file we can m-map. 
-// Time retention: so that we can delete old chunks with some time guarantee in low load environments. -func (cdm *OldChunkDiskMapper) shouldCutNewFile(chunkSize int) bool { - return cdm.curFileSize() == 0 || // First head chunk file. - cdm.curFileSize()+int64(chunkSize+MaxHeadChunkMetaSize) > MaxHeadChunkFileSize // Exceeds the max head chunk file size. -} - -// CutNewFile creates a new m-mapped file. -func (cdm *OldChunkDiskMapper) CutNewFile() (returnErr error) { - cdm.writePathMtx.Lock() - defer cdm.writePathMtx.Unlock() - - return cdm.cut() -} - -// cut creates a new m-mapped file. The write lock should be held before calling this. -func (cdm *OldChunkDiskMapper) cut() (returnErr error) { - // Sync current tail to disk and close. - if err := cdm.finalizeCurFile(); err != nil { - return err - } - - n, newFile, seq, err := cutSegmentFile(cdm.dir, MagicHeadChunks, headChunksFormatV1, HeadChunkFilePreallocationSize) - if err != nil { - return err - } - defer func() { - // The file should not be closed if there is no error, - // its kept open in the ChunkDiskMapper. - if returnErr != nil { - returnErr = tsdb_errors.NewMulti(returnErr, newFile.Close()).Err() - } - }() - - cdm.curFileNumBytes.Store(int64(n)) - - if cdm.curFile != nil { - cdm.readPathMtx.Lock() - cdm.mmappedChunkFiles[cdm.curFileSequence].maxt = cdm.curFileMaxt - cdm.readPathMtx.Unlock() - } - - mmapFile, err := fileutil.OpenMmapFileWithSize(newFile.Name(), MaxHeadChunkFileSize) - if err != nil { - return err - } - - cdm.readPathMtx.Lock() - cdm.curFileSequence = seq - cdm.curFile = newFile - if cdm.chkWriter != nil { - cdm.chkWriter.Reset(newFile) - } else { - cdm.chkWriter = bufio.NewWriterSize(newFile, cdm.writeBufferSize) - } - - cdm.closers[cdm.curFileSequence] = mmapFile - cdm.mmappedChunkFiles[cdm.curFileSequence] = &mmappedChunkFile{byteSlice: realByteSlice(mmapFile.Bytes())} - cdm.readPathMtx.Unlock() - - cdm.curFileMaxt = 0 - - return nil -} - -// finalizeCurFile writes all pending data to the current tail file, -// truncates its size, and closes it. -func (cdm *OldChunkDiskMapper) finalizeCurFile() error { - if cdm.curFile == nil { - return nil - } - - if err := cdm.flushBuffer(); err != nil { - return err - } - - if err := cdm.curFile.Sync(); err != nil { - return err - } - - return cdm.curFile.Close() -} - -func (cdm *OldChunkDiskMapper) write(b []byte) error { - n, err := cdm.chkWriter.Write(b) - cdm.curFileNumBytes.Add(int64(n)) - return err -} - -func (cdm *OldChunkDiskMapper) writeAndAppendToCRC32(b []byte) error { - if err := cdm.write(b); err != nil { - return err - } - _, err := cdm.crc32.Write(b) - return err -} - -func (cdm *OldChunkDiskMapper) writeCRC32() error { - return cdm.write(cdm.crc32.Sum(cdm.byteBuf[:0])) -} - -// flushBuffer flushes the current in-memory chunks. -// Assumes that writePathMtx is _write_ locked before calling this method. -func (cdm *OldChunkDiskMapper) flushBuffer() error { - if err := cdm.chkWriter.Flush(); err != nil { - return err - } - cdm.chunkBuffer.clear() - return nil -} - -// Chunk returns a chunk from a given reference. -func (cdm *OldChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error) { - cdm.readPathMtx.RLock() - // We hold this read lock for the entire duration because if Close() - // is called, the data in the byte slice will get corrupted as the mmapped - // file will be closed. 
- defer cdm.readPathMtx.RUnlock() - - if cdm.closed { - return nil, ErrChunkDiskMapperClosed - } - - sgmIndex, chkStart := ref.Unpack() - // We skip the series ref and the mint/maxt beforehand. - chkStart += SeriesRefSize + (2 * MintMaxtSize) - chkCRC32 := newCRC32() - - // If it is the current open file, then the chunks can be in the buffer too. - if sgmIndex == cdm.curFileSequence { - chunk := cdm.chunkBuffer.get(ref) - if chunk != nil { - return chunk, nil - } - } - - mmapFile, ok := cdm.mmappedChunkFiles[sgmIndex] - if !ok { - if sgmIndex > cdm.curFileSequence { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: -1, - Err: errors.Errorf("head chunk file index %d more than current open file", sgmIndex), - } - } - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: errors.New("head chunk file index %d does not exist on disk"), - } - } - - if chkStart+MaxChunkLengthFieldSize > mmapFile.byteSlice.Len() { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, mmapFile.byteSlice.Len()), - } - } - - // Encoding. - chkEnc := mmapFile.byteSlice.Range(chkStart, chkStart+ChunkEncodingSize)[0] - - // Data length. - // With the minimum chunk length this should never cause us reading - // over the end of the slice. - chkDataLenStart := chkStart + ChunkEncodingSize - c := mmapFile.byteSlice.Range(chkDataLenStart, chkDataLenStart+MaxChunkLengthFieldSize) - chkDataLen, n := binary.Uvarint(c) - if n <= 0 { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: errors.Errorf("reading chunk length failed with %d", n), - } - } - - // Verify the chunk data end. - chkDataEnd := chkDataLenStart + n + int(chkDataLen) - if chkDataEnd > mmapFile.byteSlice.Len() { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v", chkDataEnd, mmapFile.byteSlice.Len()), - } - } - - // Check the CRC. - sum := mmapFile.byteSlice.Range(chkDataEnd, chkDataEnd+CRCSize) - if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(chkStart-(SeriesRefSize+2*MintMaxtSize), chkDataEnd)); err != nil { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: err, - } - } - if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act), - } - } - - // The chunk data itself. - chkData := mmapFile.byteSlice.Range(chkDataEnd-int(chkDataLen), chkDataEnd) - - // Make a copy of the chunk data to prevent a panic occurring because the returned - // chunk data slice references an mmap-ed file which could be closed after the - // function returns but while the chunk is still in use. - chkDataCopy := make([]byte, len(chkData)) - copy(chkDataCopy, chkData) - - chk, err := cdm.pool.Get(chunkenc.Encoding(chkEnc), chkDataCopy) - if err != nil { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: err, - } - } - return chk, nil -} - -// IterateAllChunks iterates all mmappedChunkFiles (in order of head chunk file name/number) and all the chunks within it -// and runs the provided function with information about each chunk. It returns on the first error encountered. 
-// NOTE: This method needs to be called at least once after creating ChunkDiskMapper -// to set the maxt of all the file. -func (cdm *OldChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error) (err error) { - cdm.writePathMtx.Lock() - defer cdm.writePathMtx.Unlock() - - defer func() { - cdm.fileMaxtSet = true - }() - - chkCRC32 := newCRC32() - - // Iterate files in ascending order. - segIDs := make([]int, 0, len(cdm.mmappedChunkFiles)) - for seg := range cdm.mmappedChunkFiles { - segIDs = append(segIDs, seg) - } - sort.Ints(segIDs) - for _, segID := range segIDs { - mmapFile := cdm.mmappedChunkFiles[segID] - fileEnd := mmapFile.byteSlice.Len() - if segID == cdm.curFileSequence { - fileEnd = int(cdm.curFileSize()) - } - idx := HeadChunkFileHeaderSize - for idx < fileEnd { - if fileEnd-idx < MaxHeadChunkMetaSize { - // Check for all 0s which marks the end of the file. - allZeros := true - for _, b := range mmapFile.byteSlice.Range(idx, fileEnd) { - if b != byte(0) { - allZeros = false - break - } - } - if allZeros { - // End of segment chunk file content. - break - } - return &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: segID, - Err: errors.Errorf("head chunk file has some unread data, but doesn't include enough bytes to read the chunk header"+ - " - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID), - } - } - chkCRC32.Reset() - chunkRef := newChunkDiskMapperRef(uint64(segID), uint64(idx)) - - startIdx := idx - seriesRef := HeadSeriesRef(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+SeriesRefSize))) - idx += SeriesRefSize - mint := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize))) - idx += MintMaxtSize - maxt := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize))) - idx += MintMaxtSize - - // We preallocate file to help with m-mapping (especially windows systems). - // As series ref always starts from 1, we assume it being 0 to be the end of the actual file data. - // We are not considering possible file corruption that can cause it to be 0. - // Additionally we are checking mint and maxt just to be sure. - if seriesRef == 0 && mint == 0 && maxt == 0 { - break - } - - // Encoding. - chkEnc := chunkenc.Encoding(mmapFile.byteSlice.Range(idx, idx+ChunkEncodingSize)[0]) - idx += ChunkEncodingSize - dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize)) - idx += n - - numSamples := binary.BigEndian.Uint16(mmapFile.byteSlice.Range(idx, idx+2)) - idx += int(dataLen) // Skip the data. - - // In the beginning we only checked for the chunk meta size. - // Now that we have added the chunk data length, we check for sufficient bytes again. - if idx+CRCSize > fileEnd { - return &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: segID, - Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID), - } - } - - // Check CRC. 
- sum := mmapFile.byteSlice.Range(idx, idx+CRCSize) - if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(startIdx, idx)); err != nil { - return err - } - if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) { - return &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: segID, - Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act), - } - } - idx += CRCSize - - if maxt > mmapFile.maxt { - mmapFile.maxt = maxt - } - - if err := f(seriesRef, chunkRef, mint, maxt, numSamples, chkEnc); err != nil { - if cerr, ok := err.(*CorruptionErr); ok { - cerr.Dir = cdm.dir.Name() - cerr.FileIndex = segID - return cerr - } - return err - } - } - - if idx > fileEnd { - // It should be equal to the slice length. - return &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: segID, - Err: errors.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID), - } - } - } - - return nil -} - -// Truncate deletes the head chunk files whose file number is less than given fileNo. -func (cdm *OldChunkDiskMapper) Truncate(fileNo int) error { - if !cdm.fileMaxtSet { - return errors.New("maxt of the files are not set") - } - cdm.readPathMtx.RLock() - - // Sort the file indices, else if files deletion fails in between, - // it can lead to unsequential files as the map is not sorted. - chkFileIndices := make([]int, 0, len(cdm.mmappedChunkFiles)) - for seq := range cdm.mmappedChunkFiles { - chkFileIndices = append(chkFileIndices, seq) - } - sort.Ints(chkFileIndices) - - var removedFiles []int - for _, seq := range chkFileIndices { - if seq == cdm.curFileSequence || seq >= fileNo { - break - } - removedFiles = append(removedFiles, seq) - } - cdm.readPathMtx.RUnlock() - - errs := tsdb_errors.NewMulti() - // Cut a new file only if the current file has some chunks. - if cdm.curFileSize() > HeadChunkFileHeaderSize { - errs.Add(cdm.CutNewFile()) - } - errs.Add(cdm.deleteFiles(removedFiles)) - return errs.Err() -} - -func (cdm *OldChunkDiskMapper) deleteFiles(removedFiles []int) error { - cdm.readPathMtx.Lock() - for _, seq := range removedFiles { - if err := cdm.closers[seq].Close(); err != nil { - cdm.readPathMtx.Unlock() - return err - } - delete(cdm.mmappedChunkFiles, seq) - delete(cdm.closers, seq) - } - cdm.readPathMtx.Unlock() - - // We actually delete the files separately to not block the readPathMtx for long. - for _, seq := range removedFiles { - if err := os.Remove(segmentFile(cdm.dir.Name(), seq)); err != nil { - return err - } - } - - return nil -} - -// DeleteCorrupted deletes all the head chunk files after the one which had the corruption -// (including the corrupt file). -func (cdm *OldChunkDiskMapper) DeleteCorrupted(originalErr error) error { - err := errors.Cause(originalErr) // So that we can pick up errors even if wrapped. - cerr, ok := err.(*CorruptionErr) - if !ok { - return errors.Wrap(originalErr, "cannot handle error") - } - - // Delete all the head chunk files following the corrupt head chunk file. - segs := []int{} - cdm.readPathMtx.RLock() - for seg := range cdm.mmappedChunkFiles { - if seg >= cerr.FileIndex { - segs = append(segs, seg) - } - } - cdm.readPathMtx.RUnlock() - - return cdm.deleteFiles(segs) -} - -// Size returns the size of the chunk files. -func (cdm *OldChunkDiskMapper) Size() (int64, error) { - return fileutil.DirSize(cdm.dir.Name()) -} - -func (cdm *OldChunkDiskMapper) curFileSize() int64 { - return cdm.curFileNumBytes.Load() -} - -// Close closes all the open files in ChunkDiskMapper. 
-// It is not longer safe to access chunks from this struct after calling Close. -func (cdm *OldChunkDiskMapper) Close() error { - // 'WriteChunk' locks writePathMtx first and then readPathMtx for cutting head chunk file. - // The lock order should not be reversed here else it can cause deadlocks. - cdm.writePathMtx.Lock() - defer cdm.writePathMtx.Unlock() - cdm.readPathMtx.Lock() - defer cdm.readPathMtx.Unlock() - - if cdm.closed { - return nil - } - cdm.closed = true - - errs := tsdb_errors.NewMulti( - closeAllFromMap(cdm.closers), - cdm.finalizeCurFile(), - cdm.dir.Close(), - ) - cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{} - cdm.closers = map[int]io.Closer{} - - return errs.Err() -} - -func (cdm *OldChunkDiskMapper) IsQueueEmpty() bool { - return true // there is no queue -} diff --git a/tsdb/chunks/old_head_chunks_test.go b/tsdb/chunks/old_head_chunks_test.go deleted file mode 100644 index 6a397ac0e5..0000000000 --- a/tsdb/chunks/old_head_chunks_test.go +++ /dev/null @@ -1,484 +0,0 @@ -// Copyright 2020 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package chunks - -import ( - "encoding/binary" - "errors" - "math/rand" - "os" - "strconv" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/prometheus/prometheus/tsdb/chunkenc" -) - -func TestOldChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) { - hrw := testOldChunkDiskMapper(t) - defer func() { - require.NoError(t, hrw.Close()) - }() - - expectedBytes := []byte{} - nextChunkOffset := uint64(HeadChunkFileHeaderSize) - chkCRC32 := newCRC32() - - type expectedDataType struct { - seriesRef HeadSeriesRef - chunkRef ChunkDiskMapperRef - mint, maxt int64 - numSamples uint16 - chunk chunkenc.Chunk - isOOO bool - } - expectedData := []expectedDataType{} - - var buf [MaxHeadChunkMetaSize]byte - totalChunks := 0 - var firstFileName string - for hrw.curFileSequence < 3 || hrw.chkWriter.Buffered() == 0 { - addChunks := func(numChunks int) { - for i := 0; i < numChunks; i++ { - seriesRef, chkRef, mint, maxt, chunk, isOOO := createChunkForOld(t, totalChunks, hrw) - totalChunks++ - expectedData = append(expectedData, expectedDataType{ - seriesRef: seriesRef, - mint: mint, - maxt: maxt, - chunkRef: chkRef, - chunk: chunk, - numSamples: uint16(chunk.NumSamples()), - isOOO: isOOO, - }) - - if hrw.curFileSequence != 1 { - // We are checking for bytes written only for the first file. - continue - } - - // Calculating expected bytes written on disk for first file. 
- firstFileName = hrw.curFile.Name() - require.Equal(t, newChunkDiskMapperRef(1, nextChunkOffset), chkRef) - - bytesWritten := 0 - chkCRC32.Reset() - - binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(seriesRef)) - bytesWritten += SeriesRefSize - binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(mint)) - bytesWritten += MintMaxtSize - binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt)) - bytesWritten += MintMaxtSize - buf[bytesWritten] = byte(chunk.Encoding()) - bytesWritten += ChunkEncodingSize - n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes()))) - bytesWritten += n - - expectedBytes = append(expectedBytes, buf[:bytesWritten]...) - _, err := chkCRC32.Write(buf[:bytesWritten]) - require.NoError(t, err) - expectedBytes = append(expectedBytes, chunk.Bytes()...) - _, err = chkCRC32.Write(chunk.Bytes()) - require.NoError(t, err) - - expectedBytes = append(expectedBytes, chkCRC32.Sum(nil)...) - - // += seriesRef, mint, maxt, encoding, chunk data len, chunk data, CRC. - nextChunkOffset += SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + uint64(n) + uint64(len(chunk.Bytes())) + CRCSize - } - } - addChunks(100) - hrw.CutNewFile() - addChunks(10) // For chunks in in-memory buffer. - } - - // Checking on-disk bytes for the first file. - require.Equal(t, 3, len(hrw.mmappedChunkFiles), "expected 3 mmapped files, got %d", len(hrw.mmappedChunkFiles)) - require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers)) - - actualBytes, err := os.ReadFile(firstFileName) - require.NoError(t, err) - - // Check header of the segment file. - require.Equal(t, MagicHeadChunks, int(binary.BigEndian.Uint32(actualBytes[0:MagicChunksSize]))) - require.Equal(t, chunksFormatV1, int(actualBytes[MagicChunksSize])) - - // Remaining chunk data. - fileEnd := HeadChunkFileHeaderSize + len(expectedBytes) - require.Equal(t, expectedBytes, actualBytes[HeadChunkFileHeaderSize:fileEnd]) - - // Testing reading of chunks. - for _, exp := range expectedData { - actChunk, err := hrw.Chunk(exp.chunkRef) - require.NoError(t, err) - require.Equal(t, exp.chunk.Bytes(), actChunk.Bytes()) - } - - // Testing IterateAllChunks method. - dir := hrw.dir.Name() - require.NoError(t, hrw.Close()) - hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - - idx := 0 - require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error { - t.Helper() - - expData := expectedData[idx] - require.Equal(t, expData.seriesRef, seriesRef) - require.Equal(t, expData.chunkRef, chunkRef) - require.Equal(t, expData.maxt, maxt) - require.Equal(t, expData.maxt, maxt) - require.Equal(t, expData.numSamples, numSamples) - require.Equal(t, expData.isOOO, chunkenc.IsOutOfOrderChunk(encoding)) - - actChunk, err := hrw.Chunk(expData.chunkRef) - require.NoError(t, err) - require.Equal(t, expData.chunk.Bytes(), actChunk.Bytes()) - - idx++ - return nil - })) - require.Equal(t, len(expectedData), idx) -} - -func TestOldChunkDiskMapper_WriteUnsupportedChunk_Chunk_IterateChunks(t *testing.T) { - hrw := testOldChunkDiskMapper(t) - defer func() { - require.NoError(t, hrw.Close()) - }() - - ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, nil, hrw) - - // Checking on-disk bytes for the first file. 
- require.Equal(t, 1, len(hrw.mmappedChunkFiles), "expected 1 mmapped file, got %d", len(hrw.mmappedChunkFiles)) - require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers)) - - // Testing IterateAllChunks method. - dir := hrw.dir.Name() - require.NoError(t, hrw.Close()) - hrw, err := NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - - require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error { - t.Helper() - - require.Equal(t, ucSeriesRef, seriesRef) - require.Equal(t, ucChkRef, chunkRef) - require.Equal(t, ucMint, mint) - require.Equal(t, ucMaxt, maxt) - require.Equal(t, uchunk.Encoding(), encoding) // Asserts that the encoding is EncUnsupportedXOR - - actChunk, err := hrw.Chunk(chunkRef) - // The chunk encoding is unknown so Chunk() should fail but us the caller - // are ok with that. Above we asserted that the encoding we expected was - // EncUnsupportedXOR - require.NotNil(t, err) - require.Contains(t, err.Error(), "invalid chunk encoding \"\"") - require.Nil(t, actChunk) - - return nil - })) -} - -// TestOldChunkDiskMapper_Truncate tests -// * If truncation is happening properly based on the time passed. -// * The active file is not deleted even if the passed time makes it eligible to be deleted. -// * Empty current file does not lead to creation of another file after truncation. -// * Non-empty current file leads to creation of another file after truncation. -func TestOldChunkDiskMapper_Truncate(t *testing.T) { - hrw := testOldChunkDiskMapper(t) - defer func() { - require.NoError(t, hrw.Close()) - }() - - timeRange := 0 - fileTimeStep := 100 - - addChunk := func() { - mint := timeRange + 1 // Just after the new file cut. - maxt := timeRange + fileTimeStep - 1 // Just before the next file. - - // Write a chunks to set maxt for the segment. - _ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) { - require.NoError(t, err) - }) - - timeRange += fileTimeStep - } - - verifyFiles := func(remainingFiles []int) { - t.Helper() - - files, err := os.ReadDir(hrw.dir.Name()) - require.NoError(t, err) - require.Equal(t, len(remainingFiles), len(files), "files on disk") - require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles") - require.Equal(t, len(remainingFiles), len(hrw.closers), "closers") - - for _, i := range remainingFiles { - _, ok := hrw.mmappedChunkFiles[i] - require.Equal(t, true, ok) - } - } - - // Create segments 1 to 7. - for i := 1; i <= 7; i++ { - require.NoError(t, hrw.CutNewFile()) - addChunk() - } - verifyFiles([]int{1, 2, 3, 4, 5, 6, 7}) - - // Truncating files. - require.NoError(t, hrw.Truncate(3)) - verifyFiles([]int{3, 4, 5, 6, 7, 8}) - - dir := hrw.dir.Name() - require.NoError(t, hrw.Close()) - - // Restarted. - var err error - hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - - require.False(t, hrw.fileMaxtSet) - require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { - return nil - })) - require.True(t, hrw.fileMaxtSet) - - verifyFiles([]int{3, 4, 5, 6, 7, 8}) - // New file is created after restart even if last file was empty. - addChunk() - verifyFiles([]int{3, 4, 5, 6, 7, 8, 9}) - - // Truncating files after restart. 
- require.NoError(t, hrw.Truncate(6)) - verifyFiles([]int{6, 7, 8, 9, 10}) - - // As the last file was empty, this creates no new files. - require.NoError(t, hrw.Truncate(6)) - verifyFiles([]int{6, 7, 8, 9, 10}) - - require.NoError(t, hrw.Truncate(8)) - verifyFiles([]int{8, 9, 10}) - - addChunk() - - // Truncating till current time should not delete the current active file. - require.NoError(t, hrw.Truncate(10)) - verifyFiles([]int{10, 11}) // One file is the previously active file and one currently created. -} - -// TestOldChunkDiskMapper_Truncate_PreservesFileSequence tests that truncation doesn't poke -// holes into the file sequence, even if there are empty files in between non-empty files. -// This test exposes https://github.com/prometheus/prometheus/issues/7412 where the truncation -// simply deleted all empty files instead of stopping once it encountered a non-empty file. -func TestOldChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) { - hrw := testOldChunkDiskMapper(t) - defer func() { - require.NoError(t, hrw.Close()) - }() - - timeRange := 0 - addChunk := func() { - step := 100 - mint, maxt := timeRange+1, timeRange+step-1 - _ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) { - require.NoError(t, err) - }) - timeRange += step - } - emptyFile := func() { - require.NoError(t, hrw.CutNewFile()) - } - nonEmptyFile := func() { - emptyFile() - addChunk() - } - - addChunk() // 1. Created with the first chunk. - nonEmptyFile() // 2. - nonEmptyFile() // 3. - emptyFile() // 4. - nonEmptyFile() // 5. - emptyFile() // 6. - - verifyFiles := func(remainingFiles []int) { - t.Helper() - - files, err := os.ReadDir(hrw.dir.Name()) - require.NoError(t, err) - require.Equal(t, len(remainingFiles), len(files), "files on disk") - require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles") - require.Equal(t, len(remainingFiles), len(hrw.closers), "closers") - - for _, i := range remainingFiles { - _, ok := hrw.mmappedChunkFiles[i] - require.True(t, ok, "remaining file %d not in hrw.mmappedChunkFiles", i) - } - } - - verifyFiles([]int{1, 2, 3, 4, 5, 6}) - - // Truncating files till 2. It should not delete anything after 3 (inclusive) - // though files 4 and 6 are empty. - require.NoError(t, hrw.Truncate(3)) - // As 6 was empty, it should not create another file. - verifyFiles([]int{3, 4, 5, 6}) - - addChunk() - // Truncate creates another file as 6 is not empty now. - require.NoError(t, hrw.Truncate(3)) - verifyFiles([]int{3, 4, 5, 6, 7}) - - dir := hrw.dir.Name() - require.NoError(t, hrw.Close()) - - // Restarting checks for unsequential files. - var err error - hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - verifyFiles([]int{3, 4, 5, 6, 7}) -} - -// TestOldChunkDiskMapper_TruncateAfterFailedIterateChunks tests for -// https://github.com/prometheus/prometheus/issues/7753 -func TestOldChunkDiskMapper_TruncateAfterFailedIterateChunks(t *testing.T) { - hrw := testOldChunkDiskMapper(t) - defer func() { - require.NoError(t, hrw.Close()) - }() - - // Write a chunks to iterate on it later. - _ = hrw.WriteChunk(1, 0, 1000, randomChunk(t), func(err error) { - require.NoError(t, err) - }) - - dir := hrw.dir.Name() - require.NoError(t, hrw.Close()) - - // Restarting to recreate https://github.com/prometheus/prometheus/issues/7753. 
- hrw, err := NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - - // Forcefully failing IterateAllChunks. - require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { - return errors.New("random error") - })) - - // Truncation call should not return error after IterateAllChunks fails. - require.NoError(t, hrw.Truncate(2000)) -} - -func TestOldChunkDiskMapper_ReadRepairOnEmptyLastFile(t *testing.T) { - hrw := testOldChunkDiskMapper(t) - defer func() { - require.NoError(t, hrw.Close()) - }() - - timeRange := 0 - addChunk := func() { - step := 100 - mint, maxt := timeRange+1, timeRange+step-1 - _ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) { - require.NoError(t, err) - }) - timeRange += step - } - nonEmptyFile := func() { - require.NoError(t, hrw.CutNewFile()) - addChunk() - } - - addChunk() // 1. Created with the first chunk. - nonEmptyFile() // 2. - nonEmptyFile() // 3. - - require.Equal(t, 3, len(hrw.mmappedChunkFiles)) - lastFile := 0 - for idx := range hrw.mmappedChunkFiles { - if idx > lastFile { - lastFile = idx - } - } - require.Equal(t, 3, lastFile) - dir := hrw.dir.Name() - require.NoError(t, hrw.Close()) - - // Write an empty last file mimicking an abrupt shutdown on file creation. - emptyFileName := segmentFile(dir, lastFile+1) - f, err := os.OpenFile(emptyFileName, os.O_WRONLY|os.O_CREATE, 0o666) - require.NoError(t, err) - require.NoError(t, f.Sync()) - stat, err := f.Stat() - require.NoError(t, err) - require.Equal(t, int64(0), stat.Size()) - require.NoError(t, f.Close()) - - // Open chunk disk mapper again, corrupt file should be removed. - hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - require.False(t, hrw.fileMaxtSet) - require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { - return nil - })) - require.True(t, hrw.fileMaxtSet) - - // Removed from memory. - require.Equal(t, 3, len(hrw.mmappedChunkFiles)) - for idx := range hrw.mmappedChunkFiles { - require.LessOrEqual(t, idx, lastFile, "file index is bigger than previous last file") - } - - // Removed even from disk. 
- files, err := os.ReadDir(dir) - require.NoError(t, err) - require.Equal(t, 3, len(files)) - for _, fi := range files { - seq, err := strconv.ParseUint(fi.Name(), 10, 64) - require.NoError(t, err) - require.LessOrEqual(t, seq, uint64(lastFile), "file index on disk is bigger than previous last file") - } -} - -func testOldChunkDiskMapper(t *testing.T) *OldChunkDiskMapper { - tmpdir := t.TempDir() - - hrw, err := NewOldChunkDiskMapper(tmpdir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - require.False(t, hrw.fileMaxtSet) - require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { - return nil - })) - require.True(t, hrw.fileMaxtSet) - return hrw -} - -func createChunkForOld(t *testing.T, idx int, hrw *OldChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk, isOOO bool) { - seriesRef = HeadSeriesRef(rand.Int63()) - mint = int64((idx)*1000 + 1) - maxt = int64((idx + 1) * 1000) - chunk = randomChunk(t) - if rand.Intn(2) == 0 { - isOOO = true - chunk = &chunkenc.OOOXORChunk{XORChunk: chunk.(*chunkenc.XORChunk)} - } - chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(err error) { - require.NoError(t, err) - }) - return -} diff --git a/tsdb/db.go b/tsdb/db.go index c2731bb40f..73cb0e9b36 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -815,7 +815,6 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs headOpts.OutOfOrderTimeWindow.Store(opts.OutOfOrderTimeWindow) headOpts.OutOfOrderCapMin.Store(opts.OutOfOrderCapMin) headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax) - headOpts.NewChunkDiskMapper = opts.NewChunkDiskMapper if opts.IsolationDisabled { // We only override this flag if isolation is disabled at DB level. We use the default otherwise. headOpts.IsolationDisabled = opts.IsolationDisabled diff --git a/tsdb/db_test.go b/tsdb/db_test.go index f1bdc7d347..80875c9449 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -3861,11 +3861,21 @@ func TestOOOCompaction(t *testing.T) { verifySamples(db.Blocks()[1], 120, 239) verifySamples(db.Blocks()[2], 240, 310) - // Because of OOO compaction, we already have 2 m-map files. + // Because of OOO compaction, the current mmap file will end. // All the chunks are only in the first file. + // Add a dummy mmap chunk to create a new mmap file. mmapDir := mmappedChunksDir(db.head.opts.ChunkDirRoot) files, err = os.ReadDir(mmapDir) require.NoError(t, err) + require.Len(t, files, 1) + waitC := make(chan struct{}) + db.head.chunkDiskMapper.WriteChunk(100, 0, 0, chunkenc.NewXORChunk(), func(err error) { + require.NoError(t, err) + close(waitC) + }) + <-waitC + files, err = os.ReadDir(mmapDir) + require.NoError(t, err) require.Len(t, files, 2) // Compact the in-order head and expect another block. @@ -4652,8 +4662,16 @@ func TestOOOCompactionFailure(t *testing.T) { require.Equal(t, len(db.Blocks()), 3) require.Equal(t, oldBlocks, db.Blocks()) - // Because of OOO compaction, we have a second m-map file now + // Because of OOO compaction, the current mmap file will end. // All the chunks are only in the first file. + // Add a dummy mmap chunk to create a new mmap file. + verifyMmapFiles("000001") + waitC := make(chan struct{}) + db.head.chunkDiskMapper.WriteChunk(100, 0, 0, chunkenc.NewXORChunk(), func(err error) { + require.NoError(t, err) + close(waitC) + }) + <-waitC verifyMmapFiles("000001", "000002") // All but last WBL file will be deleted. 
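
The db_test.go hunks above repeat the same pattern twice: with the queue-based chunk disk mapper, a pending file cut only materialises when the next chunk is written, so the tests enqueue a throwaway chunk and block on its callback before counting m-mapped files on disk. A minimal sketch of that pattern as a reusable helper — the name writeDummyChunk and the test-package framing are illustrative only and not part of this change:

package tsdb_test // illustrative only; the real tests above live in package tsdb

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/chunks"
)

// writeDummyChunk is a hypothetical helper capturing the pattern used twice in
// the db_test.go changes above: write a throwaway chunk and wait for its
// callback, which makes a previously ended m-mapped file (e.g. cut by OOO
// compaction) actually be followed by a new file on disk.
func writeDummyChunk(t *testing.T, cdm *chunks.ChunkDiskMapper) {
	waitC := make(chan struct{})
	cdm.WriteChunk(100, 0, 0, chunkenc.NewXORChunk(), func(err error) {
		require.NoError(t, err)
		close(waitC)
	})
	<-waitC // The write, and any file cut queued before it, has been flushed through the write queue.
}
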
diff --git a/tsdb/head.go b/tsdb/head.go index fbcee9cc83..bba7b1d353 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -163,10 +163,6 @@ type HeadOptions struct { EnableMemorySnapshotOnShutdown bool IsolationDisabled bool - - // Temporary flag which we use to select whether to use the new (used in upstream - // Prometheus) or the old (legacy) chunk disk mapper. - NewChunkDiskMapper bool } const ( @@ -185,7 +181,6 @@ func DefaultHeadOptions() *HeadOptions { StripeSize: DefaultStripeSize, SeriesCallback: &noopSeriesLifecycleCallback{}, IsolationDisabled: defaultIsolationDisabled, - NewChunkDiskMapper: false, } ho.OutOfOrderCapMin.Store(DefaultOutOfOrderCapMin) ho.OutOfOrderCapMax.Store(DefaultOutOfOrderCapMax) @@ -273,21 +268,13 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wal.WAL, opts *Hea opts.ChunkPool = chunkenc.NewPool() } - if opts.NewChunkDiskMapper { - h.chunkDiskMapper, err = chunks.NewChunkDiskMapper( - r, - mmappedChunksDir(opts.ChunkDirRoot), - opts.ChunkPool, - opts.ChunkWriteBufferSize, - opts.ChunkWriteQueueSize, - ) - } else { - h.chunkDiskMapper, err = chunks.NewOldChunkDiskMapper( - mmappedChunksDir(opts.ChunkDirRoot), - opts.ChunkPool, - opts.ChunkWriteBufferSize, - ) - } + h.chunkDiskMapper, err = chunks.NewChunkDiskMapper( + r, + mmappedChunksDir(opts.ChunkDirRoot), + opts.ChunkPool, + opts.ChunkWriteBufferSize, + opts.ChunkWriteQueueSize, + ) if err != nil { return nil, err } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index a8dbe9b371..a4c54ddf83 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1691,14 +1691,7 @@ func TestHeadReadWriterRepair(t *testing.T) { // take effect without another chunk being written. files, err := os.ReadDir(mmappedChunksDir(dir)) require.NoError(t, err) - - // With the new chunk disk mapper we only expect 6 files, because the last call to "CutNewFile()" won't - // take effect until the next chunk is being written. - if opts.NewChunkDiskMapper { - require.Equal(t, 6, len(files)) - } else { - require.Equal(t, 7, len(files)) - } + require.Equal(t, 6, len(files)) // Corrupt the 4th file by writing a random byte to series ref. f, err := os.OpenFile(filepath.Join(mmappedChunksDir(dir), files[3].Name()), os.O_WRONLY, 0o666) diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index c7c27f763c..183c32495b 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -432,20 +432,24 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) { matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")} values, err := oh.LabelValues("foo", matchers...) require.NoError(t, err) + sort.Strings(values) require.Equal(t, tc.expValues1, values) matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotRegexp, "foo", "^bar.")} values, err = oh.LabelValues("foo", matchers...) require.NoError(t, err) + sort.Strings(values) require.Equal(t, tc.expValues2, values) matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.")} values, err = oh.LabelValues("foo", matchers...) require.NoError(t, err) + sort.Strings(values) require.Equal(t, tc.expValues3, values) values, err = oh.LabelValues("foo") require.NoError(t, err) + sort.Strings(values) require.Equal(t, tc.expValues4, values) }) }
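
With OldChunkDiskMapper removed, head.go constructs the queue-backed mapper unconditionally. Below is a standalone sketch of that lifecycle, mirroring the head.go hunk above; the directory path is arbitrary, DefaultWriteQueueSize is assumed to exist alongside the DefaultWriteBufferSize used by the tests, and the IterateAllChunks callback signature follows the one used in the tests above.

package main // standalone sketch, not part of this change

import (
	"log"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/chunks"
)

func main() {
	// Construction mirrors the head.go hunk: registerer, chunk directory, pool,
	// write buffer size and write queue size. head.go passes opts.ChunkWriteBufferSize
	// and opts.ChunkWriteQueueSize; the package defaults are an assumption here.
	cdm, err := chunks.NewChunkDiskMapper(
		prometheus.NewRegistry(),
		"data/chunks_head",
		chunkenc.NewPool(),
		chunks.DefaultWriteBufferSize,
		chunks.DefaultWriteQueueSize,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cdm.Close()

	// IterateAllChunks must be called once after construction so the per-file maxt
	// is set; the tests above assert fileMaxtSet becomes true only after this call.
	if err := cdm.IterateAllChunks(func(_ chunks.HeadSeriesRef, _ chunks.ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
		return nil
	}); err != nil {
		log.Fatal(err)
	}
}
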