Remove old chunk mapper. (#311)

* Remove old chunk mapper.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Revert to code from upstream.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Fix OOO test failures

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix sorting in TestOOOHeadChunkReader_LabelValues

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Peter Štibraný 2022-08-18 10:38:17 +02:00 committed by GitHub
parent e9456018fa
commit acd4a8a69d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 37 additions and 1240 deletions

View file

@ -176,7 +176,7 @@ func TestChunkDiskMapper_WriteUnsupportedChunk_Chunk_IterateChunks(t *testing.T)
require.NoError(t, hrw.Close())
}()
ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, hrw, nil)
ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, hrw)
// Checking on-disk bytes for the first file.
require.Equal(t, 1, len(hrw.mmappedChunkFiles), "expected 1 mmapped file, got %d", len(hrw.mmappedChunkFiles))
@ -554,24 +554,17 @@ func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSer
return
}
func writeUnsupportedChunk(t *testing.T, idx int, hrw *ChunkDiskMapper, hrwOld *OldChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) {
func writeUnsupportedChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) {
var err error
seriesRef = HeadSeriesRef(rand.Int63())
mint = int64((idx)*1000 + 1)
maxt = int64((idx + 1) * 1000)
chunk = randomUnsupportedChunk(t)
awaitCb := make(chan struct{})
if hrw != nil {
chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) {
require.NoError(t, err)
close(awaitCb)
})
} else {
chunkRef = hrwOld.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) {
require.NoError(t, err)
})
chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) {
require.NoError(t, err)
close(awaitCb)
}
})
<-awaitCb
return
}

View file

