diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go
index 1443e0bd4..f4ac61fcc 100644
--- a/tsdb/chunks/head_chunks.go
+++ b/tsdb/chunks/head_chunks.go
@@ -70,6 +70,21 @@ const (
 	DefaultWriteBufferSize = 4 * 1024 * 1024 // 4 MiB.
 )
 
+// ChunkDiskMapperRef represents the location of a head chunk on disk.
+// The upper 4 bytes hold the index of the head chunk file and
+// the lower 4 bytes hold the byte offset in the head chunk file where the chunk starts.
+type ChunkDiskMapperRef uint64
+
+func newChunkDiskMapperRef(seq, offset uint64) ChunkDiskMapperRef {
+	return ChunkDiskMapperRef((seq << 32) | offset)
+}
+
+func (ref ChunkDiskMapperRef) Unpack() (sgmIndex, chkStart int) {
+	sgmIndex = int(ref >> 32)
+	chkStart = int((ref << 32) >> 32)
+	return sgmIndex, chkStart
+}
+
 // CorruptionErr is an error that's returned when corruption is encountered.
 type CorruptionErr struct {
 	Dir string
@@ -272,7 +288,7 @@ func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr erro
 
 // WriteChunk writes the chunk to the disk.
 // The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
-func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk chunkenc.Chunk) (chkRef uint64, err error) {
+func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk chunkenc.Chunk) (chkRef ChunkDiskMapperRef, err error) {
 	cdm.writePathMtx.Lock()
 	defer cdm.writePathMtx.Unlock()
 
@@ -297,9 +313,7 @@ func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk c
 	cdm.crc32.Reset()
 	bytesWritten := 0
 
-	// The upper 4 bytes are for the head chunk file index and
-	// the lower 4 bytes are for the head chunk file offset where to start reading this chunk.
-	chkRef = chunkRef(uint64(cdm.curFileSequence), uint64(cdm.curFileSize()))
+	chkRef = newChunkDiskMapperRef(uint64(cdm.curFileSequence), uint64(cdm.curFileSize()))
 
 	binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], seriesRef)
 	bytesWritten += SeriesRefSize
@@ -339,10 +353,6 @@ func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk c
 	return chkRef, nil
 }
 
-func chunkRef(seq, offset uint64) (chunkRef uint64) {
-	return (seq << 32) | offset
-}
-
 // shouldCutNewFile decides the cutting of a new file based on time and size retention.
 // Size retention: because depending on the system architecture, there is a limit on how big of a file we can m-map.
 // Time retention: so that we can delete old chunks with some time guarantee in low load environments.
@@ -456,28 +466,22 @@ func (cdm *ChunkDiskMapper) flushBuffer() error {
 }
 
 // Chunk returns a chunk from a given reference.
-func (cdm *ChunkDiskMapper) Chunk(ref uint64) (chunkenc.Chunk, error) {
+func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error) {
 	cdm.readPathMtx.RLock()
 	// We hold this read lock for the entire duration because if the Close()
 	// is called, the data in the byte slice will get corrupted as the mmapped
 	// file will be closed.
 	defer cdm.readPathMtx.RUnlock()
 
-	var (
-		// Get the upper 4 bytes.
-		// These contain the head chunk file index.
-		sgmIndex = int(ref >> 32)
-		// Get the lower 4 bytes.
-		// These contain the head chunk file offset where the chunk starts.
-		// We skip the series ref and the mint/maxt beforehand.
-		chkStart = int((ref<<32)>>32) + SeriesRefSize + (2 * MintMaxtSize)
-		chkCRC32 = newCRC32()
-	)
-
 	if cdm.closed {
 		return nil, ErrChunkDiskMapperClosed
 	}
 
+	sgmIndex, chkStart := ref.Unpack()
+	// We skip the series ref and the mint/maxt beforehand.
+	chkStart += SeriesRefSize + (2 * MintMaxtSize)
+	chkCRC32 := newCRC32()
+
 	// If it is the current open file, then the chunks can be in the buffer too.
 	if sgmIndex == cdm.curFileSequence {
 		chunk := cdm.chunkBuffer.get(ref)
@@ -578,7 +582,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref uint64) (chunkenc.Chunk, error) {
 // and runs the provided function on each chunk. It returns on the first error encountered.
 // NOTE: This method needs to be called at least once after creating ChunkDiskMapper
 // to set the maxt of all the file.
-func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error) (err error) {
+func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef uint64, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error) (err error) {
 	cdm.writePathMtx.Lock()
 	defer cdm.writePathMtx.Unlock()
 
@@ -623,7 +627,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64,
 			}
 		}
 		chkCRC32.Reset()
-		chunkRef := chunkRef(uint64(segID), uint64(idx))
+		chunkRef := newChunkDiskMapperRef(uint64(segID), uint64(idx))
 
 		startIdx := idx
 		seriesRef := binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+SeriesRefSize))
@@ -826,19 +830,19 @@ const inBufferShards = 128 // 128 is a randomly chosen number.
 
 // chunkBuffer is a thread safe buffer for chunks.
 type chunkBuffer struct {
-	inBufferChunks     [inBufferShards]map[uint64]chunkenc.Chunk
+	inBufferChunks     [inBufferShards]map[ChunkDiskMapperRef]chunkenc.Chunk
 	inBufferChunksMtxs [inBufferShards]sync.RWMutex
 }
 
 func newChunkBuffer() *chunkBuffer {
 	cb := &chunkBuffer{}
 	for i := 0; i < inBufferShards; i++ {
-		cb.inBufferChunks[i] = make(map[uint64]chunkenc.Chunk)
+		cb.inBufferChunks[i] = make(map[ChunkDiskMapperRef]chunkenc.Chunk)
 	}
 	return cb
 }
 
-func (cb *chunkBuffer) put(ref uint64, chk chunkenc.Chunk) {
+func (cb *chunkBuffer) put(ref ChunkDiskMapperRef, chk chunkenc.Chunk) {
 	shardIdx := ref % inBufferShards
 
 	cb.inBufferChunksMtxs[shardIdx].Lock()
@@ -846,7 +850,7 @@ func (cb *chunkBuffer) put(ref uint64, chk chunkenc.Chunk) {
 	cb.inBufferChunksMtxs[shardIdx].Unlock()
 }
 
-func (cb *chunkBuffer) get(ref uint64) chunkenc.Chunk {
+func (cb *chunkBuffer) get(ref ChunkDiskMapperRef) chunkenc.Chunk {
 	shardIdx := ref % inBufferShards
 
 	cb.inBufferChunksMtxs[shardIdx].RLock()
@@ -858,7 +862,7 @@ func (cb *chunkBuffer) get(ref uint64) chunkenc.Chunk {
 func (cb *chunkBuffer) clear() {
 	for i := 0; i < inBufferShards; i++ {
 		cb.inBufferChunksMtxs[i].Lock()
-		cb.inBufferChunks[i] = make(map[uint64]chunkenc.Chunk)
+		cb.inBufferChunks[i] = make(map[ChunkDiskMapperRef]chunkenc.Chunk)
 		cb.inBufferChunksMtxs[i].Unlock()
 	}
 }
diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go
index 351943900..f1aa13cec 100644
--- a/tsdb/chunks/head_chunks_test.go
+++ b/tsdb/chunks/head_chunks_test.go
@@ -38,10 +38,11 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
 	chkCRC32 := newCRC32()
 
 	type expectedDataType struct {
-		seriesRef, chunkRef uint64
-		mint, maxt          int64
-		numSamples          uint16
-		chunk               chunkenc.Chunk
+		seriesRef  uint64
+		chunkRef   ChunkDiskMapperRef
+		mint, maxt int64
+		numSamples uint16
+		chunk      chunkenc.Chunk
 	}
 	expectedData := []expectedDataType{}
 
@@ -69,7 +70,7 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
 
 				// Calculating expected bytes written on disk for first file.
 				firstFileName = hrw.curFile.Name()
-				require.Equal(t, chunkRef(1, nextChunkOffset), chkRef)
+				require.Equal(t, newChunkDiskMapperRef(1, nextChunkOffset), chkRef)
 
 				bytesWritten := 0
 				chkCRC32.Reset()
@@ -132,7 +133,7 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
 	require.NoError(t, err)
 
 	idx := 0
-	require.NoError(t, hrw.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error {
+	require.NoError(t, hrw.IterateAllChunks(func(seriesRef uint64, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error {
 		t.Helper()
 
 		expData := expectedData[idx]
@@ -220,7 +221,7 @@ func TestChunkDiskMapper_Truncate(t *testing.T) {
 		require.NoError(t, err)
 
 		require.False(t, hrw.fileMaxtSet)
-		require.NoError(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
+		require.NoError(t, hrw.IterateAllChunks(func(_ uint64, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
 		require.True(t, hrw.fileMaxtSet)
 
 		verifyFiles([]int{3, 4, 5, 6, 7, 8})
@@ -334,7 +335,7 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
 	require.NoError(t, err)
 
 	// Forcefully failing IterateAllChunks.
-	require.Error(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error {
+	require.Error(t, hrw.IterateAllChunks(func(_ uint64, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error {
 		return errors.New("random error")
 	}))
 
@@ -390,7 +391,7 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
 	hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
 	require.NoError(t, err)
 	require.False(t, hrw.fileMaxtSet)
-	require.NoError(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
+	require.NoError(t, hrw.IterateAllChunks(func(_ uint64, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
 	require.True(t, hrw.fileMaxtSet)
 
 	// Removed from memory.
@@ -421,7 +422,7 @@ func testChunkDiskMapper(t *testing.T) *ChunkDiskMapper {
 	hrw, err := NewChunkDiskMapper(tmpdir, chunkenc.NewPool(), DefaultWriteBufferSize)
 	require.NoError(t, err)
 	require.False(t, hrw.fileMaxtSet)
-	require.NoError(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
+	require.NoError(t, hrw.IterateAllChunks(func(_ uint64, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
 	require.True(t, hrw.fileMaxtSet)
 	return hrw
 }
@@ -437,7 +438,7 @@ func randomChunk(t *testing.T) chunkenc.Chunk {
 	return chunk
 }
 
-func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef uint64, chunkRef uint64, mint, maxt int64, chunk chunkenc.Chunk) {
+func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef uint64, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) {
 	var err error
 	seriesRef = uint64(rand.Int63())
 	mint = int64((idx)*1000 + 1)
diff --git a/tsdb/head.go b/tsdb/head.go
index cd0f9bd16..80edf794e 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -605,7 +605,7 @@ func (h *Head) Init(minValidTime int64) error {
 
 func (h *Head) loadMmappedChunks(refSeries map[uint64]*memSeries) (map[uint64][]*mmappedChunk, error) {
 	mmappedChunks := map[uint64][]*mmappedChunk{}
-	if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error {
+	if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef uint64, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error {
 		if maxt < h.minValidTime.Load() {
 			return nil
 		}
@@ -1563,8 +1563,9 @@ func overlapsClosedInterval(mint1, maxt1, mint2, maxt2 int64) bool {
 	return mint1 <= maxt2 && mint2 <= maxt1
 }
 
+// mmappedChunk describes a chunk on disk that can be mmapped.
 type mmappedChunk struct {
-	ref              uint64
+	ref              chunks.ChunkDiskMapperRef
 	numSamples       uint16
 	minTime, maxTime int64
 }
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index e404e94f6..6043927ea 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -63,7 +63,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.
 	h, err := NewHead(nil, nil, wlog, opts, nil)
 	require.NoError(t, err)
 
-	require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
+	require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_ uint64, _ chunks.ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
 
 	t.Cleanup(func() {
 		require.NoError(t, os.RemoveAll(dir))
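
Reviewer note, not part of the patch: a minimal, self-contained sketch of the ref layout this change formalizes. The pack/unpack logic mirrors newChunkDiskMapperRef and ChunkDiskMapperRef.Unpack above, and the header skip mirrors Chunk(); the local names and the constant values (SeriesRefSize and MintMaxtSize are both 8 bytes in head_chunks.go) are repeated here only for illustration.

```go
package main

import "fmt"

// pack mirrors newChunkDiskMapperRef: head chunk file index in the upper
// 4 bytes, byte offset within that file in the lower 4 bytes. The offset
// must fit in 32 bits, which the size-based cutting of head chunk files
// is meant to guarantee.
func pack(seq, offset uint64) uint64 { return (seq << 32) | offset }

// unpack mirrors ChunkDiskMapperRef.Unpack.
func unpack(ref uint64) (sgmIndex, chkStart int) {
	return int(ref >> 32), int((ref << 32) >> 32)
}

func main() {
	ref := pack(3, 1024) // chunk record starts at byte 1024 of file 3
	sgmIndex, chkStart := unpack(ref)
	fmt.Println(sgmIndex, chkStart) // 3 1024

	// As in Chunk(): the stored offset points at the series ref that
	// precedes the chunk bytes, so a reader skips the series ref
	// (8 bytes) and mint/maxt (2 × 8 bytes) to reach the encoding byte.
	const seriesRefSize, mintMaxtSize = 8, 8
	fmt.Println(chkStart + seriesRefSize + 2*mintMaxtSize) // 1048
}
```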
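One more aside for reviewers: chunkBuffer's shard selection, `ref % inBufferShards`, compiles unchanged after the type switch because ChunkDiskMapperRef's underlying type is uint64 and inBufferShards is an untyped constant, and Go accepts any integer type as an array index. A small sketch of that property, with values chosen for illustration:

```go
package main

import "fmt"

type ChunkDiskMapperRef uint64

const inBufferShards = 128

func main() {
	ref := ChunkDiskMapperRef((3 << 32) | 1024)
	// The modulo yields a ChunkDiskMapperRef, still usable as an index.
	shardIdx := ref % inBufferShards
	var shards [inBufferShards]int
	shards[shardIdx]++ // (3<<32 | 1024) % 128 == 0, so shard 0
	fmt.Println(shardIdx, shards[shardIdx]) // 0 1
}
```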