remove old chunk disk mapper initialization and replace it with the upstream one

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>
This commit is contained in:
Mauro Stettler 2022-04-18 17:35:45 +00:00
parent ee2f94b752
commit bd50b04fed
No known key found for this signature in database
GPG key ID: 1EBC1C55E2D201A2
8 changed files with 28 additions and 1260 deletions

View file

@ -484,12 +484,11 @@ func (cdm *ChunkDiskMapper) writeChunk(seriesRef HeadSeriesRef, mint, maxt int64
} }
// CutNewFile makes that a new file will be created the next time a chunk is written. // CutNewFile makes that a new file will be created the next time a chunk is written.
func (cdm *ChunkDiskMapper) CutNewFile() error { func (cdm *ChunkDiskMapper) CutNewFile() {
cdm.evtlPosMtx.Lock() cdm.evtlPosMtx.Lock()
defer cdm.evtlPosMtx.Unlock() defer cdm.evtlPosMtx.Unlock()
cdm.evtlPos.cutFileOnNextChunk() cdm.evtlPos.cutFileOnNextChunk()
return nil
} }
func (cdm *ChunkDiskMapper) IsQueueEmpty() bool { func (cdm *ChunkDiskMapper) IsQueueEmpty() bool {
@ -800,6 +799,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
if seriesRef == 0 && mint == 0 && maxt == 0 { if seriesRef == 0 && mint == 0 && maxt == 0 {
break break
} }
// Encoding. // Encoding.
chkEnc := chunkenc.Encoding(mmapFile.byteSlice.Range(idx, idx+ChunkEncodingSize)[0]) chkEnc := chunkenc.Encoding(mmapFile.byteSlice.Range(idx, idx+ChunkEncodingSize)[0])
idx += ChunkEncodingSize idx += ChunkEncodingSize
@ -893,7 +893,7 @@ func (cdm *ChunkDiskMapper) Truncate(mint int64) error {
// There is a known race condition here because between the check of curFileSize() and the call to CutNewFile() // There is a known race condition here because between the check of curFileSize() and the call to CutNewFile()
// a new file could already be cut, this is acceptable because it will simply result in an empty file which // a new file could already be cut, this is acceptable because it will simply result in an empty file which
// won't do any harm. // won't do any harm.
errs.Add(cdm.CutNewFile()) cdm.CutNewFile()
} }
pendingDeletes, err := cdm.deleteFiles(removedFiles) pendingDeletes, err := cdm.deleteFiles(removedFiles)
errs.Add(err) errs.Add(err)

View file

@ -116,7 +116,7 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
} }
} }
addChunks(100) addChunks(100)
require.NoError(t, hrw.CutNewFile()) hrw.CutNewFile()
addChunks(10) // For chunks in in-memory buffer. addChunks(10) // For chunks in in-memory buffer.
} }
@ -174,7 +174,7 @@ func TestChunkDiskMapper_WriteUnsupportedChunk_Chunk_IterateChunks(t *testing.T)
require.NoError(t, hrw.Close()) require.NoError(t, hrw.Close())
}() }()
ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, hrw, nil) ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, hrw)
// Checking on-disk bytes for the first file. // Checking on-disk bytes for the first file.
require.Equal(t, 1, len(hrw.mmappedChunkFiles), "expected 1 mmapped file, got %d", len(hrw.mmappedChunkFiles)) require.Equal(t, 1, len(hrw.mmappedChunkFiles), "expected 1 mmapped file, got %d", len(hrw.mmappedChunkFiles))
@ -254,7 +254,7 @@ func TestChunkDiskMapper_Truncate(t *testing.T) {
// Create segments 1 to 7. // Create segments 1 to 7.
for i := 1; i <= 7; i++ { for i := 1; i <= 7; i++ {
require.NoError(t, hrw.CutNewFile()) hrw.CutNewFile()
mint := int64(addChunk()) mint := int64(addChunk())
if i == 3 { if i == 3 {
thirdFileMinT = mint thirdFileMinT = mint
@ -453,7 +453,7 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
nonEmptyFile := func() { nonEmptyFile := func() {
t.Helper() t.Helper()
require.NoError(t, hrw.CutNewFile()) hrw.CutNewFile()
addChunk() addChunk()
} }
@ -555,24 +555,17 @@ func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSer
return return
} }
func writeUnsupportedChunk(t *testing.T, idx int, hrw *ChunkDiskMapper, hrwOld *OldChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) { func writeUnsupportedChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) {
var err error var err error
seriesRef = HeadSeriesRef(rand.Int63()) seriesRef = HeadSeriesRef(rand.Int63())
mint = int64((idx)*1000 + 1) mint = int64((idx)*1000 + 1)
maxt = int64((idx + 1) * 1000) maxt = int64((idx + 1) * 1000)
chunk = randomUnsupportedChunk(t) chunk = randomUnsupportedChunk(t)
awaitCb := make(chan struct{}) awaitCb := make(chan struct{})
if hrw != nil { chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) {
chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) { require.NoError(t, err)
require.NoError(t, err)
close(awaitCb)
})
} else {
chunkRef = hrwOld.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) {
require.NoError(t, err)
})
close(awaitCb) close(awaitCb)
} })
<-awaitCb <-awaitCb
return return
} }

View file