@ -1,713 +0,0 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunks
import (
"bufio"
"bytes"
"encoding/binary"
"hash"
"io"
"os"
"sort"
"sync"
"github.com/pkg/errors"
"go.uber.org/atomic"
"github.com/prometheus/prometheus/tsdb/chunkenc"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
)
// OldChunkDiskMapper is for writing the Head block chunks to the disk
// and access chunks via mmapped file.
type OldChunkDiskMapper struct {
curFileNumBytes atomic.Int64 // Bytes written in current open file.
/// Writer.
dir *os.File
writeBufferSize int
curFile *os.File // File being written to.
curFileSequence int // Index of current open file being appended to.
curFileMaxt int64 // Used for the size retention.
byteBuf [MaxHeadChunkMetaSize]byte // Buffer used to write the header of the chunk.
chkWriter *bufio.Writer // Writer for the current open file.
crc32 hash.Hash
writePathMtx sync.Mutex
/// Reader.
// The int key in the map is the file number on the disk.
mmappedChunkFiles map[int]*mmappedChunkFile // Contains the m-mapped files for each chunk file mapped with its index.
closers map[int]io.Closer // Closers for resources behind the byte slices.
readPathMtx sync.RWMutex // Mutex used to protect the above 2 maps.
pool chunkenc.Pool // This is used when fetching a chunk from the disk to allocate a chunk.
// Writer and Reader.
// We flush chunks to disk in batches. Hence, we store them in this buffer
// from which chunks are served till they are flushed and are ready for m-mapping.
chunkBuffer *chunkBuffer
// Whether the maxt field is set for all mmapped chunk files tracked within the mmappedChunkFiles map.
// This is done after iterating through all the chunks in those files using the IterateAllChunks method.
fileMaxtSet bool
closed bool
}
// NewOldChunkDiskMapper returns a new ChunkDiskMapper against the given directory
// using the default head chunk file duration.
// NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper
// to set the maxt of all the file.
func NewOldChunkDiskMapper(dir string, pool chunkenc.Pool, writeBufferSize int) (*OldChunkDiskMapper, error) {
// Validate write buffer size.
if writeBufferSize < MinWriteBufferSize || writeBufferSize > MaxWriteBufferSize {
return nil, errors.Errorf("ChunkDiskMapper write buffer size should be between %d and %d (actual: %d)", MinWriteBufferSize, MaxWriteBufferSize, writeBufferSize)
}
if writeBufferSize%1024 != 0 {
return nil, errors.Errorf("ChunkDiskMapper write buffer size should be a multiple of 1024 (actual: %d)", writeBufferSize)
}
if err := os.MkdirAll(dir, 0o777); err != nil {
return nil, err
}
dirFile, err := fileutil.OpenDir(dir)
if err != nil {
return nil, err
}
m := &OldChunkDiskMapper{
dir: dirFile,
pool: pool,
writeBufferSize: writeBufferSize,
crc32: newCRC32(),
chunkBuffer: newChunkBuffer(),
}
if m.pool == nil {
m.pool = chunkenc.NewPool()
}
return m, m.openMMapFiles()
}
// openMMapFiles opens all files within dir for mmapping.
func (cdm *OldChunkDiskMapper) openMMapFiles() (returnErr error) {
cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
cdm.closers = map[int]io.Closer{}
defer func() {
if returnErr != nil {
returnErr = tsdb_errors.NewMulti(returnErr, closeAllFromMap(cdm.closers)).Err()
cdm.mmappedChunkFiles = nil
cdm.closers = nil
}
}()
files, err := listChunkFiles(cdm.dir.Name())
if err != nil {
return err
}
files, err = repairLastChunkFile(files)
if err != nil {
return err
}
chkFileIndices := make([]int, 0, len(files))
for seq, fn := range files {
f, err := fileutil.OpenMmapFile(fn)
if err != nil {
return errors.Wrapf(err, "mmap files, file: %s", fn)
}
cdm.closers[seq] = f
cdm.mmappedChunkFiles[seq] = &mmappedChunkFile{byteSlice: realByteSlice(f.Bytes())}
chkFileIndices = append(chkFileIndices, seq)
}
// Check for gaps in the files.
sort.Ints(chkFileIndices)
if len(chkFileIndices) == 0 {
return nil
}
lastSeq := chkFileIndices[0]
for _, seq := range chkFileIndices[1:] {
if seq != lastSeq+1 {
return errors.Errorf("found unsequential head chunk files %s (index: %d) and %s (index: %d)", files[lastSeq], lastSeq, files[seq], seq)
}
lastSeq = seq
}
for i, b := range cdm.mmappedChunkFiles {
if b.byteSlice.Len() < HeadChunkFileHeaderSize {
return errors.Wrapf(errInvalidSize, "%s: invalid head chunk file header", files[i])
}
// Verify magic number.
if m := binary.BigEndian.Uint32(b.byteSlice.Range(0, MagicChunksSize)); m != MagicHeadChunks {
return errors.Errorf("%s: invalid magic number %x", files[i], m)
}
// Verify chunk format version.
if v := int(b.byteSlice.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
return errors.Errorf("%s: invalid chunk format version %d", files[i], v)
}
}
return nil
}
// WriteChunk writes the chunk to the disk.
// The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
func (cdm *OldChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) {
chkRef, err := func() (ChunkDiskMapperRef, error) {
cdm.writePathMtx.Lock()
defer cdm.writePathMtx.Unlock()
if cdm.closed {
return 0, ErrChunkDiskMapperClosed
}
if cdm.shouldCutNewFile(len(chk.Bytes())) {
if err := cdm.cut(); err != nil {
return 0, err
}
}
// if len(chk.Bytes())+MaxHeadChunkMetaSize >= writeBufferSize, it means that chunk >= the buffer size;
// so no need to flush here, as we have to flush at the end (to not keep partial chunks in buffer).
if len(chk.Bytes())+MaxHeadChunkMetaSize < cdm.writeBufferSize && cdm.chkWriter.Available() < MaxHeadChunkMetaSize+len(chk.Bytes()) {
if err := cdm.flushBuffer(); err != nil {
return 0, err
}
}
cdm.crc32.Reset()
bytesWritten := 0
chkRef = newChunkDiskMapperRef(uint64(cdm.curFileSequence), uint64(cdm.curFileSize()))
binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(seriesRef))
bytesWritten += SeriesRefSize
binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(mint))
bytesWritten += MintMaxtSize
binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(maxt))
bytesWritten += MintMaxtSize
cdm.byteBuf[bytesWritten] = byte(chk.Encoding())
bytesWritten += ChunkEncodingSize
n := binary.PutUvarint(cdm.byteBuf[bytesWritten:], uint64(len(chk.Bytes())))
bytesWritten += n
if err := cdm.writeAndAppendToCRC32(cdm.byteBuf[:bytesWritten]); err != nil {
return 0, err
}
if err := cdm.writeAndAppendToCRC32(chk.Bytes()); err != nil {
return 0, err
}
if err := cdm.writeCRC32(); err != nil {
return 0, err
}
if maxt > cdm.curFileMaxt {
cdm.curFileMaxt = maxt
}
cdm.chunkBuffer.put(chkRef, chk)
if len(chk.Bytes())+MaxHeadChunkMetaSize >= cdm.writeBufferSize {
// The chunk was bigger than the buffer itself.
// Flushing to not keep partial chunks in buffer.
if err := cdm.flushBuffer(); err != nil {
return 0, err
}
}
return chkRef, nil
}()
if err != nil && callback != nil {
callback(err)
}
return chkRef
}
// shouldCutNewFile returns whether a new file should be cut, based on time and size retention.
// Size retention: because depending on the system architecture, there is a limit on how big of a file we can m-map.
// Time retention: so that we can delete old chunks with some time guarantee in low load environments.
func (cdm *OldChunkDiskMapper) shouldCutNewFile(chunkSize int) bool {
return cdm.curFileSize() == 0 || // First head chunk file.
cdm.curFileSize()+int64(chunkSize+MaxHeadChunkMetaSize) > MaxHeadChunkFileSize // Exceeds the max head chunk file size.
}
// CutNewFile creates a new m-mapped file.
func (cdm *OldChunkDiskMapper) CutNewFile() (returnErr error) {
cdm.writePathMtx.Lock()
defer cdm.writePathMtx.Unlock()
return cdm.cut()
}
// cut creates a new m-mapped file. The write lock should be held before calling this.
func (cdm *OldChunkDiskMapper) cut() (returnErr error) {
// Sync current tail to disk and close.
if err := cdm.finalizeCurFile(); err != nil {
return err
}
n, newFile, seq, err := cutSegmentFile(cdm.dir, MagicHeadChunks, headChunksFormatV1, HeadChunkFilePreallocationSize)
if err != nil {
return err
}
defer func() {
// The file should not be closed if there is no error,
// its kept open in the ChunkDiskMapper.
if returnErr != nil {
returnErr = tsdb_errors.NewMulti(returnErr, newFile.Close()).Err()
}
}()
cdm.curFileNumBytes.Store(int64(n))
if cdm.curFile != nil {
cdm.readPathMtx.Lock()
cdm.mmappedChunkFiles[cdm.curFileSequence].maxt = cdm.curFileMaxt
cdm.readPathMtx.Unlock()
}
mmapFile, err := fileutil.OpenMmapFileWithSize(newFile.Name(), MaxHeadChunkFileSize)
if err != nil {
return err
}
cdm.readPathMtx.Lock()
cdm.curFileSequence = seq
cdm.curFile = newFile
if cdm.chkWriter != nil {
cdm.chkWriter.Reset(newFile)
} else {
cdm.chkWriter = bufio.NewWriterSize(newFile, cdm.writeBufferSize)
}
cdm.closers[cdm.curFileSequence] = mmapFile
cdm.mmappedChunkFiles[cdm.curFileSequence] = &mmappedChunkFile{byteSlice: realByteSlice(mmapFile.Bytes())}
cdm.readPathMtx.Unlock()
cdm.curFileMaxt = 0
return nil
}
// finalizeCurFile writes all pending data to the current tail file,
// truncates its size, and closes it.
func (cdm *OldChunkDiskMapper) finalizeCurFile() error {
if cdm.curFile == nil {
return nil
}
if err := cdm.flushBuffer(); err != nil {
return err
}
if err := cdm.curFile.Sync(); err != nil {
return err
}
return cdm.curFile.Close()
}
func (cdm *OldChunkDiskMapper) write(b []byte) error {
n, err := cdm.chkWriter.Write(b)
cdm.curFileNumBytes.Add(int64(n))
return err
}
func (cdm *OldChunkDiskMapper) writeAndAppendToCRC32(b []byte) error {
if err := cdm.write(b); err != nil {
return err
}
_, err := cdm.crc32.Write(b)
return err
}
func (cdm *OldChunkDiskMapper) writeCRC32() error {
return cdm.write(cdm.crc32.Sum(cdm.byteBuf[:0]))
}
// flushBuffer flushes the current in-memory chunks.
// Assumes that writePathMtx is _write_ locked before calling this method.
func (cdm *OldChunkDiskMapper) flushBuffer() error {
if err := cdm.chkWriter.Flush(); err != nil {
return err
}
cdm.chunkBuffer.clear()
return nil
}
// Chunk returns a chunk from a given reference.
func (cdm *OldChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error) {
cdm.readPathMtx.RLock()
// We hold this read lock for the entire duration because if Close()
// is called, the data in the byte slice will get corrupted as the mmapped
// file will be closed.
defer cdm.readPathMtx.RUnlock()
if cdm.closed {
return nil, ErrChunkDiskMapperClosed
}
sgmIndex, chkStart := ref.Unpack()
// We skip the series ref and the mint/maxt beforehand.
chkStart += SeriesRefSize + (2 * MintMaxtSize)
chkCRC32 := newCRC32()
// If it is the current open file, then the chunks can be in the buffer too.
if sgmIndex == cdm.curFileSequence {
chunk := cdm.chunkBuffer.get(ref)
if chunk != nil {
return chunk, nil
}
}
mmapFile, ok := cdm.mmappedChunkFiles[sgmIndex]
if !ok {
if sgmIndex > cdm.curFileSequence {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: -1,
Err: errors.Errorf("head chunk file index %d more than current open file", sgmIndex),
}
}
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.New("head chunk file index %d does not exist on disk"),
}
}
if chkStart+MaxChunkLengthFieldSize > mmapFile.byteSlice.Len() {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, mmapFile.byteSlice.Len()),
}
}
// Encoding.
chkEnc := mmapFile.byteSlice.Range(chkStart, chkStart+ChunkEncodingSize)[0]
// Data length.
// With the minimum chunk length this should never cause us reading
// over the end of the slice.
chkDataLenStart := chkStart + ChunkEncodingSize
c := mmapFile.byteSlice.Range(chkDataLenStart, chkDataLenStart+MaxChunkLengthFieldSize)
chkDataLen, n := binary.Uvarint(c)
if n <= 0 {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.Errorf("reading chunk length failed with %d", n),
}
}
// Verify the chunk data end.
chkDataEnd := chkDataLenStart + n + int(chkDataLen)
if chkDataEnd > mmapFile.byteSlice.Len() {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v", chkDataEnd, mmapFile.byteSlice.Len()),
}
}
// Check the CRC.
sum := mmapFile.byteSlice.Range(chkDataEnd, chkDataEnd+CRCSize)
if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(chkStart-(SeriesRefSize+2*MintMaxtSize), chkDataEnd)); err != nil {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: err,
}
}
if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
}
}
// The chunk data itself.
chkData := mmapFile.byteSlice.Range(chkDataEnd-int(chkDataLen), chkDataEnd)
// Make a copy of the chunk data to prevent a panic occurring because the returned
// chunk data slice references an mmap-ed file which could be closed after the
// function returns but while the chunk is still in use.
chkDataCopy := make([]byte, len(chkData))
copy(chkDataCopy, chkData)
chk, err := cdm.pool.Get(chunkenc.Encoding(chkEnc), chkDataCopy)
if err != nil {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: err,
}
}
return chk, nil
}
// IterateAllChunks iterates all mmappedChunkFiles (in order of head chunk file name/number) and all the chunks within it
// and runs the provided function with information about each chunk. It returns on the first error encountered.
// NOTE: This method needs to be called at least once after creating ChunkDiskMapper
// to set the maxt of all the file.
func (cdm *OldChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error) (err error) {
cdm.writePathMtx.Lock()
defer cdm.writePathMtx.Unlock()
defer func() {
cdm.fileMaxtSet = true
}()
chkCRC32 := newCRC32()
// Iterate files in ascending order.
segIDs := make([]int, 0, len(cdm.mmappedChunkFiles))
for seg := range cdm.mmappedChunkFiles {
segIDs = append(segIDs, seg)
}
sort.Ints(segIDs)
for _, segID := range segIDs {
mmapFile := cdm.mmappedChunkFiles[segID]
fileEnd := mmapFile.byteSlice.Len()
if segID == cdm.curFileSequence {
fileEnd = int(cdm.curFileSize())
}
idx := HeadChunkFileHeaderSize
for idx < fileEnd {
if fileEnd-idx < MaxHeadChunkMetaSize {
// Check for all 0s which marks the end of the file.
allZeros := true
for _, b := range mmapFile.byteSlice.Range(idx, fileEnd) {
if b != byte(0) {
allZeros = false
break
}
}
if allZeros {
// End of segment chunk file content.
break
}
return &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: segID,
Err: errors.Errorf("head chunk file has some unread data, but doesn't include enough bytes to read the chunk header"+
" - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID),
}
}
chkCRC32.Reset()
chunkRef := newChunkDiskMapperRef(uint64(segID), uint64(idx))
startIdx := idx
seriesRef := HeadSeriesRef(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+SeriesRefSize)))
idx += SeriesRefSize
mint := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize)))
idx += MintMaxtSize
maxt := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize)))
idx += MintMaxtSize
// We preallocate file to help with m-mapping (especially windows systems).
// As series ref always starts from 1, we assume it being 0 to be the end of the actual file data.
// We are not considering possible file corruption that can cause it to be 0.
// Additionally we are checking mint and maxt just to be sure.
if seriesRef == 0 && mint == 0 && maxt == 0 {
break
}
// Encoding.
chkEnc := chunkenc.Encoding(mmapFile.byteSlice.Range(idx, idx+ChunkEncodingSize)[0])
idx += ChunkEncodingSize
dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize))
idx += n
numSamples := binary.BigEndian.Uint16(mmapFile.byteSlice.Range(idx, idx+2))
idx += int(dataLen) // Skip the data.
// In the beginning we only checked for the chunk meta size.
// Now that we have added the chunk data length, we check for sufficient bytes again.
if idx+CRCSize > fileEnd {
return &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: segID,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID),
}
}
// Check CRC.
sum := mmapFile.byteSlice.Range(idx, idx+CRCSize)
if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(startIdx, idx)); err != nil {
return err
}
if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
return &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: segID,
Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
}
}
idx += CRCSize
if maxt > mmapFile.maxt {
mmapFile.maxt = maxt
}
if err := f(seriesRef, chunkRef, mint, maxt, numSamples, chkEnc); err != nil {
if cerr, ok := err.(*CorruptionErr); ok {
cerr.Dir = cdm.dir.Name()
cerr.FileIndex = segID
return cerr
}
return err
}
}
if idx > fileEnd {
// It should be equal to the slice length.
return &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: segID,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID),
}
}
}
return nil
}
// Truncate deletes the head chunk files whose file number is less than given fileNo.
func (cdm *OldChunkDiskMapper) Truncate(fileNo int) error {
if !cdm.fileMaxtSet {
return errors.New("maxt of the files are not set")
}
cdm.readPathMtx.RLock()
// Sort the file indices, else if files deletion fails in between,
// it can lead to unsequential files as the map is not sorted.
chkFileIndices := make([]int, 0, len(cdm.mmappedChunkFiles))
for seq := range cdm.mmappedChunkFiles {
chkFileIndices = append(chkFileIndices, seq)
}
sort.Ints(chkFileIndices)
var removedFiles []int
for _, seq := range chkFileIndices {
if seq == cdm.curFileSequence || seq >= fileNo {
break
}
removedFiles = append(removedFiles, seq)
}
cdm.readPathMtx.RUnlock()
errs := tsdb_errors.NewMulti()
// Cut a new file only if the current file has some chunks.
if cdm.curFileSize() > HeadChunkFileHeaderSize {
errs.Add(cdm.CutNewFile())
}
errs.Add(cdm.deleteFiles(removedFiles))
return errs.Err()
}
func (cdm *OldChunkDiskMapper) deleteFiles(removedFiles []int) error {
cdm.readPathMtx.Lock()
for _, seq := range removedFiles {
if err := cdm.closers[seq].Close(); err != nil {
cdm.readPathMtx.Unlock()
return err
}
delete(cdm.mmappedChunkFiles, seq)
delete(cdm.closers, seq)
}
cdm.readPathMtx.Unlock()
// We actually delete the files separately to not block the readPathMtx for long.
for _, seq := range removedFiles {
if err := os.Remove(segmentFile(cdm.dir.Name(), seq)); err != nil {
return err
}
}
return nil
}
// DeleteCorrupted deletes all the head chunk files after the one which had the corruption
// (including the corrupt file).
func (cdm *OldChunkDiskMapper) DeleteCorrupted(originalErr error) error {
err := errors.Cause(originalErr) // So that we can pick up errors even if wrapped.
cerr, ok := err.(*CorruptionErr)
if !ok {
return errors.Wrap(originalErr, "cannot handle error")
}
// Delete all the head chunk files following the corrupt head chunk file.
segs := []int{}
cdm.readPathMtx.RLock()
for seg := range cdm.mmappedChunkFiles {
if seg >= cerr.FileIndex {
segs = append(segs, seg)
}
}
cdm.readPathMtx.RUnlock()
return cdm.deleteFiles(segs)
}
// Size returns the size of the chunk files.
func (cdm *OldChunkDiskMapper) Size() (int64, error) {
return fileutil.DirSize(cdm.dir.Name())
}
func (cdm *OldChunkDiskMapper) curFileSize() int64 {
return cdm.curFileNumBytes.Load()
}
// Close closes all the open files in ChunkDiskMapper.
// It is not longer safe to access chunks from this struct after calling Close.
func (cdm *OldChunkDiskMapper) Close() error {
// 'WriteChunk' locks writePathMtx first and then readPathMtx for cutting head chunk file.
// The lock order should not be reversed here else it can cause deadlocks.
cdm.writePathMtx.Lock()
defer cdm.writePathMtx.Unlock()
cdm.readPathMtx.Lock()
defer cdm.readPathMtx.Unlock()
if cdm.closed {
return nil
}
cdm.closed = true
errs := tsdb_errors.NewMulti(
closeAllFromMap(cdm.closers),
cdm.finalizeCurFile(),
cdm.dir.Close(),
)
cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
cdm.closers = map[int]io.Closer{}
return errs.Err()
}
func (cdm *OldChunkDiskMapper) IsQueueEmpty() bool {
return true // there is no queue
}

