diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go
index 297abc5227..52912f486a 100644
--- a/tsdb/chunks/head_chunks.go
+++ b/tsdb/chunks/head_chunks.go
@@ -484,11 +484,13 @@ func (cdm *ChunkDiskMapper) writeChunk(seriesRef HeadSeriesRef, mint, maxt int64
 }
 
 // CutNewFile makes that a new file will be created the next time a chunk is written.
-func (cdm *ChunkDiskMapper) CutNewFile() {
+func (cdm *ChunkDiskMapper) CutNewFile() error {
 	cdm.evtlPosMtx.Lock()
 	defer cdm.evtlPosMtx.Unlock()
 
 	cdm.evtlPos.cutFileOnNextChunk()
+
+	return nil
 }
 
 func (cdm *ChunkDiskMapper) IsQueueEmpty() bool {
@@ -893,7 +895,7 @@ func (cdm *ChunkDiskMapper) Truncate(mint int64) error {
 		// There is a known race condition here because between the check of curFileSize() and the call to CutNewFile()
 		// a new file could already be cut, this is acceptable because it will simply result in an empty file which
 		// won't do any harm.
-		cdm.CutNewFile()
+		errs.Add(cdm.CutNewFile())
 	}
 	pendingDeletes, err := cdm.deleteFiles(removedFiles)
 	errs.Add(err)
diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go
index 34c1ed9b95..5820c1242e 100644
--- a/tsdb/chunks/head_chunks_test.go
+++ b/tsdb/chunks/head_chunks_test.go
@@ -174,7 +174,7 @@ func TestChunkDiskMapper_WriteUnsupportedChunk_Chunk_IterateChunks(t *testing.T)
 		require.NoError(t, hrw.Close())
 	}()
 
-	ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, hrw)
+	ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, hrw, nil)
 
 	// Checking on-disk bytes for the first file.
 	require.Equal(t, 1, len(hrw.mmappedChunkFiles), "expected 1 mmapped file, got %d", len(hrw.mmappedChunkFiles))
@@ -555,17 +555,27 @@ func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSer
 	return
 }
 
-func writeUnsupportedChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) {
-	var err error
+// writeUnsupportedChunk writes the chunk produced by randomUnsupportedChunk through hrw when it is
+// non-nil and through hrwOld otherwise, and returns the values that were passed to WriteChunk.
+func writeUnsupportedChunk(t *testing.T, idx int, hrw *ChunkDiskMapper, hrwOld *OldChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) {
 	seriesRef = HeadSeriesRef(rand.Int63())
 	mint = int64((idx)*1000 + 1)
 	maxt = int64((idx + 1) * 1000)
 	chunk = randomUnsupportedChunk(t)
 	awaitCb := make(chan struct{})
-	chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) {
-		require.NoError(t, err)
+	if hrw != nil {
+		chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) {
+			require.NoError(t, cbErr)
+			close(awaitCb)
+		})
+	} else {
+		// OldChunkDiskMapper.WriteChunk only invokes the callback when the write fails,
+		// so the wait channel is closed here rather than inside the callback.
+		chunkRef = hrwOld.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) {
+			require.NoError(t, cbErr)
+		})
 		close(awaitCb)
-	})
+	}
 	<-awaitCb
 	return
 }