@ -1,716 +0,0 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunks
import (
"bufio"
"bytes"
"encoding/binary"
"hash"
"io"
"os"
"sort"
"sync"
"github.com/pkg/errors"
"go.uber.org/atomic"
"github.com/prometheus/prometheus/tsdb/chunkenc"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
)
// OldChunkDiskMapper is for writing the Head block chunks to the disk
// and access chunks via mmapped file.
type OldChunkDiskMapper struct {
curFileNumBytes atomic.Int64 // Bytes written in current open file.
/// Writer.
dir *os.File
writeBufferSize int
curFile *os.File // File being written to.
curFileSequence int // Index of current open file being appended to.
curFileMaxt int64 // Used for the size retention.
byteBuf [MaxHeadChunkMetaSize]byte // Buffer used to write the header of the chunk.
chkWriter *bufio.Writer // Writer for the current open file.
crc32 hash.Hash
writePathMtx sync.Mutex
/// Reader.
// The int key in the map is the file number on the disk.
mmappedChunkFiles map[int]*mmappedChunkFile // Contains the m-mapped files for each chunk file mapped with its index.
closers map[int]io.Closer // Closers for resources behind the byte slices.
readPathMtx sync.RWMutex // Mutex used to protect the above 2 maps.
pool chunkenc.Pool // This is used when fetching a chunk from the disk to allocate a chunk.
// Writer and Reader.
// We flush chunks to disk in batches. Hence, we store them in this buffer
// from which chunks are served till they are flushed and are ready for m-mapping.
chunkBuffer *chunkBuffer
// Whether the maxt field is set for all mmapped chunk files tracked within the mmappedChunkFiles map.
// This is done after iterating through all the chunks in those files using the IterateAllChunks method.
fileMaxtSet bool
closed bool
}
// NewOldChunkDiskMapper returns a new ChunkDiskMapper against the given directory
// using the default head chunk file duration.
// NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper
// to set the maxt of all the file.
func NewOldChunkDiskMapper(dir string, pool chunkenc.Pool, writeBufferSize int) (*OldChunkDiskMapper, error) {
// Validate write buffer size.
if writeBufferSize < MinWriteBufferSize || writeBufferSize > MaxWriteBufferSize {
return nil, errors.Errorf("ChunkDiskMapper write buffer size should be between %d and %d (actual: %d)", MinWriteBufferSize, MaxWriteBufferSize, writeBufferSize)
}
if writeBufferSize%1024 != 0 {
return nil, errors.Errorf("ChunkDiskMapper write buffer size should be a multiple of 1024 (actual: %d)", writeBufferSize)
}
if err := os.MkdirAll(dir, 0o777); err != nil {
return nil, err
}
dirFile, err := fileutil.OpenDir(dir)
if err != nil {
return nil, err
}
m := &OldChunkDiskMapper{
dir: dirFile,
pool: pool,
writeBufferSize: writeBufferSize,
crc32: newCRC32(),
chunkBuffer: newChunkBuffer(),
}
if m.pool == nil {
m.pool = chunkenc.NewPool()
}
return m, m.openMMapFiles()
}
// openMMapFiles opens all files within dir for mmapping.
func (cdm *OldChunkDiskMapper) openMMapFiles() (returnErr error) {
cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
cdm.closers = map[int]io.Closer{}
defer func() {
if returnErr != nil {
returnErr = tsdb_errors.NewMulti(returnErr, closeAllFromMap(cdm.closers)).Err()
cdm.mmappedChunkFiles = nil
cdm.closers = nil
}
}()
files, err := listChunkFiles(cdm.dir.Name())
if err != nil {
return err
}
files, err = repairLastChunkFile(files)
if err != nil {
return err
}
chkFileIndices := make([]int, 0, len(files))
for seq, fn := range files {
f, err := fileutil.OpenMmapFile(fn)
if err != nil {
return errors.Wrapf(err, "mmap files, file: %s", fn)
}
cdm.closers[seq] = f
cdm.mmappedChunkFiles[seq] = &mmappedChunkFile{byteSlice: realByteSlice(f.Bytes())}
chkFileIndices = append(chkFileIndices, seq)
}
// Check for gaps in the files.
sort.Ints(chkFileIndices)
if len(chkFileIndices) == 0 {
return nil
}
lastSeq := chkFileIndices[0]
for _, seq := range chkFileIndices[1:] {
if seq != lastSeq+1 {
return errors.Errorf("found unsequential head chunk files %s (index: %d) and %s (index: %d)", files[lastSeq], lastSeq, files[seq], seq)
}
lastSeq = seq
}
for i, b := range cdm.mmappedChunkFiles {
if b.byteSlice.Len() < HeadChunkFileHeaderSize {
return errors.Wrapf(errInvalidSize, "%s: invalid head chunk file header", files[i])
}
// Verify magic number.
if m := binary.BigEndian.Uint32(b.byteSlice.Range(0, MagicChunksSize)); m != MagicHeadChunks {
return errors.Errorf("%s: invalid magic number %x", files[i], m)
}
// Verify chunk format version.
if v := int(b.byteSlice.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
return errors.Errorf("%s: invalid chunk format version %d", files[i], v)
}
}
return nil
}
// WriteChunk writes the chunk to the disk.
// The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
func (cdm *OldChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) {
chkRef, err := func() (ChunkDiskMapperRef, error) {
cdm.writePathMtx.Lock()
defer cdm.writePathMtx.Unlock()
if cdm.closed {
return 0, ErrChunkDiskMapperClosed
}
if cdm.shouldCutNewFile(len(chk.Bytes())) {
if err := cdm.cut(); err != nil {
return 0, err
}
}
// if len(chk.Bytes())+MaxHeadChunkMetaSize >= writeBufferSize, it means that chunk >= the buffer size;
// so no need to flush here, as we have to flush at the end (to not keep partial chunks in buffer).
if len(chk.Bytes())+MaxHeadChunkMetaSize < cdm.writeBufferSize && cdm.chkWriter.Available() < MaxHeadChunkMetaSize+len(chk.Bytes()) {
if err := cdm.flushBuffer(); err != nil {
return 0, err
}
}
cdm.crc32.Reset()
bytesWritten := 0
chkRef = newChunkDiskMapperRef(uint64(cdm.curFileSequence), uint64(cdm.curFileSize()))
binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(seriesRef))
bytesWritten += SeriesRefSize
binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(mint))
bytesWritten += MintMaxtSize
binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(maxt))
bytesWritten += MintMaxtSize
cdm.byteBuf[bytesWritten] = byte(chk.Encoding())
bytesWritten += ChunkEncodingSize
n := binary.PutUvarint(cdm.byteBuf[bytesWritten:], uint64(len(chk.Bytes())))
bytesWritten += n
if err := cdm.writeAndAppendToCRC32(cdm.byteBuf[:bytesWritten]); err != nil {
return 0, err
}
if err := cdm.writeAndAppendToCRC32(chk.Bytes()); err != nil {
return 0, err
}
if err := cdm.writeCRC32(); err != nil {
return 0, err
}
if maxt > cdm.curFileMaxt {
cdm.curFileMaxt = maxt
}
cdm.chunkBuffer.put(chkRef, chk)
if len(chk.Bytes())+MaxHeadChunkMetaSize >= cdm.writeBufferSize {
// The chunk was bigger than the buffer itself.
// Flushing to not keep partial chunks in buffer.
if err := cdm.flushBuffer(); err != nil {
return 0, err
}
}
return chkRef, nil
}()
if err != nil && callback != nil {
callback(err)
}
return chkRef
}
// shouldCutNewFile returns whether a new file should be cut, based on time and size retention.
// Size retention: because depending on the system architecture, there is a limit on how big of a file we can m-map.
// Time retention: so that we can delete old chunks with some time guarantee in low load environments.
func (cdm *OldChunkDiskMapper) shouldCutNewFile(chunkSize int) bool {
return cdm.curFileSize() == 0 || // First head chunk file.
cdm.curFileSize()+int64(chunkSize+MaxHeadChunkMetaSize) > MaxHeadChunkFileSize // Exceeds the max head chunk file size.
}
// CutNewFile creates a new m-mapped file.
func (cdm *OldChunkDiskMapper) CutNewFile() (returnErr error) {
cdm.writePathMtx.Lock()
defer cdm.writePathMtx.Unlock()
return cdm.cut()
}
// cut creates a new m-mapped file. The write lock should be held before calling this.
func (cdm *OldChunkDiskMapper) cut() (returnErr error) {
// Sync current tail to disk and close.
if err := cdm.finalizeCurFile(); err != nil {
return err
}
n, newFile, seq, err := cutSegmentFile(cdm.dir, MagicHeadChunks, headChunksFormatV1, HeadChunkFilePreallocationSize)
if err != nil {
return err
}
defer func() {
// The file should not be closed if there is no error,
// its kept open in the ChunkDiskMapper.
if returnErr != nil {
returnErr = tsdb_errors.NewMulti(returnErr, newFile.Close()).Err()
}
}()
cdm.curFileNumBytes.Store(int64(n))
if cdm.curFile != nil {
cdm.readPathMtx.Lock()
cdm.mmappedChunkFiles[cdm.curFileSequence].maxt = cdm.curFileMaxt
cdm.readPathMtx.Unlock()
}
mmapFile, err := fileutil.OpenMmapFileWithSize(newFile.Name(), MaxHeadChunkFileSize)
if err != nil {
return err
}
cdm.readPathMtx.Lock()
cdm.curFileSequence = seq
cdm.curFile = newFile
if cdm.chkWriter != nil {
cdm.chkWriter.Reset(newFile)
} else {
cdm.chkWriter = bufio.NewWriterSize(newFile, cdm.writeBufferSize)
}
cdm.closers[cdm.curFileSequence] = mmapFile
cdm.mmappedChunkFiles[cdm.curFileSequence] = &mmappedChunkFile{byteSlice: realByteSlice(mmapFile.Bytes())}
cdm.readPathMtx.Unlock()
cdm.curFileMaxt = 0
return nil
}
// finalizeCurFile writes all pending data to the current tail file,
// truncates its size, and closes it.
func (cdm *OldChunkDiskMapper) finalizeCurFile() error {
if cdm.curFile == nil {
return nil
}
if err := cdm.flushBuffer(); err != nil {
return err
}
if err := cdm.curFile.Sync(); err != nil {
return err
}
return cdm.curFile.Close()
}
func (cdm *OldChunkDiskMapper) write(b []byte) error {
n, err := cdm.chkWriter.Write(b)
cdm.curFileNumBytes.Add(int64(n))
return err
}
func (cdm *OldChunkDiskMapper) writeAndAppendToCRC32(b []byte) error {
if err := cdm.write(b); err != nil {
return err
}
_, err := cdm.crc32.Write(b)
return err
}
func (cdm *OldChunkDiskMapper) writeCRC32() error {
return cdm.write(cdm.crc32.Sum(cdm.byteBuf[:0]))
}
// flushBuffer flushes the current in-memory chunks.
// Assumes that writePathMtx is _write_ locked before calling this method.
func (cdm *OldChunkDiskMapper) flushBuffer() error {
if err := cdm.chkWriter.Flush(); err != nil {
return err
}
cdm.chunkBuffer.clear()
return nil
}
// Chunk returns a chunk from a given reference.
func (cdm *OldChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error) {
cdm.readPathMtx.RLock()
// We hold this read lock for the entire duration because if Close()
// is called, the data in the byte slice will get corrupted as the mmapped
// file will be closed.
defer cdm.readPathMtx.RUnlock()
if cdm.closed {
return nil, ErrChunkDiskMapperClosed
}
sgmIndex, chkStart := ref.Unpack()
// We skip the series ref and the mint/maxt beforehand.
chkStart += SeriesRefSize + (2 * MintMaxtSize)
chkCRC32 := newCRC32()
// If it is the current open file, then the chunks can be in the buffer too.
if sgmIndex == cdm.curFileSequence {
chunk := cdm.chunkBuffer.get(ref)
if chunk != nil {
return chunk, nil
}
}
mmapFile, ok := cdm.mmappedChunkFiles[sgmIndex]
if !ok {
if sgmIndex > cdm.curFileSequence {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: -1,
Err: errors.Errorf("head chunk file index %d more than current open file", sgmIndex),
}
}
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.New("head chunk file index %d does not exist on disk"),
}
}
if chkStart+MaxChunkLengthFieldSize > mmapFile.byteSlice.Len() {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, mmapFile.byteSlice.Len()),
}
}
// Encoding.
chkEnc := mmapFile.byteSlice.Range(chkStart, chkStart+ChunkEncodingSize)[0]
// Data length.
// With the minimum chunk length this should never cause us reading
// over the end of the slice.
chkDataLenStart := chkStart + ChunkEncodingSize
c := mmapFile.byteSlice.Range(chkDataLenStart, chkDataLenStart+MaxChunkLengthFieldSize)
chkDataLen, n := binary.Uvarint(c)
if n <= 0 {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.Errorf("reading chunk length failed with %d", n),
}
}
// Verify the chunk data end.
chkDataEnd := chkDataLenStart + n + int(chkDataLen)
if chkDataEnd > mmapFile.byteSlice.Len() {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v", chkDataEnd, mmapFile.byteSlice.Len()),
}
}
// Check the CRC.
sum := mmapFile.byteSlice.Range(chkDataEnd, chkDataEnd+CRCSize)
if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(chkStart-(SeriesRefSize+2*MintMaxtSize), chkDataEnd)); err != nil {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: err,
}
}
if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
}
}
// The chunk data itself.
chkData := mmapFile.byteSlice.Range(chkDataEnd-int(chkDataLen), chkDataEnd)
// Make a copy of the chunk data to prevent a panic occurring because the returned
// chunk data slice references an mmap-ed file which could be closed after the
// function returns but while the chunk is still in use.
chkDataCopy := make([]byte, len(chkData))
copy(chkDataCopy, chkData)
chk, err := cdm.pool.Get(chunkenc.Encoding(chkEnc), chkDataCopy)
if err != nil {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: err,
}
}
return chk, nil
}
// IterateAllChunks iterates all mmappedChunkFiles (in order of head chunk file name/number) and all the chunks within it
// and runs the provided function with information about each chunk. It returns on the first error encountered.
// NOTE: This method needs to be called at least once after creating ChunkDiskMapper
// to set the maxt of all the file.
func (cdm *OldChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error) (err error) {
cdm.writePathMtx.Lock()
defer cdm.writePathMtx.Unlock()
defer func() {
cdm.fileMaxtSet = true
}()
chkCRC32 := newCRC32()
// Iterate files in ascending order.
segIDs := make([]int, 0, len(cdm.mmappedChunkFiles))
for seg := range cdm.mmappedChunkFiles {
segIDs = append(segIDs, seg)
}
sort.Ints(segIDs)
for _, segID := range segIDs {
mmapFile := cdm.mmappedChunkFiles[segID]
fileEnd := mmapFile.byteSlice.Len()
if segID == cdm.curFileSequence {
fileEnd = int(cdm.curFileSize())
}
idx := HeadChunkFileHeaderSize
for idx < fileEnd {
if fileEnd-idx < MaxHeadChunkMetaSize {
// Check for all 0s which marks the end of the file.
allZeros := true
for _, b := range mmapFile.byteSlice.Range(idx, fileEnd) {
if b != byte(0) {
allZeros = false
break
}
}
if allZeros {
// End of segment chunk file content.
break
}
return &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: segID,
Err: errors.Errorf("head chunk file has some unread data, but doesn't include enough bytes to read the chunk header"+
" - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID),
}
}
chkCRC32.Reset()
chunkRef := newChunkDiskMapperRef(uint64(segID), uint64(idx))
startIdx := idx
seriesRef := HeadSeriesRef(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+SeriesRefSize)))
idx += SeriesRefSize
mint := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize)))
idx += MintMaxtSize
maxt := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize)))
idx += MintMaxtSize
// We preallocate file to help with m-mapping (especially windows systems).
// As series ref always starts from 1, we assume it being 0 to be the end of the actual file data.
// We are not considering possible file corruption that can cause it to be 0.
// Additionally we are checking mint and maxt just to be sure.
if seriesRef == 0 && mint == 0 && maxt == 0 {
break
}
// Encoding.
chkEnc := chunkenc.Encoding(mmapFile.byteSlice.Range(idx, idx+ChunkEncodingSize)[0])
idx += ChunkEncodingSize
dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize))
idx += n
numSamples := binary.BigEndian.Uint16(mmapFile.byteSlice.Range(idx, idx+2))
idx += int(dataLen) // Skip the data.
// In the beginning we only checked for the chunk meta size.
// Now that we have added the chunk data length, we check for sufficient bytes again.
if idx+CRCSize > fileEnd {
return &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: segID,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID),
}
}
// Check CRC.
sum := mmapFile.byteSlice.Range(idx, idx+CRCSize)
if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(startIdx, idx)); err != nil {
return err
}
if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
return &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: segID,
Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
}
}
idx += CRCSize
if maxt > mmapFile.maxt {
mmapFile.maxt = maxt
}
if err := f(seriesRef, chunkRef, mint, maxt, numSamples, chkEnc); err != nil {
if cerr, ok := err.(*CorruptionErr); ok {
cerr.Dir = cdm.dir.Name()
cerr.FileIndex = segID
return cerr
}
return err
}
}
if idx > fileEnd {
// It should be equal to the slice length.
return &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: segID,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID),
}
}
}
return nil
}
// Truncate deletes the head chunk files which are strictly below the mint.
// mint should be in milliseconds.
func (cdm *OldChunkDiskMapper) Truncate(mint int64) error {
if !cdm.fileMaxtSet {
return errors.New("maxt of the files are not set")
}
cdm.readPathMtx.RLock()
// Sort the file indices, else if files deletion fails in between,
// it can lead to unsequential files as the map is not sorted.
chkFileIndices := make([]int, 0, len(cdm.mmappedChunkFiles))
for seq := range cdm.mmappedChunkFiles {
chkFileIndices = append(chkFileIndices, seq)
}
sort.Ints(chkFileIndices)
var removedFiles []int
for _, seq := range chkFileIndices {
if seq == cdm.curFileSequence || cdm.mmappedChunkFiles[seq].maxt >= mint {
break
}
if cdm.mmappedChunkFiles[seq].maxt < mint {
removedFiles = append(removedFiles, seq)
}
}
cdm.readPathMtx.RUnlock()
errs := tsdb_errors.NewMulti()
// Cut a new file only if the current file has some chunks.
if cdm.curFileSize() > HeadChunkFileHeaderSize {
errs.Add(cdm.CutNewFile())
}
errs.Add(cdm.deleteFiles(removedFiles))
return errs.Err()
}
func (cdm *OldChunkDiskMapper) deleteFiles(removedFiles []int) error {
cdm.readPathMtx.Lock()
for _, seq := range removedFiles {
if err := cdm.closers[seq].Close(); err != nil {
cdm.readPathMtx.Unlock()
return err
}
delete(cdm.mmappedChunkFiles, seq)
delete(cdm.closers, seq)
}
cdm.readPathMtx.Unlock()
// We actually delete the files separately to not block the readPathMtx for long.
for _, seq := range removedFiles {
if err := os.Remove(segmentFile(cdm.dir.Name(), seq)); err != nil {
return err
}
}
return nil
}
// DeleteCorrupted deletes all the head chunk files after the one which had the corruption
// (including the corrupt file).
func (cdm *OldChunkDiskMapper) DeleteCorrupted(originalErr error) error {
err := errors.Cause(originalErr) // So that we can pick up errors even if wrapped.
cerr, ok := err.(*CorruptionErr)
if !ok {
return errors.Wrap(originalErr, "cannot handle error")
}
// Delete all the head chunk files following the corrupt head chunk file.
segs := []int{}
cdm.readPathMtx.RLock()
for seg := range cdm.mmappedChunkFiles {
if seg >= cerr.FileIndex {
segs = append(segs, seg)
}
}
cdm.readPathMtx.RUnlock()
return cdm.deleteFiles(segs)
}
// Size returns the size of the chunk files.
func (cdm *OldChunkDiskMapper) Size() (int64, error) {
return fileutil.DirSize(cdm.dir.Name())
}
func (cdm *OldChunkDiskMapper) curFileSize() int64 {
return cdm.curFileNumBytes.Load()
}
// Close closes all the open files in ChunkDiskMapper.
// It is not longer safe to access chunks from this struct after calling Close.
func (cdm *OldChunkDiskMapper) Close() error {
// 'WriteChunk' locks writePathMtx first and then readPathMtx for cutting head chunk file.
// The lock order should not be reversed here else it can cause deadlocks.
cdm.writePathMtx.Lock()
defer cdm.writePathMtx.Unlock()
cdm.readPathMtx.Lock()
defer cdm.readPathMtx.Unlock()
if cdm.closed {
return nil
}
cdm.closed = true
errs := tsdb_errors.NewMulti(
closeAllFromMap(cdm.closers),
cdm.finalizeCurFile(),
cdm.dir.Close(),
)
cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
cdm.closers = map[int]io.Closer{}
return errs.Err()
}
func (cdm *OldChunkDiskMapper) IsQueueEmpty() bool {
return true // there is no queue
}