View file

@ -1,484 +0,0 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunks
import (
"encoding/binary"
"errors"
"math/rand"
"os"
"strconv"
"testing"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)
func TestOldChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
hrw := testOldChunkDiskMapper(t)
defer func() {
require.NoError(t, hrw.Close())
}()
expectedBytes := []byte{}
nextChunkOffset := uint64(HeadChunkFileHeaderSize)
chkCRC32 := newCRC32()
type expectedDataType struct {
seriesRef HeadSeriesRef
chunkRef ChunkDiskMapperRef
mint, maxt int64
numSamples uint16
chunk chunkenc.Chunk
isOOO bool
}
expectedData := []expectedDataType{}
var buf [MaxHeadChunkMetaSize]byte
totalChunks := 0
var firstFileName string
for hrw.curFileSequence < 3 || hrw.chkWriter.Buffered() == 0 {
addChunks := func(numChunks int) {
for i := 0; i < numChunks; i++ {
seriesRef, chkRef, mint, maxt, chunk, isOOO := createChunkForOld(t, totalChunks, hrw)
totalChunks++
expectedData = append(expectedData, expectedDataType{
seriesRef: seriesRef,
mint: mint,
maxt: maxt,
chunkRef: chkRef,
chunk: chunk,
numSamples: uint16(chunk.NumSamples()),
isOOO: isOOO,
})
if hrw.curFileSequence != 1 {
// We are checking for bytes written only for the first file.
continue
}
// Calculating expected bytes written on disk for first file.
firstFileName = hrw.curFile.Name()
require.Equal(t, newChunkDiskMapperRef(1, nextChunkOffset), chkRef)
bytesWritten := 0
chkCRC32.Reset()
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(seriesRef))
bytesWritten += SeriesRefSize
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(mint))
bytesWritten += MintMaxtSize
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt))
bytesWritten += MintMaxtSize
buf[bytesWritten] = byte(chunk.Encoding())
bytesWritten += ChunkEncodingSize
n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes())))
bytesWritten += n
expectedBytes = append(expectedBytes, buf[:bytesWritten]...)
_, err := chkCRC32.Write(buf[:bytesWritten])
require.NoError(t, err)
expectedBytes = append(expectedBytes, chunk.Bytes()...)
_, err = chkCRC32.Write(chunk.Bytes())
require.NoError(t, err)
expectedBytes = append(expectedBytes, chkCRC32.Sum(nil)...)
// += seriesRef, mint, maxt, encoding, chunk data len, chunk data, CRC.
nextChunkOffset += SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + uint64(n) + uint64(len(chunk.Bytes())) + CRCSize
}
}
addChunks(100)
hrw.CutNewFile()
addChunks(10) // For chunks in in-memory buffer.
}
// Checking on-disk bytes for the first file.
require.Equal(t, 3, len(hrw.mmappedChunkFiles), "expected 3 mmapped files, got %d", len(hrw.mmappedChunkFiles))
require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers))
actualBytes, err := os.ReadFile(firstFileName)
require.NoError(t, err)
// Check header of the segment file.
require.Equal(t, MagicHeadChunks, int(binary.BigEndian.Uint32(actualBytes[0:MagicChunksSize])))
require.Equal(t, chunksFormatV1, int(actualBytes[MagicChunksSize]))
// Remaining chunk data.
fileEnd := HeadChunkFileHeaderSize + len(expectedBytes)
require.Equal(t, expectedBytes, actualBytes[HeadChunkFileHeaderSize:fileEnd])
// Testing reading of chunks.
for _, exp := range expectedData {
actChunk, err := hrw.Chunk(exp.chunkRef)
require.NoError(t, err)
require.Equal(t, exp.chunk.Bytes(), actChunk.Bytes())
}
// Testing IterateAllChunks method.
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
idx := 0
require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error {
t.Helper()
expData := expectedData[idx]
require.Equal(t, expData.seriesRef, seriesRef)
require.Equal(t, expData.chunkRef, chunkRef)
require.Equal(t, expData.maxt, maxt)
require.Equal(t, expData.maxt, maxt)
require.Equal(t, expData.numSamples, numSamples)
require.Equal(t, expData.isOOO, chunkenc.IsOutOfOrderChunk(encoding))
actChunk, err := hrw.Chunk(expData.chunkRef)
require.NoError(t, err)
require.Equal(t, expData.chunk.Bytes(), actChunk.Bytes())
idx++
return nil
}))
require.Equal(t, len(expectedData), idx)
}
func TestOldChunkDiskMapper_WriteUnsupportedChunk_Chunk_IterateChunks(t *testing.T) {
hrw := testOldChunkDiskMapper(t)
defer func() {
require.NoError(t, hrw.Close())
}()
ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, nil, hrw)
// Checking on-disk bytes for the first file.
require.Equal(t, 1, len(hrw.mmappedChunkFiles), "expected 1 mmapped file, got %d", len(hrw.mmappedChunkFiles))
require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers))
// Testing IterateAllChunks method.
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
hrw, err := NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error {
t.Helper()
require.Equal(t, ucSeriesRef, seriesRef)
require.Equal(t, ucChkRef, chunkRef)
require.Equal(t, ucMint, mint)
require.Equal(t, ucMaxt, maxt)
require.Equal(t, uchunk.Encoding(), encoding) // Asserts that the encoding is EncUnsupportedXOR
actChunk, err := hrw.Chunk(chunkRef)
// The chunk encoding is unknown so Chunk() should fail but us the caller
// are ok with that. Above we asserted that the encoding we expected was
// EncUnsupportedXOR
require.NotNil(t, err)
require.Contains(t, err.Error(), "invalid chunk encoding \"<unknown>\"")
require.Nil(t, actChunk)
return nil
}))
}
// TestOldChunkDiskMapper_Truncate tests
// * If truncation is happening properly based on the time passed.
// * The active file is not deleted even if the passed time makes it eligible to be deleted.
// * Empty current file does not lead to creation of another file after truncation.
// * Non-empty current file leads to creation of another file after truncation.
func TestOldChunkDiskMapper_Truncate(t *testing.T) {
hrw := testOldChunkDiskMapper(t)
defer func() {
require.NoError(t, hrw.Close())
}()
timeRange := 0
fileTimeStep := 100
addChunk := func() {
mint := timeRange + 1 // Just after the new file cut.
maxt := timeRange + fileTimeStep - 1 // Just before the next file.
// Write a chunks to set maxt for the segment.
_ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) {
require.NoError(t, err)
})
timeRange += fileTimeStep
}
verifyFiles := func(remainingFiles []int) {
t.Helper()
files, err := os.ReadDir(hrw.dir.Name())
require.NoError(t, err)
require.Equal(t, len(remainingFiles), len(files), "files on disk")
require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles")
require.Equal(t, len(remainingFiles), len(hrw.closers), "closers")
for _, i := range remainingFiles {
_, ok := hrw.mmappedChunkFiles[i]
require.Equal(t, true, ok)
}
}
// Create segments 1 to 7.
for i := 1; i <= 7; i++ {
require.NoError(t, hrw.CutNewFile())
addChunk()
}
verifyFiles([]int{1, 2, 3, 4, 5, 6, 7})
// Truncating files.
require.NoError(t, hrw.Truncate(3))
verifyFiles([]int{3, 4, 5, 6, 7, 8})
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
// Restarted.
var err error
hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
require.False(t, hrw.fileMaxtSet)
require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
return nil
}))
require.True(t, hrw.fileMaxtSet)
verifyFiles([]int{3, 4, 5, 6, 7, 8})
// New file is created after restart even if last file was empty.
addChunk()
verifyFiles([]int{3, 4, 5, 6, 7, 8, 9})
// Truncating files after restart.
require.NoError(t, hrw.Truncate(6))
verifyFiles([]int{6, 7, 8, 9, 10})
// As the last file was empty, this creates no new files.
require.NoError(t, hrw.Truncate(6))
verifyFiles([]int{6, 7, 8, 9, 10})
require.NoError(t, hrw.Truncate(8))
verifyFiles([]int{8, 9, 10})
addChunk()
// Truncating till current time should not delete the current active file.
require.NoError(t, hrw.Truncate(10))
verifyFiles([]int{10, 11}) // One file is the previously active file and one currently created.
}
// TestOldChunkDiskMapper_Truncate_PreservesFileSequence tests that truncation doesn't poke
// holes into the file sequence, even if there are empty files in between non-empty files.
// This test exposes https://github.com/prometheus/prometheus/issues/7412 where the truncation
// simply deleted all empty files instead of stopping once it encountered a non-empty file.
func TestOldChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) {
hrw := testOldChunkDiskMapper(t)
defer func() {
require.NoError(t, hrw.Close())
}()
timeRange := 0
addChunk := func() {
step := 100
mint, maxt := timeRange+1, timeRange+step-1
_ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) {
require.NoError(t, err)
})
timeRange += step
}
emptyFile := func() {
require.NoError(t, hrw.CutNewFile())
}
nonEmptyFile := func() {
emptyFile()
addChunk()
}
addChunk() // 1. Created with the first chunk.
nonEmptyFile() // 2.
nonEmptyFile() // 3.
emptyFile() // 4.
nonEmptyFile() // 5.
emptyFile() // 6.
verifyFiles := func(remainingFiles []int) {
t.Helper()
files, err := os.ReadDir(hrw.dir.Name())
require.NoError(t, err)
require.Equal(t, len(remainingFiles), len(files), "files on disk")
require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles")
require.Equal(t, len(remainingFiles), len(hrw.closers), "closers")
for _, i := range remainingFiles {
_, ok := hrw.mmappedChunkFiles[i]
require.True(t, ok, "remaining file %d not in hrw.mmappedChunkFiles", i)
}
}
verifyFiles([]int{1, 2, 3, 4, 5, 6})
// Truncating files till 2. It should not delete anything after 3 (inclusive)
// though files 4 and 6 are empty.
require.NoError(t, hrw.Truncate(3))
// As 6 was empty, it should not create another file.
verifyFiles([]int{3, 4, 5, 6})
addChunk()
// Truncate creates another file as 6 is not empty now.
require.NoError(t, hrw.Truncate(3))
verifyFiles([]int{3, 4, 5, 6, 7})
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
// Restarting checks for unsequential files.
var err error
hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
verifyFiles([]int{3, 4, 5, 6, 7})
}
// TestOldChunkDiskMapper_TruncateAfterFailedIterateChunks tests for
// https://github.com/prometheus/prometheus/issues/7753
func TestOldChunkDiskMapper_TruncateAfterFailedIterateChunks(t *testing.T) {
hrw := testOldChunkDiskMapper(t)
defer func() {
require.NoError(t, hrw.Close())
}()
// Write a chunks to iterate on it later.
_ = hrw.WriteChunk(1, 0, 1000, randomChunk(t), func(err error) {
require.NoError(t, err)
})
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
// Restarting to recreate https://github.com/prometheus/prometheus/issues/7753.
hrw, err := NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
// Forcefully failing IterateAllChunks.
require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
return errors.New("random error")
}))
// Truncation call should not return error after IterateAllChunks fails.
require.NoError(t, hrw.Truncate(2000))
}
func TestOldChunkDiskMapper_ReadRepairOnEmptyLastFile(t *testing.T) {
hrw := testOldChunkDiskMapper(t)
defer func() {
require.NoError(t, hrw.Close())
}()
timeRange := 0
addChunk := func() {
step := 100
mint, maxt := timeRange+1, timeRange+step-1
_ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) {
require.NoError(t, err)
})
timeRange += step
}
nonEmptyFile := func() {
require.NoError(t, hrw.CutNewFile())
addChunk()
}
addChunk() // 1. Created with the first chunk.
nonEmptyFile() // 2.
nonEmptyFile() // 3.
require.Equal(t, 3, len(hrw.mmappedChunkFiles))
lastFile := 0
for idx := range hrw.mmappedChunkFiles {
if idx > lastFile {
lastFile = idx
}
}
require.Equal(t, 3, lastFile)
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
// Write an empty last file mimicking an abrupt shutdown on file creation.
emptyFileName := segmentFile(dir, lastFile+1)
f, err := os.OpenFile(emptyFileName, os.O_WRONLY|os.O_CREATE, 0o666)
require.NoError(t, err)
require.NoError(t, f.Sync())
stat, err := f.Stat()
require.NoError(t, err)
require.Equal(t, int64(0), stat.Size())
require.NoError(t, f.Close())
// Open chunk disk mapper again, corrupt file should be removed.
hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
require.False(t, hrw.fileMaxtSet)
require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
return nil
}))
require.True(t, hrw.fileMaxtSet)
// Removed from memory.
require.Equal(t, 3, len(hrw.mmappedChunkFiles))
for idx := range hrw.mmappedChunkFiles {
require.LessOrEqual(t, idx, lastFile, "file index is bigger than previous last file")
}
// Removed even from disk.
files, err := os.ReadDir(dir)
require.NoError(t, err)
require.Equal(t, 3, len(files))
for _, fi := range files {
seq, err := strconv.ParseUint(fi.Name(), 10, 64)
require.NoError(t, err)
require.LessOrEqual(t, seq, uint64(lastFile), "file index on disk is bigger than previous last file")
}
}
func testOldChunkDiskMapper(t *testing.T) *OldChunkDiskMapper {
tmpdir := t.TempDir()
hrw, err := NewOldChunkDiskMapper(tmpdir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
require.False(t, hrw.fileMaxtSet)
require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
return nil
}))
require.True(t, hrw.fileMaxtSet)
return hrw
}
func createChunkForOld(t *testing.T, idx int, hrw *OldChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk, isOOO bool) {
seriesRef = HeadSeriesRef(rand.Int63())
mint = int64((idx)*1000 + 1)
maxt = int64((idx + 1) * 1000)
chunk = randomChunk(t)
if rand.Intn(2) == 0 {
isOOO = true
chunk = &chunkenc.OOOXORChunk{XORChunk: chunk.(*chunkenc.XORChunk)}
}
chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(err error) {
require.NoError(t, err)
})
return
}