diff --git a/tsdb/chunks/old_head_chunks.go b/tsdb/chunks/old_head_chunks.go
new file mode 100644
index 0000000000..5301fd1ace
--- /dev/null
+++ b/tsdb/chunks/old_head_chunks.go
@@ -0,0 +1,716 @@
+// Copyright 2020 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package chunks
+
+import (
+	"bufio"
+	"bytes"
+	"encoding/binary"
+	"hash"
+	"io"
+	"os"
+	"sort"
+	"sync"
+
+	"github.com/pkg/errors"
+	"go.uber.org/atomic"
+
+	"github.com/prometheus/prometheus/tsdb/chunkenc"
+	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
+	"github.com/prometheus/prometheus/tsdb/fileutil"
+)
+
+// OldChunkDiskMapper is for writing the Head block chunks to the disk
+// and access chunks via mmapped file.
+// Writes are performed synchronously by WriteChunk; there is no write queue.
+type OldChunkDiskMapper struct {
+	curFileNumBytes atomic.Int64 // Bytes written in current open file.
+
+	/// Writer.
+	dir             *os.File
+	writeBufferSize int
+
+	curFile         *os.File // File being written to.
+	curFileSequence int      // Index of current open file being appended to.
+	curFileMaxt     int64    // Highest maxt written to the current file; copied to the file's mmappedChunkFile in cut().
+
+	byteBuf      [MaxHeadChunkMetaSize]byte // Buffer used to write the header of the chunk.
+	chkWriter    *bufio.Writer              // Writer for the current open file.
+	crc32        hash.Hash
+	writePathMtx sync.Mutex
+
+	/// Reader.
+	// The int key in the map is the file number on the disk.
+	mmappedChunkFiles map[int]*mmappedChunkFile // Contains the m-mapped files for each chunk file mapped with its index.
+	closers           map[int]io.Closer         // Closers for resources behind the byte slices.
+	readPathMtx       sync.RWMutex              // Mutex used to protect the above 2 maps.
+	pool              chunkenc.Pool             // This is used when fetching a chunk from the disk to allocate a chunk.
+
+	// Writer and Reader.
+	// We flush chunks to disk in batches. Hence, we store them in this buffer
+	// from which chunks are served till they are flushed and are ready for m-mapping.
+	chunkBuffer *chunkBuffer
+
+	// Whether the maxt field is set for all mmapped chunk files tracked within the mmappedChunkFiles map.
+	// This is done after iterating through all the chunks in those files using the IterateAllChunks method.
+	fileMaxtSet bool
+
+	closed bool
+}
+
+// NewOldChunkDiskMapper returns a new OldChunkDiskMapper against the given directory.
+// The directory is created if it does not exist, and a nil pool is replaced by chunkenc.NewPool().
+// writeBufferSize must lie within [MinWriteBufferSize, MaxWriteBufferSize] and be a multiple of 1024.
+// On any error the returned mapper is nil and the directory handle opened here is closed again.
+// NOTE: 'IterateAllChunks' method needs to be called at least once after creating OldChunkDiskMapper
+// to set the maxt of all the file.
+func NewOldChunkDiskMapper(dir string, pool chunkenc.Pool, writeBufferSize int) (*OldChunkDiskMapper, error) {
+	// Validate write buffer size.
+	if writeBufferSize < MinWriteBufferSize || writeBufferSize > MaxWriteBufferSize {
+		return nil, errors.Errorf("ChunkDiskMapper write buffer size should be between %d and %d (actual: %d)", MinWriteBufferSize, MaxWriteBufferSize, writeBufferSize)
+	}
+	if writeBufferSize%1024 != 0 {
+		return nil, errors.Errorf("ChunkDiskMapper write buffer size should be a multiple of 1024 (actual: %d)", writeBufferSize)
+	}
+
+	if err := os.MkdirAll(dir, 0o777); err != nil {
+		return nil, err
+	}
+	dirFile, err := fileutil.OpenDir(dir)
+	if err != nil {
+		return nil, err
+	}
+
+	m := &OldChunkDiskMapper{
+		dir:             dirFile,
+		pool:            pool,
+		writeBufferSize: writeBufferSize,
+		crc32:           newCRC32(),
+		chunkBuffer:     newChunkBuffer(),
+	}
+
+	if m.pool == nil {
+		m.pool = chunkenc.NewPool()
+	}
+
+	if err := m.openMMapFiles(); err != nil {
+		// openMMapFiles has already released its own mappings; release the directory
+		// handle as well so a failed construction does not leak it.
+		return nil, tsdb_errors.NewMulti(err, dirFile.Close()).Err()
+	}
+
+	return m, nil
+}
+
+// openMMapFiles opens all files within dir for mmapping.
+func (cdm *OldChunkDiskMapper) openMMapFiles() (returnErr error) {
+	cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
+	cdm.closers = map[int]io.Closer{}
+	defer func() {
+		// On failure, close whatever was mapped so far and leave both maps nil.
+		if returnErr != nil {
+			returnErr = tsdb_errors.NewMulti(returnErr, closeAllFromMap(cdm.closers)).Err()
+
+			cdm.mmappedChunkFiles = nil
+			cdm.closers = nil
+		}
+	}()
+
+	files, err := listChunkFiles(cdm.dir.Name())
+	if err != nil {
+		return err
+	}
+
+	files, err = repairLastChunkFile(files)
+	if err != nil {
+		return err
+	}
+
+	chkFileIndices := make([]int, 0, len(files))
+	for seq, fn := range files {
+		f, err := fileutil.OpenMmapFile(fn)
+		if err != nil {
+			return errors.Wrapf(err, "mmap files, file: %s", fn)
+		}
+		cdm.closers[seq] = f
+		cdm.mmappedChunkFiles[seq] = &mmappedChunkFile{byteSlice: realByteSlice(f.Bytes())}
+		chkFileIndices = append(chkFileIndices, seq)
+	}
+
+	// Check for gaps in the files.
+	sort.Ints(chkFileIndices)
+	if len(chkFileIndices) == 0 {
+		return nil
+	}
+	lastSeq := chkFileIndices[0]
+	for _, seq := range chkFileIndices[1:] {
+		if seq != lastSeq+1 {
+			return errors.Errorf("found unsequential head chunk files %s (index: %d) and %s (index: %d)", files[lastSeq], lastSeq, files[seq], seq)
+		}
+		lastSeq = seq
+	}
+
+	// Validate the file headers in ascending sequence order. Ranging over the map
+	// directly would make it random which file is reported when several are invalid.
+	for _, seq := range chkFileIndices {
+		b := cdm.mmappedChunkFiles[seq]
+		if b.byteSlice.Len() < HeadChunkFileHeaderSize {
+			return errors.Wrapf(errInvalidSize, "%s: invalid head chunk file header", files[seq])
+		}
+		// Verify magic number.
+		if m := binary.BigEndian.Uint32(b.byteSlice.Range(0, MagicChunksSize)); m != MagicHeadChunks {
+			return errors.Errorf("%s: invalid magic number %x", files[seq], m)
+		}
+
+		// Verify chunk format version.
+		if v := int(b.byteSlice.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
+			return errors.Errorf("%s: invalid chunk format version %d", files[seq], v)
+		}
+	}
+
+	return nil
+}
+
+// WriteChunk writes the chunk to the disk.
+// The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
+// The write happens synchronously while writePathMtx is held. callback, when non-nil, is
+// invoked only if the write fails; in that case the returned reference is 0.
+func (cdm *OldChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) {
+	chkRef, err := func() (ChunkDiskMapperRef, error) {
+		cdm.writePathMtx.Lock()
+		defer cdm.writePathMtx.Unlock()
+
+		if cdm.closed {
+			return 0, ErrChunkDiskMapperClosed
+		}
+
+		if cdm.shouldCutNewFile(len(chk.Bytes())) {
+			if err := cdm.cut(); err != nil {
+				return 0, err
+			}
+		}
+
+		// if len(chk.Bytes())+MaxHeadChunkMetaSize >= writeBufferSize, it means that chunk >= the buffer size;
+		// so no need to flush here, as we have to flush at the end (to not keep partial chunks in buffer).
+		if len(chk.Bytes())+MaxHeadChunkMetaSize < cdm.writeBufferSize && cdm.chkWriter.Available() < MaxHeadChunkMetaSize+len(chk.Bytes()) {
+			if err := cdm.flushBuffer(); err != nil {
+				return 0, err
+			}
+		}
+
+		cdm.crc32.Reset()
+		bytesWritten := 0
+
+		// The reference is the current file sequence plus the offset at which this chunk's header starts.
+		chkRef = newChunkDiskMapperRef(uint64(cdm.curFileSequence), uint64(cdm.curFileSize()))
+
+		// Header layout: series ref, mint, maxt, encoding, uvarint data length.
+		binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(seriesRef))
+		bytesWritten += SeriesRefSize
+		binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(mint))
+		bytesWritten += MintMaxtSize
+		binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(maxt))
+		bytesWritten += MintMaxtSize
+		cdm.byteBuf[bytesWritten] = byte(chk.Encoding())
+		bytesWritten += ChunkEncodingSize
+		n := binary.PutUvarint(cdm.byteBuf[bytesWritten:], uint64(len(chk.Bytes())))
+		bytesWritten += n
+
+		// Header and data are both covered by the CRC, which is written right after the data.
+		if err := cdm.writeAndAppendToCRC32(cdm.byteBuf[:bytesWritten]); err != nil {
+			return 0, err
+		}
+		if err := cdm.writeAndAppendToCRC32(chk.Bytes()); err != nil {
+			return 0, err
+		}
+		if err := cdm.writeCRC32(); err != nil {
+			return 0, err
+		}
+
+		if maxt > cdm.curFileMaxt {
+			cdm.curFileMaxt = maxt
+		}
+
+		// Keep the chunk readable via Chunk() until the write buffer is flushed.
+		cdm.chunkBuffer.put(chkRef, chk)
+
+		if len(chk.Bytes())+MaxHeadChunkMetaSize >= cdm.writeBufferSize {
+			// The chunk was bigger than the buffer itself.
+			// Flushing to not keep partial chunks in buffer.
+			if err := cdm.flushBuffer(); err != nil {
+				return 0, err
+			}
+		}
+
+		return chkRef, nil
+	}()
+
+	// The callback is only used to report failures; it is not called on success.
+	if err != nil && callback != nil {
+		callback(err)
+	}
+
+	return chkRef
+}
+
+// shouldCutNewFile returns whether a new file should be cut before writing a chunk of chunkSize bytes.
+// That is the case when nothing has been written to a file yet (the current file size is 0) or when the
+// chunk plus MaxHeadChunkMetaSize would push the current file beyond MaxHeadChunkFileSize.
+// Size retention: because depending on the system architecture, there is a limit on how big of a file we can m-map.
+// Only the size is evaluated here; this function applies no time-based condition.
+func (cdm *OldChunkDiskMapper) shouldCutNewFile(chunkSize int) bool {
+	return cdm.curFileSize() == 0 || // First head chunk file.
+		cdm.curFileSize()+int64(chunkSize+MaxHeadChunkMetaSize) > MaxHeadChunkFileSize // Exceeds the max head chunk file size.
+}
+
+// CutNewFile creates a new m-mapped file.
+// It takes writePathMtx and delegates to cut.
+func (cdm *OldChunkDiskMapper) CutNewFile() (returnErr error) {
+	cdm.writePathMtx.Lock()
+	defer cdm.writePathMtx.Unlock()
+
+	return cdm.cut()
+}
+
+// cut creates a new m-mapped file. The write lock should be held before calling this.
+// The current file, if any, is finalized first; the new file then becomes the current file.
+func (cdm *OldChunkDiskMapper) cut() (returnErr error) {
+	// Sync current tail to disk and close.
+	if err := cdm.finalizeCurFile(); err != nil {
+		return err
+	}
+
+	n, newFile, seq, err := cutSegmentFile(cdm.dir, MagicHeadChunks, headChunksFormatV1, HeadChunkFilePreallocationSize)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		// The file should not be closed if there is no error,
+		// it is kept open in the OldChunkDiskMapper.
+		if returnErr != nil {
+			returnErr = tsdb_errors.NewMulti(returnErr, newFile.Close()).Err()
+		}
+	}()
+
+	cdm.curFileNumBytes.Store(int64(n))
+
+	// Record the maxt of the file being replaced; Truncate compares it against mint.
+	if cdm.curFile != nil {
+		cdm.readPathMtx.Lock()
+		cdm.mmappedChunkFiles[cdm.curFileSequence].maxt = cdm.curFileMaxt
+		cdm.readPathMtx.Unlock()
+	}
+
+	mmapFile, err := fileutil.OpenMmapFileWithSize(newFile.Name(), MaxHeadChunkFileSize)
+	if err != nil {
+		return err
+	}
+
+	// Switch over to the new file and register its mapping under the read lock.
+	cdm.readPathMtx.Lock()
+	cdm.curFileSequence = seq
+	cdm.curFile = newFile
+	if cdm.chkWriter != nil {
+		cdm.chkWriter.Reset(newFile)
+	} else {
+		cdm.chkWriter = bufio.NewWriterSize(newFile, cdm.writeBufferSize)
+	}
+
+	cdm.closers[cdm.curFileSequence] = mmapFile
+	cdm.mmappedChunkFiles[cdm.curFileSequence] = &mmappedChunkFile{byteSlice: realByteSlice(mmapFile.Bytes())}
+	cdm.readPathMtx.Unlock()
+
+	cdm.curFileMaxt = 0
+
+	return nil
+}
+
+// finalizeCurFile flushes all pending data to the current tail file,
+// syncs it to disk, and closes it. It does nothing when no file is open.
+func (cdm *OldChunkDiskMapper) finalizeCurFile() error {
+	if cdm.curFile == nil {
+		return nil
+	}
+
+	if err := cdm.flushBuffer(); err != nil {
+		return err
+	}
+
+	if err := cdm.curFile.Sync(); err != nil {
+		return err
+	}
+
+	return cdm.curFile.Close()
+}
+
+// write writes b through the buffered chunk writer and adds the number of bytes
+// the writer reports to curFileNumBytes, also when the write returns an error.
+func (cdm *OldChunkDiskMapper) write(b []byte) error {
+	n, err := cdm.chkWriter.Write(b)
+	cdm.curFileNumBytes.Add(int64(n))
+	return err
+}
+
+// writeAndAppendToCRC32 writes b to the current file and, if that succeeds,
+// feeds b into the running CRC32.
+func (cdm *OldChunkDiskMapper) writeAndAppendToCRC32(b []byte) error {
+	if err := cdm.write(b); err != nil {
+		return err
+	}
+	_, err := cdm.crc32.Write(b)
+	return err
+}
+
+// writeCRC32 writes the current CRC32 sum to the file, using byteBuf as the
+// destination slice for the sum.
+func (cdm *OldChunkDiskMapper) writeCRC32() error {
+	return cdm.write(cdm.crc32.Sum(cdm.byteBuf[:0]))
+}
+
+// flushBuffer flushes the current in-memory chunks.
+// Assumes that writePathMtx is _write_ locked before calling this method.
+func (cdm *OldChunkDiskMapper) flushBuffer() error {
+	if err := cdm.chkWriter.Flush(); err != nil {
+		return err
+	}
+	// Only drop the buffered chunks once the writer has been flushed successfully.
+	cdm.chunkBuffer.clear()
+	return nil
+}
+
+// Chunk returns a chunk from a given reference.
+// Chunks of the current file that are still in the chunk buffer are returned from there;
+// otherwise the chunk is read from the m-mapped file, its CRC is verified and its data is
+// copied before being handed to the pool. Read failures are reported as *CorruptionErr.
+func (cdm *OldChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error) {
+	cdm.readPathMtx.RLock()
+	// We hold this read lock for the entire duration because if Close()
+	// is called, the data in the byte slice will get corrupted as the mmapped
+	// file will be closed.
+	defer cdm.readPathMtx.RUnlock()
+
+	if cdm.closed {
+		return nil, ErrChunkDiskMapperClosed
+	}
+
+	sgmIndex, chkStart := ref.Unpack()
+	// We skip the series ref and the mint/maxt beforehand.
+	chkStart += SeriesRefSize + (2 * MintMaxtSize)
+	chkCRC32 := newCRC32()
+
+	// If it is the current open file, then the chunks can be in the buffer too.
+	if sgmIndex == cdm.curFileSequence {
+		chunk := cdm.chunkBuffer.get(ref)
+		if chunk != nil {
+			return chunk, nil
+		}
+	}
+
+	mmapFile, ok := cdm.mmappedChunkFiles[sgmIndex]
+	if !ok {
+		if sgmIndex > cdm.curFileSequence {
+			return nil, &CorruptionErr{
+				Dir:       cdm.dir.Name(),
+				FileIndex: -1,
+				Err:       errors.Errorf("head chunk file index %d more than current open file", sgmIndex),
+			}
+		}
+		return nil, &CorruptionErr{
+			Dir:       cdm.dir.Name(),
+			FileIndex: sgmIndex,
+			// Errorf, not New: the message carries a %d verb for the file index.
+			Err: errors.Errorf("head chunk file index %d does not exist on disk", sgmIndex),
+		}
+	}
+
+	if chkStart+MaxChunkLengthFieldSize > mmapFile.byteSlice.Len() {
+		return nil, &CorruptionErr{
+			Dir:       cdm.dir.Name(),
+			FileIndex: sgmIndex,
+			Err:       errors.Errorf("head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, mmapFile.byteSlice.Len()),
+		}
+	}
+
+	// Encoding.
+	chkEnc := mmapFile.byteSlice.Range(chkStart, chkStart+ChunkEncodingSize)[0]
+
+	// Data length.
+	// With the minimum chunk length this should never cause us reading
+	// over the end of the slice.
+	chkDataLenStart := chkStart + ChunkEncodingSize
+	c := mmapFile.byteSlice.Range(chkDataLenStart, chkDataLenStart+MaxChunkLengthFieldSize)
+	chkDataLen, n := binary.Uvarint(c)
+	if n <= 0 {
+		return nil, &CorruptionErr{
+			Dir:       cdm.dir.Name(),
+			FileIndex: sgmIndex,
+			Err:       errors.Errorf("reading chunk length failed with %d", n),
+		}
+	}
+
+	// Verify the chunk data end. The CRC that follows the data is read below, so it
+	// must fit into the file as well (IterateAllChunks performs the same check).
+	chkDataEnd := chkDataLenStart + n + int(chkDataLen)
+	if chkDataEnd+CRCSize > mmapFile.byteSlice.Len() {
+		return nil, &CorruptionErr{
+			Dir:       cdm.dir.Name(),
+			FileIndex: sgmIndex,
+			Err:       errors.Errorf("head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v", chkDataEnd+CRCSize, mmapFile.byteSlice.Len()),
+		}
+	}
+
+	// Check the CRC.
+	sum := mmapFile.byteSlice.Range(chkDataEnd, chkDataEnd+CRCSize)
+	if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(chkStart-(SeriesRefSize+2*MintMaxtSize), chkDataEnd)); err != nil {
+		return nil, &CorruptionErr{
+			Dir:       cdm.dir.Name(),
+			FileIndex: sgmIndex,
+			Err:       err,
+		}
+	}
+	if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
+		return nil, &CorruptionErr{
+			Dir:       cdm.dir.Name(),
+			FileIndex: sgmIndex,
+			Err:       errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
+		}
+	}
+
+	// The chunk data itself.
+	chkData := mmapFile.byteSlice.Range(chkDataEnd-int(chkDataLen), chkDataEnd)
+
+	// Make a copy of the chunk data to prevent a panic occurring because the returned
+	// chunk data slice references an mmap-ed file which could be closed after the
+	// function returns but while the chunk is still in use.
+	chkDataCopy := make([]byte, len(chkData))
+	copy(chkDataCopy, chkData)
+
+	chk, err := cdm.pool.Get(chunkenc.Encoding(chkEnc), chkDataCopy)
+	if err != nil {
+		return nil, &CorruptionErr{
+			Dir:       cdm.dir.Name(),
+			FileIndex: sgmIndex,
+			Err:       err,
+		}
+	}
+	return chk, nil
+}
+
+// IterateAllChunks iterates all mmappedChunkFiles (in order of head chunk file name/number) and all the chunks within it
+// and runs the provided function with information about each chunk. It returns on the first error encountered.
+// As a side effect it records the highest maxt seen in each file and marks the file maxts as set,
+// which Truncate requires.
+// NOTE: This method needs to be called at least once after creating OldChunkDiskMapper
+// to set the maxt of all the file.
+func (cdm *OldChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error) (err error) {
+	cdm.writePathMtx.Lock()
+	defer cdm.writePathMtx.Unlock()
+
+	// The flag is set on every return path, including when iteration stops on an error.
+	defer func() {
+		cdm.fileMaxtSet = true
+	}()
+
+	chkCRC32 := newCRC32()
+
+	// Iterate files in ascending order.
+	segIDs := make([]int, 0, len(cdm.mmappedChunkFiles))
+	for seg := range cdm.mmappedChunkFiles {
+		segIDs = append(segIDs, seg)
+	}
+	sort.Ints(segIDs)
+	for _, segID := range segIDs {
+		mmapFile := cdm.mmappedChunkFiles[segID]
+		fileEnd := mmapFile.byteSlice.Len()
+		if segID == cdm.curFileSequence {
+			// For the file being written, only the bytes written so far are considered.
+			fileEnd = int(cdm.curFileSize())
+		}
+		idx := HeadChunkFileHeaderSize
+		for idx < fileEnd {
+			if fileEnd-idx < MaxHeadChunkMetaSize {
+				// Check for all 0s which marks the end of the file.
+				allZeros := true
+				for _, b := range mmapFile.byteSlice.Range(idx, fileEnd) {
+					if b != byte(0) {
+						allZeros = false
+						break
+					}
+				}
+				if allZeros {
+					// End of segment chunk file content.
+					break
+				}
+				return &CorruptionErr{
+					Dir:       cdm.dir.Name(),
+					FileIndex: segID,
+					Err: errors.Errorf("head chunk file has some unread data, but doesn't include enough bytes to read the chunk header"+
+						" - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID),
+				}
+			}
+			chkCRC32.Reset()
+			chunkRef := newChunkDiskMapperRef(uint64(segID), uint64(idx))
+
+			startIdx := idx
+			seriesRef := HeadSeriesRef(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+SeriesRefSize)))
+			idx += SeriesRefSize
+			mint := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize)))
+			idx += MintMaxtSize
+			maxt := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize)))
+			idx += MintMaxtSize
+
+			// We preallocate file to help with m-mapping (especially windows systems).
+			// As series ref always starts from 1, we assume it being 0 to be the end of the actual file data.
+			// We are not considering possible file corruption that can cause it to be 0.
+			// Additionally we are checking mint and maxt just to be sure.
+			if seriesRef == 0 && mint == 0 && maxt == 0 {
+				break
+			}
+
+			// Encoding.
+			chkEnc := chunkenc.Encoding(mmapFile.byteSlice.Range(idx, idx+ChunkEncodingSize)[0])
+			idx += ChunkEncodingSize
+			dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize))
+			idx += n
+
+			// The first two bytes of the chunk data are read as the number of samples.
+			numSamples := binary.BigEndian.Uint16(mmapFile.byteSlice.Range(idx, idx+2))
+			idx += int(dataLen) // Skip the data.
+
+			// In the beginning we only checked for the chunk meta size.
+			// Now that we have added the chunk data length, we check for sufficient bytes again.
+			if idx+CRCSize > fileEnd {
+				return &CorruptionErr{
+					Dir:       cdm.dir.Name(),
+					FileIndex: segID,
+					Err:       errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID),
+				}
+			}
+
+			// Check CRC.
+			sum := mmapFile.byteSlice.Range(idx, idx+CRCSize)
+			if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(startIdx, idx)); err != nil {
+				return err
+			}
+			if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
+				return &CorruptionErr{
+					Dir:       cdm.dir.Name(),
+					FileIndex: segID,
+					Err:       errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
+				}
+			}
+			idx += CRCSize
+
+			if maxt > mmapFile.maxt {
+				mmapFile.maxt = maxt
+			}
+
+			if err := f(seriesRef, chunkRef, mint, maxt, numSamples, chkEnc); err != nil {
+				// A corruption error from the callback is attributed to the file being iterated.
+				if cerr, ok := err.(*CorruptionErr); ok {
+					cerr.Dir = cdm.dir.Name()
+					cerr.FileIndex = segID
+					return cerr
+				}
+				return err
+			}
+		}
+
+		if idx > fileEnd {
+			// It should be equal to the slice length.
+			return &CorruptionErr{
+				Dir:       cdm.dir.Name(),
+				FileIndex: segID,
+				Err:       errors.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID),
+			}
+		}
+	}
+
+	return nil
+}
+
+// Truncate deletes the head chunk files which are strictly below the mint.
+// mint should be in milliseconds.
+// It fails if IterateAllChunks has not run yet. Files are considered in ascending order and
+// selection stops at the current file or at the first file whose maxt is not below mint.
+// A new file is cut first when the current file holds more than its header.
+func (cdm *OldChunkDiskMapper) Truncate(mint int64) error {
+	if !cdm.fileMaxtSet {
+		return errors.New("maxt of the files are not set")
+	}
+	cdm.readPathMtx.RLock()
+
+	// Sort the file indices, else if files deletion fails in between,
+	// it can lead to unsequential files as the map is not sorted.
+	chkFileIndices := make([]int, 0, len(cdm.mmappedChunkFiles))
+	for seq := range cdm.mmappedChunkFiles {
+		chkFileIndices = append(chkFileIndices, seq)
+	}
+	sort.Ints(chkFileIndices)
+
+	var removedFiles []int
+	for _, seq := range chkFileIndices {
+		if seq == cdm.curFileSequence || cdm.mmappedChunkFiles[seq].maxt >= mint {
+			break
+		}
+		if cdm.mmappedChunkFiles[seq].maxt < mint {
+			removedFiles = append(removedFiles, seq)
+		}
+	}
+	cdm.readPathMtx.RUnlock()
+
+	errs := tsdb_errors.NewMulti()
+	// Cut a new file only if the current file has some chunks.
+	if cdm.curFileSize() > HeadChunkFileHeaderSize {
+		errs.Add(cdm.CutNewFile())
+	}
+	errs.Add(cdm.deleteFiles(removedFiles))
+	return errs.Err()
+}
+
+// deleteFiles closes the mappings of the given file sequences, drops them from the
+// reader maps and then removes the files from disk. It returns on the first error.
+func (cdm *OldChunkDiskMapper) deleteFiles(removedFiles []int) error {
+	cdm.readPathMtx.Lock()
+	for _, seq := range removedFiles {
+		if err := cdm.closers[seq].Close(); err != nil {
+			cdm.readPathMtx.Unlock()
+			return err
+		}
+		delete(cdm.mmappedChunkFiles, seq)
+		delete(cdm.closers, seq)
+	}
+	cdm.readPathMtx.Unlock()
+
+	// We actually delete the files separately to not block the readPathMtx for long.
+	for _, seq := range removedFiles {
+		if err := os.Remove(segmentFile(cdm.dir.Name(), seq)); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// DeleteCorrupted deletes all the head chunk files after the one which had the corruption
+// (including the corrupt file).
+// originalErr must be, or wrap, a *CorruptionErr; any other error is returned wrapped.
+func (cdm *OldChunkDiskMapper) DeleteCorrupted(originalErr error) error {
+	err := errors.Cause(originalErr) // So that we can pick up errors even if wrapped.
+	cerr, ok := err.(*CorruptionErr)
+	if !ok {
+		return errors.Wrap(originalErr, "cannot handle error")
+	}
+
+	// Delete all the head chunk files following the corrupt head chunk file.
+	segs := []int{}
+	cdm.readPathMtx.RLock()
+	for seg := range cdm.mmappedChunkFiles {
+		if seg >= cerr.FileIndex {
+			segs = append(segs, seg)
+		}
+	}
+	cdm.readPathMtx.RUnlock()
+
+	return cdm.deleteFiles(segs)
+}
+
+// Size returns the size of the chunk files.
+func (cdm *OldChunkDiskMapper) Size() (int64, error) {
+	return fileutil.DirSize(cdm.dir.Name())
+}
+
+// curFileSize returns the number of bytes accounted to the current file.
+func (cdm *OldChunkDiskMapper) curFileSize() int64 {
+	return cdm.curFileNumBytes.Load()
+}
+
+// Close closes all the open files in OldChunkDiskMapper.
+// It is no longer safe to access chunks from this struct after calling Close.
+// Calling Close again after the first call returns nil.
+func (cdm *OldChunkDiskMapper) Close() error {
+	// 'WriteChunk' locks writePathMtx first and then readPathMtx for cutting head chunk file.
+	// The lock order should not be reversed here else it can cause deadlocks.
+	cdm.writePathMtx.Lock()
+	defer cdm.writePathMtx.Unlock()
+	cdm.readPathMtx.Lock()
+	defer cdm.readPathMtx.Unlock()
+
+	if cdm.closed {
+		return nil
+	}
+	cdm.closed = true
+
+	errs := tsdb_errors.NewMulti(
+		closeAllFromMap(cdm.closers),
+		cdm.finalizeCurFile(),
+		cdm.dir.Close(),
+	)
+	cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
+	cdm.closers = map[int]io.Closer{}
+
+	return errs.Err()
+}
+
+// IsQueueEmpty always reports true.
+func (cdm *OldChunkDiskMapper) IsQueueEmpty() bool {
+	return true // there is no queue
+}
diff --git a/tsdb/chunks/old_head_chunks_test.go b/tsdb/chunks/old_head_chunks_test.go
new file mode 100644
index 0000000000..d81ef3df22
--- /dev/null
+++ b/tsdb/chunks/old_head_chunks_test.go
@@ -0,0 +1,487 @@
+// Copyright 2020 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package chunks + +import ( + "encoding/binary" + "errors" + "io/ioutil" + "math/rand" + "os" + "strconv" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/tsdb/chunkenc" +) + +func TestOldChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) { + hrw := testOldChunkDiskMapper(t) + defer func() { + require.NoError(t, hrw.Close()) + }() + + expectedBytes := []byte{} + nextChunkOffset := uint64(HeadChunkFileHeaderSize) + chkCRC32 := newCRC32() + + type expectedDataType struct { + seriesRef HeadSeriesRef + chunkRef ChunkDiskMapperRef + mint, maxt int64 + numSamples uint16 + chunk chunkenc.Chunk + } + expectedData := []expectedDataType{} + + var buf [MaxHeadChunkMetaSize]byte + totalChunks := 0 + var firstFileName string + for hrw.curFileSequence < 3 || hrw.chkWriter.Buffered() == 0 { + addChunks := func(numChunks int) { + for i := 0; i < numChunks; i++ { + seriesRef, chkRef, mint, maxt, chunk := createChunkForOld(t, totalChunks, hrw) + totalChunks++ + expectedData = append(expectedData, expectedDataType{ + seriesRef: seriesRef, + mint: mint, + maxt: maxt, + chunkRef: chkRef, + chunk: chunk, + numSamples: uint16(chunk.NumSamples()), + }) + + if hrw.curFileSequence != 1 { + // We are checking for bytes written only for the first file. + continue + } + + // Calculating expected bytes written on disk for first file. 
+ firstFileName = hrw.curFile.Name() + require.Equal(t, newChunkDiskMapperRef(1, nextChunkOffset), chkRef) + + bytesWritten := 0 + chkCRC32.Reset() + + binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(seriesRef)) + bytesWritten += SeriesRefSize + binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(mint)) + bytesWritten += MintMaxtSize + binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt)) + bytesWritten += MintMaxtSize + buf[bytesWritten] = byte(chunk.Encoding()) + bytesWritten += ChunkEncodingSize + n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes()))) + bytesWritten += n + + expectedBytes = append(expectedBytes, buf[:bytesWritten]...) + _, err := chkCRC32.Write(buf[:bytesWritten]) + require.NoError(t, err) + expectedBytes = append(expectedBytes, chunk.Bytes()...) + _, err = chkCRC32.Write(chunk.Bytes()) + require.NoError(t, err) + + expectedBytes = append(expectedBytes, chkCRC32.Sum(nil)...) + + // += seriesRef, mint, maxt, encoding, chunk data len, chunk data, CRC. + nextChunkOffset += SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + uint64(n) + uint64(len(chunk.Bytes())) + CRCSize + } + } + addChunks(100) + hrw.CutNewFile() + addChunks(10) // For chunks in in-memory buffer. + } + + // Checking on-disk bytes for the first file. + require.Equal(t, 3, len(hrw.mmappedChunkFiles), "expected 3 mmapped files, got %d", len(hrw.mmappedChunkFiles)) + require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers)) + + actualBytes, err := ioutil.ReadFile(firstFileName) + require.NoError(t, err) + + // Check header of the segment file. + require.Equal(t, MagicHeadChunks, int(binary.BigEndian.Uint32(actualBytes[0:MagicChunksSize]))) + require.Equal(t, chunksFormatV1, int(actualBytes[MagicChunksSize])) + + // Remaining chunk data. + fileEnd := HeadChunkFileHeaderSize + len(expectedBytes) + require.Equal(t, expectedBytes, actualBytes[HeadChunkFileHeaderSize:fileEnd]) + + // Testing reading of chunks. 
+ for _, exp := range expectedData { + actChunk, err := hrw.Chunk(exp.chunkRef) + require.NoError(t, err) + require.Equal(t, exp.chunk.Bytes(), actChunk.Bytes()) + } + + // Testing IterateAllChunks method. + dir := hrw.dir.Name() + require.NoError(t, hrw.Close()) + hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) + require.NoError(t, err) + + idx := 0 + require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error { + t.Helper() + + expData := expectedData[idx] + require.Equal(t, expData.seriesRef, seriesRef) + require.Equal(t, expData.chunkRef, chunkRef) + require.Equal(t, expData.maxt, maxt) + require.Equal(t, expData.maxt, maxt) + require.Equal(t, expData.numSamples, numSamples) + + actChunk, err := hrw.Chunk(expData.chunkRef) + require.NoError(t, err) + require.Equal(t, expData.chunk.Bytes(), actChunk.Bytes()) + + idx++ + return nil + })) + require.Equal(t, len(expectedData), idx) +} + +func TestOldChunkDiskMapper_WriteUnsupportedChunk_Chunk_IterateChunks(t *testing.T) { + hrw := testOldChunkDiskMapper(t) + defer func() { + require.NoError(t, hrw.Close()) + }() + + ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, nil, hrw) + + // Checking on-disk bytes for the first file. + require.Equal(t, 1, len(hrw.mmappedChunkFiles), "expected 1 mmapped file, got %d", len(hrw.mmappedChunkFiles)) + require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers)) + + // Testing IterateAllChunks method. 
+ dir := hrw.dir.Name() + require.NoError(t, hrw.Close()) + hrw, err := NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) + require.NoError(t, err) + + require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error { + t.Helper() + + require.Equal(t, ucSeriesRef, seriesRef) + require.Equal(t, ucChkRef, chunkRef) + require.Equal(t, ucMint, mint) + require.Equal(t, ucMaxt, maxt) + require.Equal(t, uchunk.Encoding(), encoding) // Asserts that the encoding is EncUnsupportedXOR + + actChunk, err := hrw.Chunk(chunkRef) + // The chunk encoding is unknown so Chunk() should fail but us the caller + // are ok with that. Above we asserted that the encoding we expected was + // EncUnsupportedXOR + require.NotNil(t, err) + require.Contains(t, err.Error(), "invalid chunk encoding \"\"") + require.Nil(t, actChunk) + + return nil + })) +} + +// TestOldChunkDiskMapper_Truncate tests +// * If truncation is happening properly based on the time passed. +// * The active file is not deleted even if the passed time makes it eligible to be deleted. +// * Empty current file does not lead to creation of another file after truncation. +// * Non-empty current file leads to creation of another file after truncation. +func TestOldChunkDiskMapper_Truncate(t *testing.T) { + hrw := testOldChunkDiskMapper(t) + defer func() { + require.NoError(t, hrw.Close()) + }() + + timeRange := 0 + fileTimeStep := 100 + var thirdFileMinT, sixthFileMinT int64 + + addChunk := func() int { + mint := timeRange + 1 // Just after the new file cut. + maxt := timeRange + fileTimeStep - 1 // Just before the next file. + + // Write a chunks to set maxt for the segment. 
+ _ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) { + require.NoError(t, err) + }) + + timeRange += fileTimeStep + + return mint + } + + verifyFiles := func(remainingFiles []int) { + t.Helper() + + files, err := ioutil.ReadDir(hrw.dir.Name()) + require.NoError(t, err) + require.Equal(t, len(remainingFiles), len(files), "files on disk") + require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles") + require.Equal(t, len(remainingFiles), len(hrw.closers), "closers") + + for _, i := range remainingFiles { + _, ok := hrw.mmappedChunkFiles[i] + require.Equal(t, true, ok) + } + } + + // Create segments 1 to 7. + for i := 1; i <= 7; i++ { + require.NoError(t, hrw.CutNewFile()) + mint := int64(addChunk()) + if i == 3 { + thirdFileMinT = mint + } else if i == 6 { + sixthFileMinT = mint + } + } + verifyFiles([]int{1, 2, 3, 4, 5, 6, 7}) + + // Truncating files. + require.NoError(t, hrw.Truncate(thirdFileMinT)) + verifyFiles([]int{3, 4, 5, 6, 7, 8}) + + dir := hrw.dir.Name() + require.NoError(t, hrw.Close()) + + // Restarted. + var err error + hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) + require.NoError(t, err) + + require.False(t, hrw.fileMaxtSet) + require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { + return nil + })) + require.True(t, hrw.fileMaxtSet) + + verifyFiles([]int{3, 4, 5, 6, 7, 8}) + // New file is created after restart even if last file was empty. + addChunk() + verifyFiles([]int{3, 4, 5, 6, 7, 8, 9}) + + // Truncating files after restart. + require.NoError(t, hrw.Truncate(sixthFileMinT)) + verifyFiles([]int{6, 7, 8, 9, 10}) + + // As the last file was empty, this creates no new files. + require.NoError(t, hrw.Truncate(sixthFileMinT+1)) + verifyFiles([]int{6, 7, 8, 9, 10}) + addChunk() + + // Truncating till current time should not delete the current active file. 
+ require.NoError(t, hrw.Truncate(int64(timeRange+(2*fileTimeStep)))) + verifyFiles([]int{10, 11}) // One file is the previously active file and one currently created. +} + +// TestOldChunkDiskMapper_Truncate_PreservesFileSequence tests that truncation doesn't poke +// holes into the file sequence, even if there are empty files in between non-empty files. +// This test exposes https://github.com/prometheus/prometheus/issues/7412 where the truncation +// simply deleted all empty files instead of stopping once it encountered a non-empty file. +func TestOldChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) { + hrw := testOldChunkDiskMapper(t) + defer func() { + require.NoError(t, hrw.Close()) + }() + + timeRange := 0 + addChunk := func() { + step := 100 + mint, maxt := timeRange+1, timeRange+step-1 + _ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) { + require.NoError(t, err) + }) + timeRange += step + } + emptyFile := func() { + require.NoError(t, hrw.CutNewFile()) + } + nonEmptyFile := func() { + emptyFile() + addChunk() + } + + addChunk() // 1. Created with the first chunk. + nonEmptyFile() // 2. + nonEmptyFile() // 3. + emptyFile() // 4. + nonEmptyFile() // 5. + emptyFile() // 6. + + verifyFiles := func(remainingFiles []int) { + t.Helper() + + files, err := ioutil.ReadDir(hrw.dir.Name()) + require.NoError(t, err) + require.Equal(t, len(remainingFiles), len(files), "files on disk") + require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles") + require.Equal(t, len(remainingFiles), len(hrw.closers), "closers") + + for _, i := range remainingFiles { + _, ok := hrw.mmappedChunkFiles[i] + require.True(t, ok, "remaining file %d not in hrw.mmappedChunkFiles", i) + } + } + + verifyFiles([]int{1, 2, 3, 4, 5, 6}) + + // Truncating files till 2. It should not delete anything after 3 (inclusive) + // though files 4 and 6 are empty. 
+ file2Maxt := hrw.mmappedChunkFiles[2].maxt + require.NoError(t, hrw.Truncate(file2Maxt+1)) + // As 6 was empty, it should not create another file. + verifyFiles([]int{3, 4, 5, 6}) + + addChunk() + // Truncate creates another file as 6 is not empty now. + require.NoError(t, hrw.Truncate(file2Maxt+1)) + verifyFiles([]int{3, 4, 5, 6, 7}) + + dir := hrw.dir.Name() + require.NoError(t, hrw.Close()) + + // Restarting checks for non-sequential files. + var err error + hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) + require.NoError(t, err) + verifyFiles([]int{3, 4, 5, 6, 7}) +} + +// TestOldChunkDiskMapper_TruncateAfterFailedIterateChunks tests for +// https://github.com/prometheus/prometheus/issues/7753 +func TestOldChunkDiskMapper_TruncateAfterFailedIterateChunks(t *testing.T) { + hrw := testOldChunkDiskMapper(t) + defer func() { + require.NoError(t, hrw.Close()) + }() + + // Write a chunk to iterate on it later. + _ = hrw.WriteChunk(1, 0, 1000, randomChunk(t), func(err error) { + require.NoError(t, err) + }) + + dir := hrw.dir.Name() + require.NoError(t, hrw.Close()) + + // Restarting to recreate https://github.com/prometheus/prometheus/issues/7753. + hrw, err := NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) + require.NoError(t, err) + + // Forcefully failing IterateAllChunks. + require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { + return errors.New("random error") + })) + + // Truncation call should not return error after IterateAllChunks fails. 
+ require.NoError(t, hrw.Truncate(2000)) +} + +func TestOldChunkDiskMapper_ReadRepairOnEmptyLastFile(t *testing.T) { + hrw := testOldChunkDiskMapper(t) + defer func() { + require.NoError(t, hrw.Close()) + }() + + timeRange := 0 + addChunk := func() { + step := 100 + mint, maxt := timeRange+1, timeRange+step-1 + _ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) { + require.NoError(t, err) + }) + timeRange += step + } + nonEmptyFile := func() { + require.NoError(t, hrw.CutNewFile()) + addChunk() + } + + addChunk() // 1. Created with the first chunk. + nonEmptyFile() // 2. + nonEmptyFile() // 3. + + require.Equal(t, 3, len(hrw.mmappedChunkFiles)) + lastFile := 0 + for idx := range hrw.mmappedChunkFiles { + if idx > lastFile { + lastFile = idx + } + } + require.Equal(t, 3, lastFile) + dir := hrw.dir.Name() + require.NoError(t, hrw.Close()) + + // Write an empty last file mimicking an abrupt shutdown on file creation. + emptyFileName := segmentFile(dir, lastFile+1) + f, err := os.OpenFile(emptyFileName, os.O_WRONLY|os.O_CREATE, 0o666) + require.NoError(t, err) + require.NoError(t, f.Sync()) + stat, err := f.Stat() + require.NoError(t, err) + require.Equal(t, int64(0), stat.Size()) + require.NoError(t, f.Close()) + + // Open chunk disk mapper again, corrupt file should be removed. + hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) + require.NoError(t, err) + require.False(t, hrw.fileMaxtSet) + require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { + return nil + })) + require.True(t, hrw.fileMaxtSet) + + // Removed from memory. + require.Equal(t, 3, len(hrw.mmappedChunkFiles)) + for idx := range hrw.mmappedChunkFiles { + require.LessOrEqual(t, idx, lastFile, "file index is bigger than previous last file") + } + + // Removed even from disk. 
+ files, err := ioutil.ReadDir(dir) + require.NoError(t, err) + require.Equal(t, 3, len(files)) + for _, fi := range files { + seq, err := strconv.ParseUint(fi.Name(), 10, 64) + require.NoError(t, err) + require.LessOrEqual(t, seq, uint64(lastFile), "file index on disk is bigger than previous last file") + } +} + +func testOldChunkDiskMapper(t *testing.T) *OldChunkDiskMapper { + tmpdir, err := ioutil.TempDir("", "data") + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, os.RemoveAll(tmpdir)) + }) + + hrw, err := NewOldChunkDiskMapper(tmpdir, chunkenc.NewPool(), DefaultWriteBufferSize) + require.NoError(t, err) + require.False(t, hrw.fileMaxtSet) + require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { + return nil + })) + require.True(t, hrw.fileMaxtSet) + return hrw +} + +func createChunkForOld(t *testing.T, idx int, hrw *OldChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) { + seriesRef = HeadSeriesRef(rand.Int63()) + mint = int64((idx)*1000 + 1) + maxt = int64((idx + 1) * 1000) + chunk = randomChunk(t) + chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(err error) { + require.NoError(t, err) + }) + return +} diff --git a/tsdb/db.go b/tsdb/db.go index 6451755a4f..e9e6029888 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -171,6 +171,9 @@ type Options struct { // SeriesHashCache specifies the series hash cache used when querying shards via Querier.Select(). // If nil, the cache won't be used. SeriesHashCache *hashcache.SeriesHashCache + + // Temporary flag which we use to select whether we want to use the new or the old chunk disk mapper. 
+ NewChunkDiskMapper bool } type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} @@ -734,6 +737,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs headOpts.EnableExemplarStorage = opts.EnableExemplarStorage headOpts.MaxExemplars.Store(opts.MaxExemplars) headOpts.EnableMemorySnapshotOnShutdown = opts.EnableMemorySnapshotOnShutdown + headOpts.NewChunkDiskMapper = opts.NewChunkDiskMapper if opts.IsolationDisabled { // We only override this flag if isolation is disabled at DB level. We use the default otherwise. headOpts.IsolationDisabled = opts.IsolationDisabled diff --git a/tsdb/head.go b/tsdb/head.go index 87824a5811..af5e2f50ab 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -57,6 +57,20 @@ var ( defaultIsolationDisabled = false ) +// chunkDiskMapper is a temporary interface while we transition from +// 0 size queue to queue based chunk disk mapper. +type chunkDiskMapper interface { + CutNewFile() (returnErr error) + IterateAllChunks(f func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error) (err error) + Truncate(mint int64) error + DeleteCorrupted(originalErr error) error + Size() (int64, error) + Close() error + Chunk(ref chunks.ChunkDiskMapperRef) (chunkenc.Chunk, error) + WriteChunk(seriesRef chunks.HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef chunks.ChunkDiskMapperRef) + IsQueueEmpty() bool +} + // Head handles reads and writes of time series data within a time window. type Head struct { chunkRange atomic.Int64 @@ -97,7 +111,7 @@ type Head struct { lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching. // chunkDiskMapper is used to write and read Head chunks to/from disk. 
- chunkDiskMapper *chunks.ChunkDiskMapper + chunkDiskMapper chunkDiskMapper chunkSnapshotMtx sync.Mutex @@ -140,6 +154,9 @@ type HeadOptions struct { EnableMemorySnapshotOnShutdown bool IsolationDisabled bool + + // Temporary flag which we use to select whether we want to use the new or the old chunk disk mapper. + NewChunkDiskMapper bool } func DefaultHeadOptions() *HeadOptions { @@ -153,6 +170,7 @@ func DefaultHeadOptions() *HeadOptions { StripeSize: DefaultStripeSize, SeriesCallback: &noopSeriesLifecycleCallback{}, IsolationDisabled: defaultIsolationDisabled, + NewChunkDiskMapper: false, } } @@ -215,13 +233,21 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti opts.ChunkPool = chunkenc.NewPool() } - h.chunkDiskMapper, err = chunks.NewChunkDiskMapper( - r, - mmappedChunksDir(opts.ChunkDirRoot), - opts.ChunkPool, - opts.ChunkWriteBufferSize, - opts.ChunkWriteQueueSize, - ) + if opts.NewChunkDiskMapper { + h.chunkDiskMapper, err = chunks.NewChunkDiskMapper( + r, + mmappedChunksDir(opts.ChunkDirRoot), + opts.ChunkPool, + opts.ChunkWriteBufferSize, + opts.ChunkWriteQueueSize, + ) + } else { + h.chunkDiskMapper, err = chunks.NewOldChunkDiskMapper( + mmappedChunksDir(opts.ChunkDirRoot), + opts.ChunkPool, + opts.ChunkWriteBufferSize, + ) + } if err != nil { return nil, err } diff --git a/tsdb/head_append.go b/tsdb/head_append.go index efef155603..3232ed61e5 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -489,7 +489,7 @@ func (a *headAppender) Commit() (err error) { // the appendID for isolation. (The appendID can be zero, which results in no // isolation for this append.) // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. 
-func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (delta int64, sampleInOrder, chunkCreated bool) { +func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper chunkDiskMapper) (delta int64, sampleInOrder, chunkCreated bool) { // Based on Gorilla white papers this offers near-optimal compression ratio // so anything bigger that this has diminishing returns and increases // the time range within which we have to decompress all samples. @@ -587,7 +587,7 @@ func addJitterToChunkEndTime(seriesHash uint64, chunkMinTime, nextAt, maxNextAt return min(maxNextAt, nextAt+chunkDurationVariance-(chunkDurationMaxVariance/2)) } -func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk { +func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper chunkDiskMapper) *memChunk { s.mmapCurrentHeadChunk(chunkDiskMapper) s.headChunk = &memChunk{ @@ -608,7 +608,7 @@ func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDis return s.headChunk } -func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) { +func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper chunkDiskMapper) { if s.headChunk == nil { // There is no head chunk, so nothing to m-map here. return diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 2d37f2a7e9..bf1d163283 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -329,7 +329,7 @@ func (h *headChunkReader) Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) { // chunk returns the chunk for the HeadChunkID from memory or by m-mapping it from the disk. // If garbageCollect is true, it means that the returned *memChunk // (and not the chunkenc.Chunk inside it) can be garbage collected after its usage. 
-func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) { +func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper chunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) { // ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are // incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index. // The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix @@ -363,7 +363,7 @@ type safeChunk struct { s *memSeries cid chunks.HeadChunkID isoState *isolationState - chunkDiskMapper *chunks.ChunkDiskMapper + chunkDiskMapper chunkDiskMapper } func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { @@ -375,7 +375,7 @@ func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { // iterator returns a chunk iterator for the requested chunkID, or a NopIterator if the requested ID is out of range. // It is unsafe to call this concurrently with s.append(...) without holding the series lock. -func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator { +func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, chunkDiskMapper chunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator { c, garbageCollect, err := s.chunk(id, chunkDiskMapper) // TODO(fabxc): Work around! 
An error will be returns when a querier have retrieved a pointer to a // series's chunk, which got then garbage collected before it got diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 0e9c9c0280..997eec2a67 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1634,7 +1634,7 @@ func TestHeadReadWriterRepair(t *testing.T) { _, ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunk was created") - h.chunkDiskMapper.CutNewFile() + require.NoError(t, h.chunkDiskMapper.CutNewFile()) } require.NoError(t, h.Close())