View file

@ -1,487 +0,0 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunks
import (
"encoding/binary"
"errors"
"io/ioutil"
"math/rand"
"os"
"strconv"
"testing"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)
func TestOldChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
hrw := testOldChunkDiskMapper(t)
defer func() {
require.NoError(t, hrw.Close())
}()
expectedBytes := []byte{}
nextChunkOffset := uint64(HeadChunkFileHeaderSize)
chkCRC32 := newCRC32()
type expectedDataType struct {
seriesRef HeadSeriesRef
chunkRef ChunkDiskMapperRef
mint, maxt int64
numSamples uint16
chunk chunkenc.Chunk
}
expectedData := []expectedDataType{}
var buf [MaxHeadChunkMetaSize]byte
totalChunks := 0
var firstFileName string
for hrw.curFileSequence < 3 || hrw.chkWriter.Buffered() == 0 {
addChunks := func(numChunks int) {
for i := 0; i < numChunks; i++ {
seriesRef, chkRef, mint, maxt, chunk := createChunkForOld(t, totalChunks, hrw)
totalChunks++
expectedData = append(expectedData, expectedDataType{
seriesRef: seriesRef,
mint: mint,
maxt: maxt,
chunkRef: chkRef,
chunk: chunk,
numSamples: uint16(chunk.NumSamples()),
})
if hrw.curFileSequence != 1 {
// We are checking for bytes written only for the first file.
continue
}
// Calculating expected bytes written on disk for first file.
firstFileName = hrw.curFile.Name()
require.Equal(t, newChunkDiskMapperRef(1, nextChunkOffset), chkRef)
bytesWritten := 0
chkCRC32.Reset()
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(seriesRef))
bytesWritten += SeriesRefSize
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(mint))
bytesWritten += MintMaxtSize
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt))
bytesWritten += MintMaxtSize
buf[bytesWritten] = byte(chunk.Encoding())
bytesWritten += ChunkEncodingSize
n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes())))
bytesWritten += n
expectedBytes = append(expectedBytes, buf[:bytesWritten]...)
_, err := chkCRC32.Write(buf[:bytesWritten])
require.NoError(t, err)
expectedBytes = append(expectedBytes, chunk.Bytes()...)
_, err = chkCRC32.Write(chunk.Bytes())
require.NoError(t, err)
expectedBytes = append(expectedBytes, chkCRC32.Sum(nil)...)
// += seriesRef, mint, maxt, encoding, chunk data len, chunk data, CRC.
nextChunkOffset += SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + uint64(n) + uint64(len(chunk.Bytes())) + CRCSize
}
}
addChunks(100)
hrw.CutNewFile()
addChunks(10) // For chunks in in-memory buffer.
}
// Checking on-disk bytes for the first file.
require.Equal(t, 3, len(hrw.mmappedChunkFiles), "expected 3 mmapped files, got %d", len(hrw.mmappedChunkFiles))
require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers))
actualBytes, err := ioutil.ReadFile(firstFileName)
require.NoError(t, err)
// Check header of the segment file.
require.Equal(t, MagicHeadChunks, int(binary.BigEndian.Uint32(actualBytes[0:MagicChunksSize])))
require.Equal(t, chunksFormatV1, int(actualBytes[MagicChunksSize]))
// Remaining chunk data.
fileEnd := HeadChunkFileHeaderSize + len(expectedBytes)
require.Equal(t, expectedBytes, actualBytes[HeadChunkFileHeaderSize:fileEnd])
// Testing reading of chunks.
for _, exp := range expectedData {
actChunk, err := hrw.Chunk(exp.chunkRef)
require.NoError(t, err)
require.Equal(t, exp.chunk.Bytes(), actChunk.Bytes())
}
// Testing IterateAllChunks method.
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
idx := 0
require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error {
t.Helper()
expData := expectedData[idx]
require.Equal(t, expData.seriesRef, seriesRef)
require.Equal(t, expData.chunkRef, chunkRef)
require.Equal(t, expData.maxt, maxt)
require.Equal(t, expData.maxt, maxt)
require.Equal(t, expData.numSamples, numSamples)
actChunk, err := hrw.Chunk(expData.chunkRef)
require.NoError(t, err)
require.Equal(t, expData.chunk.Bytes(), actChunk.Bytes())
idx++
return nil
}))
require.Equal(t, len(expectedData), idx)
}
func TestOldChunkDiskMapper_WriteUnsupportedChunk_Chunk_IterateChunks(t *testing.T) {
hrw := testOldChunkDiskMapper(t)
defer func() {
require.NoError(t, hrw.Close())
}()
ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, nil, hrw)
// Checking on-disk bytes for the first file.
require.Equal(t, 1, len(hrw.mmappedChunkFiles), "expected 1 mmapped file, got %d", len(hrw.mmappedChunkFiles))
require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers))
// Testing IterateAllChunks method.
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
hrw, err := NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error {
t.Helper()
require.Equal(t, ucSeriesRef, seriesRef)
require.Equal(t, ucChkRef, chunkRef)
require.Equal(t, ucMint, mint)
require.Equal(t, ucMaxt, maxt)
require.Equal(t, uchunk.Encoding(), encoding) // Asserts that the encoding is EncUnsupportedXOR
actChunk, err := hrw.Chunk(chunkRef)
// The chunk encoding is unknown so Chunk() should fail but us the caller
// are ok with that. Above we asserted that the encoding we expected was
// EncUnsupportedXOR
require.NotNil(t, err)
require.Contains(t, err.Error(), "invalid chunk encoding \"<unknown>\"")
require.Nil(t, actChunk)
return nil
}))
}
// TestOldChunkDiskMapper_Truncate tests
// * If truncation is happening properly based on the time passed.
// * The active file is not deleted even if the passed time makes it eligible to be deleted.
// * Empty current file does not lead to creation of another file after truncation.
// * Non-empty current file leads to creation of another file after truncation.
func TestOldChunkDiskMapper_Truncate(t *testing.T) {
hrw := testOldChunkDiskMapper(t)
defer func() {
require.NoError(t, hrw.Close())
}()
timeRange := 0
fileTimeStep := 100
var thirdFileMinT, sixthFileMinT int64
addChunk := func() int {
mint := timeRange + 1 // Just after the new file cut.
maxt := timeRange + fileTimeStep - 1 // Just before the next file.
// Write a chunks to set maxt for the segment.
_ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) {
require.NoError(t, err)
})
timeRange += fileTimeStep
return mint
}
verifyFiles := func(remainingFiles []int) {
t.Helper()
files, err := ioutil.ReadDir(hrw.dir.Name())
require.NoError(t, err)
require.Equal(t, len(remainingFiles), len(files), "files on disk")
require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles")
require.Equal(t, len(remainingFiles), len(hrw.closers), "closers")
for _, i := range remainingFiles {
_, ok := hrw.mmappedChunkFiles[i]
require.Equal(t, true, ok)
}
}
// Create segments 1 to 7.
for i := 1; i <= 7; i++ {
require.NoError(t, hrw.CutNewFile())
mint := int64(addChunk())
if i == 3 {
thirdFileMinT = mint
} else if i == 6 {
sixthFileMinT = mint
}
}
verifyFiles([]int{1, 2, 3, 4, 5, 6, 7})
// Truncating files.
require.NoError(t, hrw.Truncate(thirdFileMinT))
verifyFiles([]int{3, 4, 5, 6, 7, 8})
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
// Restarted.
var err error
hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
require.False(t, hrw.fileMaxtSet)
require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
return nil
}))
require.True(t, hrw.fileMaxtSet)
verifyFiles([]int{3, 4, 5, 6, 7, 8})
// New file is created after restart even if last file was empty.
addChunk()
verifyFiles([]int{3, 4, 5, 6, 7, 8, 9})
// Truncating files after restart.
require.NoError(t, hrw.Truncate(sixthFileMinT))
verifyFiles([]int{6, 7, 8, 9, 10})
// As the last file was empty, this creates no new files.
require.NoError(t, hrw.Truncate(sixthFileMinT+1))
verifyFiles([]int{6, 7, 8, 9, 10})
addChunk()
// Truncating till current time should not delete the current active file.
require.NoError(t, hrw.Truncate(int64(timeRange+(2*fileTimeStep))))
verifyFiles([]int{10, 11}) // One file is the previously active file and one currently created.
}
// TestOldChunkDiskMapper_Truncate_PreservesFileSequence tests that truncation doesn't poke
// holes into the file sequence, even if there are empty files in between non-empty files.
// This test exposes https://github.com/prometheus/prometheus/issues/7412 where the truncation
// simply deleted all empty files instead of stopping once it encountered a non-empty file.
func TestOldChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) {
hrw := testOldChunkDiskMapper(t)
defer func() {
require.NoError(t, hrw.Close())
}()
timeRange := 0
addChunk := func() {
step := 100
mint, maxt := timeRange+1, timeRange+step-1
_ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) {
require.NoError(t, err)
})
timeRange += step
}
emptyFile := func() {
require.NoError(t, hrw.CutNewFile())
}
nonEmptyFile := func() {
emptyFile()
addChunk()
}
addChunk() // 1. Created with the first chunk.
nonEmptyFile() // 2.
nonEmptyFile() // 3.
emptyFile() // 4.
nonEmptyFile() // 5.
emptyFile() // 6.
verifyFiles := func(remainingFiles []int) {
t.Helper()
files, err := ioutil.ReadDir(hrw.dir.Name())
require.NoError(t, err)
require.Equal(t, len(remainingFiles), len(files), "files on disk")
require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles")
require.Equal(t, len(remainingFiles), len(hrw.closers), "closers")
for _, i := range remainingFiles {
_, ok := hrw.mmappedChunkFiles[i]
require.True(t, ok, "remaining file %d not in hrw.mmappedChunkFiles", i)
}
}
verifyFiles([]int{1, 2, 3, 4, 5, 6})
// Truncating files till 2. It should not delete anything after 3 (inclusive)
// though files 4 and 6 are empty.
file2Maxt := hrw.mmappedChunkFiles[2].maxt
require.NoError(t, hrw.Truncate(file2Maxt+1))
// As 6 was empty, it should not create another file.
verifyFiles([]int{3, 4, 5, 6})
addChunk()
// Truncate creates another file as 6 is not empty now.
require.NoError(t, hrw.Truncate(file2Maxt+1))
verifyFiles([]int{3, 4, 5, 6, 7})
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
// Restarting checks for unsequential files.
var err error
hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
verifyFiles([]int{3, 4, 5, 6, 7})
}
// TestOldChunkDiskMapper_TruncateAfterFailedIterateChunks tests for
// https://github.com/prometheus/prometheus/issues/7753
func TestOldChunkDiskMapper_TruncateAfterFailedIterateChunks(t *testing.T) {
hrw := testOldChunkDiskMapper(t)
defer func() {
require.NoError(t, hrw.Close())
}()
// Write a chunks to iterate on it later.
_ = hrw.WriteChunk(1, 0, 1000, randomChunk(t), func(err error) {
require.NoError(t, err)
})
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
// Restarting to recreate https://github.com/prometheus/prometheus/issues/7753.
hrw, err := NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
// Forcefully failing IterateAllChunks.
require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
return errors.New("random error")
}))
// Truncation call should not return error after IterateAllChunks fails.
require.NoError(t, hrw.Truncate(2000))
}
func TestOldChunkDiskMapper_ReadRepairOnEmptyLastFile(t *testing.T) {
hrw := testOldChunkDiskMapper(t)
defer func() {
require.NoError(t, hrw.Close())
}()
timeRange := 0
addChunk := func() {
step := 100
mint, maxt := timeRange+1, timeRange+step-1
_ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) {
require.NoError(t, err)
})
timeRange += step
}
nonEmptyFile := func() {
require.NoError(t, hrw.CutNewFile())
addChunk()
}
addChunk() // 1. Created with the first chunk.
nonEmptyFile() // 2.
nonEmptyFile() // 3.
require.Equal(t, 3, len(hrw.mmappedChunkFiles))
lastFile := 0
for idx := range hrw.mmappedChunkFiles {
if idx > lastFile {
lastFile = idx
}
}
require.Equal(t, 3, lastFile)
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
// Write an empty last file mimicking an abrupt shutdown on file creation.
emptyFileName := segmentFile(dir, lastFile+1)
f, err := os.OpenFile(emptyFileName, os.O_WRONLY|os.O_CREATE, 0o666)
require.NoError(t, err)
require.NoError(t, f.Sync())
stat, err := f.Stat()
require.NoError(t, err)
require.Equal(t, int64(0), stat.Size())
require.NoError(t, f.Close())
// Open chunk disk mapper again, corrupt file should be removed.
hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
require.False(t, hrw.fileMaxtSet)
require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
return nil
}))
require.True(t, hrw.fileMaxtSet)
// Removed from memory.
require.Equal(t, 3, len(hrw.mmappedChunkFiles))
for idx := range hrw.mmappedChunkFiles {
require.LessOrEqual(t, idx, lastFile, "file index is bigger than previous last file")
}
// Removed even from disk.
files, err := ioutil.ReadDir(dir)
require.NoError(t, err)
require.Equal(t, 3, len(files))
for _, fi := range files {
seq, err := strconv.ParseUint(fi.Name(), 10, 64)
require.NoError(t, err)
require.LessOrEqual(t, seq, uint64(lastFile), "file index on disk is bigger than previous last file")
}
}
func testOldChunkDiskMapper(t *testing.T) *OldChunkDiskMapper {
tmpdir, err := ioutil.TempDir("", "data")
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, os.RemoveAll(tmpdir))
})
hrw, err := NewOldChunkDiskMapper(tmpdir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
require.False(t, hrw.fileMaxtSet)
require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
return nil
}))
require.True(t, hrw.fileMaxtSet)
return hrw
}
func createChunkForOld(t *testing.T, idx int, hrw *OldChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) {
seriesRef = HeadSeriesRef(rand.Int63())
mint = int64((idx)*1000 + 1)
maxt = int64((idx + 1) * 1000)
chunk = randomChunk(t)
chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(err error) {
require.NoError(t, err)
})
return
}