View file

@ -815,7 +815,6 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
headOpts.OutOfOrderTimeWindow.Store(opts.OutOfOrderTimeWindow)
headOpts.OutOfOrderCapMin.Store(opts.OutOfOrderCapMin)
headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax)
headOpts.NewChunkDiskMapper = opts.NewChunkDiskMapper
if opts.IsolationDisabled {
// We only override this flag if isolation is disabled at DB level. We use the default otherwise.
headOpts.IsolationDisabled = opts.IsolationDisabled

View file

@ -3861,11 +3861,21 @@ func TestOOOCompaction(t *testing.T) {
verifySamples(db.Blocks()[1], 120, 239)
verifySamples(db.Blocks()[2], 240, 310)
// Because of OOO compaction, we already have 2 m-map files.
// Because of OOO compaction, the current mmap file will end.
// All the chunks are only in the first file.
// Add a dummy mmap chunk to create a new mmap file.
mmapDir := mmappedChunksDir(db.head.opts.ChunkDirRoot)
files, err = os.ReadDir(mmapDir)
require.NoError(t, err)
require.Len(t, files, 1)
waitC := make(chan struct{})
db.head.chunkDiskMapper.WriteChunk(100, 0, 0, chunkenc.NewXORChunk(), func(err error) {
require.NoError(t, err)
close(waitC)
})
<-waitC
files, err = os.ReadDir(mmapDir)
require.NoError(t, err)
require.Len(t, files, 2)
// Compact the in-order head and expect another block.
@ -4652,8 +4662,16 @@ func TestOOOCompactionFailure(t *testing.T) {
require.Equal(t, len(db.Blocks()), 3)
require.Equal(t, oldBlocks, db.Blocks())
// Because of OOO compaction, we have a second m-map file now
// Because of OOO compaction, the current mmap file will end.
// All the chunks are only in the first file.
// Add a dummy mmap chunk to create a new mmap file.
verifyMmapFiles("000001")
waitC := make(chan struct{})
db.head.chunkDiskMapper.WriteChunk(100, 0, 0, chunkenc.NewXORChunk(), func(err error) {
require.NoError(t, err)
close(waitC)
})
<-waitC
verifyMmapFiles("000001", "000002")
// All but last WBL file will be deleted.

View file

@ -163,10 +163,6 @@ type HeadOptions struct {
EnableMemorySnapshotOnShutdown bool
IsolationDisabled bool
// Temporary flag which we use to select whether to use the new (used in upstream
// Prometheus) or the old (legacy) chunk disk mapper.
NewChunkDiskMapper bool
}
const (
@ -185,7 +181,6 @@ func DefaultHeadOptions() *HeadOptions {
StripeSize: DefaultStripeSize,
SeriesCallback: &noopSeriesLifecycleCallback{},
IsolationDisabled: defaultIsolationDisabled,
NewChunkDiskMapper: false,
}
ho.OutOfOrderCapMin.Store(DefaultOutOfOrderCapMin)
ho.OutOfOrderCapMax.Store(DefaultOutOfOrderCapMax)
@ -273,21 +268,13 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wal.WAL, opts *Hea
opts.ChunkPool = chunkenc.NewPool()
}
if opts.NewChunkDiskMapper {
h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(
r,
mmappedChunksDir(opts.ChunkDirRoot),
opts.ChunkPool,
opts.ChunkWriteBufferSize,
opts.ChunkWriteQueueSize,
)
} else {
h.chunkDiskMapper, err = chunks.NewOldChunkDiskMapper(
mmappedChunksDir(opts.ChunkDirRoot),
opts.ChunkPool,
opts.ChunkWriteBufferSize,
)
}
h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(
r,
mmappedChunksDir(opts.ChunkDirRoot),
opts.ChunkPool,
opts.ChunkWriteBufferSize,
opts.ChunkWriteQueueSize,
)
if err != nil {
return nil, err
}

View file

@ -1691,14 +1691,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
// take effect without another chunk being written.
files, err := os.ReadDir(mmappedChunksDir(dir))
require.NoError(t, err)
// With the new chunk disk mapper we only expect 6 files, because the last call to "CutNewFile()" won't
// take effect until the next chunk is being written.
if opts.NewChunkDiskMapper {
require.Equal(t, 6, len(files))
} else {
require.Equal(t, 7, len(files))
}
require.Equal(t, 6, len(files))
// Corrupt the 4th file by writing a random byte to series ref.
f, err := os.OpenFile(filepath.Join(mmappedChunksDir(dir), files[3].Name()), os.O_WRONLY, 0o666)

View file

@ -432,20 +432,24 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")}
values, err := oh.LabelValues("foo", matchers...)
require.NoError(t, err)
sort.Strings(values)
require.Equal(t, tc.expValues1, values)
matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotRegexp, "foo", "^bar.")}
values, err = oh.LabelValues("foo", matchers...)
require.NoError(t, err)
sort.Strings(values)
require.Equal(t, tc.expValues2, values)
matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.")}
values, err = oh.LabelValues("foo", matchers...)
require.NoError(t, err)
sort.Strings(values)
require.Equal(t, tc.expValues3, values)
values, err = oh.LabelValues("foo")
require.NoError(t, err)
sort.Strings(values)
require.Equal(t, tc.expValues4, values)
})
}