View file

@ -57,20 +57,6 @@ var (
defaultIsolationDisabled = false defaultIsolationDisabled = false
) )
// chunkDiskMapper is a temporary interface while we transition from
// 0 size queue to queue based chunk disk mapper.
type chunkDiskMapper interface {
CutNewFile() (returnErr error)
IterateAllChunks(f func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error) (err error)
Truncate(mint int64) error
DeleteCorrupted(originalErr error) error
Size() (int64, error)
Close() error
Chunk(ref chunks.ChunkDiskMapperRef) (chunkenc.Chunk, error)
WriteChunk(seriesRef chunks.HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef chunks.ChunkDiskMapperRef)
IsQueueEmpty() bool
}
// Head handles reads and writes of time series data within a time window. // Head handles reads and writes of time series data within a time window.
type Head struct { type Head struct {
chunkRange atomic.Int64 chunkRange atomic.Int64
@ -111,7 +97,7 @@ type Head struct {
lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching. lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching.
// chunkDiskMapper is used to write and read Head chunks to/from disk. // chunkDiskMapper is used to write and read Head chunks to/from disk.
chunkDiskMapper chunkDiskMapper chunkDiskMapper *chunks.ChunkDiskMapper
chunkSnapshotMtx sync.Mutex chunkSnapshotMtx sync.Mutex
@ -229,21 +215,13 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti
opts.ChunkPool = chunkenc.NewPool() opts.ChunkPool = chunkenc.NewPool()
} }
if opts.ChunkWriteQueueSize > 0 { h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(
h.chunkDiskMapper, err = chunks.NewChunkDiskMapper( r,
r, mmappedChunksDir(opts.ChunkDirRoot),
mmappedChunksDir(opts.ChunkDirRoot), opts.ChunkPool,
opts.ChunkPool, opts.ChunkWriteBufferSize,
opts.ChunkWriteBufferSize, opts.ChunkWriteQueueSize,
opts.ChunkWriteQueueSize, )
)
} else {
h.chunkDiskMapper, err = chunks.NewOldChunkDiskMapper(
mmappedChunksDir(opts.ChunkDirRoot),
opts.ChunkPool,
opts.ChunkWriteBufferSize,
)
}
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -489,7 +489,7 @@ func (a *headAppender) Commit() (err error) {
// the appendID for isolation. (The appendID can be zero, which results in no // the appendID for isolation. (The appendID can be zero, which results in no
// isolation for this append.) // isolation for this append.)
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper chunkDiskMapper) (delta int64, sampleInOrder, chunkCreated bool) { func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (delta int64, sampleInOrder, chunkCreated bool) {
// Based on Gorilla white papers this offers near-optimal compression ratio // Based on Gorilla white papers this offers near-optimal compression ratio
// so anything bigger that this has diminishing returns and increases // so anything bigger that this has diminishing returns and increases
// the time range within which we have to decompress all samples. // the time range within which we have to decompress all samples.
@ -587,7 +587,7 @@ func addJitterToChunkEndTime(seriesHash uint64, chunkMinTime, nextAt, maxNextAt
return min(maxNextAt, nextAt+chunkDurationVariance-(chunkDurationMaxVariance/2)) return min(maxNextAt, nextAt+chunkDurationVariance-(chunkDurationMaxVariance/2))
} }
func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper chunkDiskMapper) *memChunk { func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk {
s.mmapCurrentHeadChunk(chunkDiskMapper) s.mmapCurrentHeadChunk(chunkDiskMapper)
s.headChunk = &memChunk{ s.headChunk = &memChunk{
@ -608,7 +608,7 @@ func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper chunkDiskMapper)
return s.headChunk return s.headChunk
} }
func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper chunkDiskMapper) { func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) {
if s.headChunk == nil { if s.headChunk == nil {
// There is no head chunk, so nothing to m-map here. // There is no head chunk, so nothing to m-map here.
return return

View file

@ -329,7 +329,7 @@ func (h *headChunkReader) Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) {
// chunk returns the chunk for the HeadChunkID from memory or by m-mapping it from the disk. // chunk returns the chunk for the HeadChunkID from memory or by m-mapping it from the disk.
// If garbageCollect is true, it means that the returned *memChunk // If garbageCollect is true, it means that the returned *memChunk
// (and not the chunkenc.Chunk inside it) can be garbage collected after its usage. // (and not the chunkenc.Chunk inside it) can be garbage collected after its usage.
func (s *memSeries) chunk(id chunks.HeadChunkID, cdm chunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) { func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) {
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are // ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are
// incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index. // incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index.
// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix // The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix
@ -344,7 +344,7 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, cdm chunkDiskMapper) (chunk *me
} }
return s.headChunk, false, nil return s.headChunk, false, nil
} }
chk, err := cdm.Chunk(s.mmappedChunks[ix].ref) chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref)
if err != nil { if err != nil {
if _, ok := err.(*chunks.CorruptionErr); ok { if _, ok := err.(*chunks.CorruptionErr); ok {
panic(err) panic(err)
@ -363,7 +363,7 @@ type safeChunk struct {
s *memSeries s *memSeries
cid chunks.HeadChunkID cid chunks.HeadChunkID
isoState *isolationState isoState *isolationState
chunkDiskMapper chunkDiskMapper chunkDiskMapper *chunks.ChunkDiskMapper
} }
func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
@ -375,8 +375,8 @@ func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
// iterator returns a chunk iterator for the requested chunkID, or a NopIterator if the requested ID is out of range. // iterator returns a chunk iterator for the requested chunkID, or a NopIterator if the requested ID is out of range.
// It is unsafe to call this concurrently with s.append(...) without holding the series lock. // It is unsafe to call this concurrently with s.append(...) without holding the series lock.
func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, cdm chunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator { func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator {
c, garbageCollect, err := s.chunk(id, cdm) c, garbageCollect, err := s.chunk(id, chunkDiskMapper)
// TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a // TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a
// series's chunk, which got then garbage collected before it got // series's chunk, which got then garbage collected before it got
// accessed. We must ensure to not garbage collect as long as any // accessed. We must ensure to not garbage collect as long as any

View file

@ -1634,7 +1634,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
_, ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper) _, ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper)
require.True(t, ok, "series append failed") require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunk was created") require.False(t, chunkCreated, "chunk was created")
require.NoError(t, h.chunkDiskMapper.CutNewFile()) h.chunkDiskMapper.CutNewFile()
} }
require.NoError(t, h.Close()) require.NoError(t, h.Close())