From 067efc3725b2f2f5cf7a2982a8b7b04ddd040b39 Mon Sep 17 00:00:00 2001
From: Dieter Plaetinck
Date: Wed, 17 Nov 2021 08:05:10 -0500
Subject: [PATCH] clarify HeadChunkID type and usage (#9726)

Signed-off-by: Dieter Plaetinck
Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
---
 tsdb/chunks/chunks.go | 23 +++++++++++++++++++----
 tsdb/docs/refs.md | 14 +++++++++-----
 tsdb/head.go | 29 ++++++++++++++++++++---------
 tsdb/head_read.go | 31 +++++++++++++++----------------
 tsdb/head_test.go | 8 ++++----
 5 files changed, 67 insertions(+), 38 deletions(-)

diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go
index 1d18d53f2..2f565fcb1 100644
--- a/tsdb/chunks/chunks.go
+++ b/tsdb/chunks/chunks.go
@@ -66,20 +66,35 @@ type HeadSeriesRef uint64
 // The HeadSeriesRef and ChunkID may not exceed 5 and 3 bytes respectively.
 type HeadChunkRef uint64
 
-func NewHeadChunkRef(hsr HeadSeriesRef, chunkID uint64) HeadChunkRef {
+func NewHeadChunkRef(hsr HeadSeriesRef, chunkID HeadChunkID) HeadChunkRef {
 	if hsr > (1<<40)-1 {
 		panic("series ID exceeds 5 bytes")
 	}
 	if chunkID > (1<<24)-1 {
 		panic("chunk ID exceeds 3 bytes")
 	}
-	return HeadChunkRef(uint64(hsr<<24) | chunkID)
+	return HeadChunkRef(uint64(hsr<<24) | uint64(chunkID))
 }
 
-func (p HeadChunkRef) Unpack() (HeadSeriesRef, uint64) {
-	return HeadSeriesRef(p >> 24), uint64(p<<40) >> 40
+func (p HeadChunkRef) Unpack() (HeadSeriesRef, HeadChunkID) {
+	return HeadSeriesRef(p >> 24), HeadChunkID(p<<40) >> 40
 }
 
+// HeadChunkID refers to a specific chunk in a series (memSeries) in the Head.
+// Each memSeries has its own monotonically increasing number to refer to its chunks.
+// If the HeadChunkID value is...
+// * memSeries.firstChunkID+len(memSeries.mmappedChunks), it's the head chunk.
+// * less than the above, but >= memSeries.firstChunkID, then it's
+//   memSeries.mmappedChunks[i] where i = HeadChunkID - memSeries.firstChunkID.
+// Example: +// assume a memSeries.firstChunkID=7 and memSeries.mmappedChunks=[p5,p6,p7,p8,p9]. +// | HeadChunkID value | refers to ... | +// |-------------------|----------------------------------------------------------------------------------------| +// | 0-6 | chunks that have been compacted to blocks, these won't return data for queries in Head | +// | 7-11 | memSeries.mmappedChunks[i] where i is 0 to 4. | +// | 12 | memSeries.headChunk | +type HeadChunkID uint64 + // BlockChunkRef refers to a chunk within a persisted block. // The upper 4 bytes are for the segment index and // the lower 4 bytes are for the segment offset where the data starts for this chunk. diff --git a/tsdb/docs/refs.md b/tsdb/docs/refs.md index 4f745b572..b070b5c7a 100644 --- a/tsdb/docs/refs.md +++ b/tsdb/docs/refs.md @@ -1,9 +1,5 @@ # An overview of different Series and Chunk reference types -## Used internally in TSDB - -* `ChunkDiskMapperRef`: to load mmapped chunks from disk. - ## Used by callers of TSDB | Location | Series access | Chunk access | @@ -60,7 +56,7 @@ Note: we cover the implementations as used in Prometheus. Other projects may us A `HeadChunkRef` is an 8 byte integer that packs together: * 5 Bytes for `HeadSeriesRef`. -* 3 Bytes for `ChunkID` (uint64). This is simply an index into a slice of mmappedChunks for a given series +* 3 Bytes for `HeadChunkID` (uint64) (see below). There are two implications here: @@ -85,3 +81,11 @@ The `ChunkRef` types allow retrieving the chunk data as efficiently as possible. Hence we need to pack the `HeadSeriesRef` to get to the series. * In persistent blocks, the chunk files are separated from the index and static. Hence you only need the co-ordinates within the `chunks` directory to get to the chunk. Hence no need of `BlockSeriesRef`. 
+
+## Used internally in TSDB
+
+* [`HeadChunkID`](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab/tsdb/chunks#HeadChunkID) references a chunk of a `memSeries` (either an `mmappedChunk` or `headChunk`).
+  If a caller has, for whatever reason, an "old" `HeadChunkID` that refers to a chunk that has been compacted into a block, querying the memSeries for it will not return any data.
+* [`ChunkDiskMapperRef`](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab/tsdb/chunks#ChunkDiskMapperRef) is an 8 Byte integer.
+  4 Bytes are used to refer to a chunks file number and 4 bytes serve as byte offset (similar to `BlockChunkRef`). `mmappedChunk` provides this value such that callers can load the mmapped chunk from disk.
+
diff --git a/tsdb/head.go b/tsdb/head.go
index 4f34fd2d5..b4806addc 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -1469,13 +1469,24 @@ func (s sample) V() float64 { return s.v }
 type memSeries struct {
 	sync.RWMutex
 
-	ref chunks.HeadSeriesRef
-	lset labels.Labels
-	mmappedChunks []*mmappedChunk // Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps.
-	mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay.
-	headChunk *memChunk // Most recent chunk in memory that's still being built.
-	chunkRange int64
-	firstChunkID int
+	ref chunks.HeadSeriesRef
+	lset labels.Labels
+
+	// Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps.
+	// When compaction runs, chunks get moved into a block and all pointers are shifted like so:
+	//
+	//                                    /------- let's say these 2 chunks get stored into a block
+	//                                    |  |
+	// before compaction: mmappedChunks=[p5,p6,p7,p8,p9] firstChunkID=5
+	//  after compaction: mmappedChunks=[p7,p8,p9]       firstChunkID=7
+	//
+	// pN is the pointer to the mmappedChunk referred to by HeadChunkID=N
+	mmappedChunks []*mmappedChunk
+
+	mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay.
+	headChunk *memChunk // Most recent chunk in memory that's still being built.
+	chunkRange int64
+	firstChunkID chunks.HeadChunkID // HeadChunkID for mmappedChunks[0]
 
 	nextAt int64 // Timestamp at which to cut the next chunk.
@@ -1535,7 +1546,7 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
 	if s.headChunk != nil && s.headChunk.maxTime < mint {
 		// If head chunk is truncated, we can truncate all mmapped chunks.
 		removed = 1 + len(s.mmappedChunks)
-		s.firstChunkID += removed
+		s.firstChunkID += chunks.HeadChunkID(removed)
 		s.headChunk = nil
 		s.mmappedChunks = nil
 		return removed
@@ -1548,7 +1559,7 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
 		removed = i + 1
 	}
 	s.mmappedChunks = append(s.mmappedChunks[:0], s.mmappedChunks[removed:]...)
- s.firstChunkID += removed + s.firstChunkID += chunks.HeadChunkID(removed) } return removed } diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 5e9a13d5d..965db74bf 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -161,24 +161,23 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, lbls *labels.Labels, chk *chks = append(*chks, chunks.Meta{ MinTime: c.minTime, MaxTime: c.maxTime, - Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, uint64(s.chunkID(i)))), + Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(i))), }) } if s.headChunk != nil && s.headChunk.OverlapsClosedInterval(h.mint, h.maxt) { *chks = append(*chks, chunks.Meta{ MinTime: s.headChunk.minTime, MaxTime: math.MaxInt64, // Set the head chunks as open (being appended to). - Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, uint64(s.chunkID(len(s.mmappedChunks))))), + Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)))), }) } return nil } -// chunkID returns the ID corresponding to .mmappedChunks[pos] -// (head chunk if pos==len(mmappedChunks)) -func (s *memSeries) chunkID(pos int) int { - return pos + s.firstChunkID +// headChunkID returns the HeadChunkID corresponding to .mmappedChunks[pos] +func (s *memSeries) headChunkID(pos int) chunks.HeadChunkID { + return chunks.HeadChunkID(pos) + s.firstChunkID } // LabelValueFor returns label value for the given label name in the series referred to by ID. 
@@ -261,7 +260,7 @@ func (h *headChunkReader) Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) { } s.Lock() - c, garbageCollect, err := s.chunk(int(cid), h.head.chunkDiskMapper) + c, garbageCollect, err := s.chunk(cid, h.head.chunkDiskMapper) if err != nil { s.Unlock() return nil, err @@ -284,21 +283,21 @@ func (h *headChunkReader) Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) { return &safeChunk{ Chunk: c.chunk, s: s, - cid: int(cid), + cid: cid, isoState: h.isoState, chunkDiskMapper: h.head.chunkDiskMapper, }, nil } -// chunk returns the chunk for the chunkID from memory or by m-mapping it from the disk. +// chunk returns the chunk for the HeadChunkID from memory or by m-mapping it from the disk. // If garbageCollect is true, it means that the returned *memChunk // (and not the chunkenc.Chunk inside it) can be garbage collected after its usage. -func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) { +func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) { // ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are // incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index. // The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix // is len(s.mmappedChunks), it represents the next chunk, which is the head chunk. 
- ix := id - s.firstChunkID + ix := int(id) - int(s.firstChunkID) if ix < 0 || ix > len(s.mmappedChunks) { return nil, false, storage.ErrNotFound } @@ -325,7 +324,7 @@ func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chun type safeChunk struct { chunkenc.Chunk s *memSeries - cid int + cid chunks.HeadChunkID isoState *isolationState chunkDiskMapper *chunks.ChunkDiskMapper } @@ -337,9 +336,9 @@ func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { return it } -// iterator returns a chunk iterator for the requested chunkID. +// iterator returns a chunk iterator for the requested chunkID, or a NopIterator if the requested ID is out of range. // It is unsafe to call this concurrently with s.append(...) without holding the series lock. -func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator { +func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator { c, garbageCollect, err := s.chunk(id, chunkDiskMapper) // TODO(fabxc): Work around! 
An error will be returns when a querier have retrieved a pointer to a // series's chunk, which got then garbage collected before it got @@ -357,7 +356,7 @@ func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper * } }() - ix := id - s.firstChunkID + ix := int(id) - int(s.firstChunkID) numSamples := c.chunk.NumSamples() stopAfter := numSamples @@ -404,7 +403,7 @@ func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper * return chunkenc.NewNopIterator() } - if id-s.firstChunkID < len(s.mmappedChunks) { + if int(id)-int(s.firstChunkID) < len(s.mmappedChunks) { if stopAfter == numSamples { return c.chunk.Iterator(it) } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 42f8a1e7b..99a0cdb0d 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -561,7 +561,7 @@ func TestMemSeries_truncateChunks(t *testing.T) { // Check that truncate removes half of the chunks and afterwards // that the ID of the last chunk still gives us the same chunk afterwards. countBefore := len(s.mmappedChunks) + 1 // +1 for the head chunk. - lastID := s.chunkID(countBefore - 1) + lastID := s.headChunkID(countBefore - 1) lastChunk, _, err := s.chunk(lastID, chunkDiskMapper) require.NoError(t, err) require.NotNil(t, lastChunk) @@ -582,11 +582,11 @@ func TestMemSeries_truncateChunks(t *testing.T) { // Validate that the series' sample buffer is applied correctly to the last chunk // after truncation. 
- it1 := s.iterator(s.chunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil) + it1 := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil) _, ok := it1.(*memSafeIterator) require.True(t, ok) - it2 := s.iterator(s.chunkID(len(s.mmappedChunks)-1), nil, chunkDiskMapper, nil) + it2 := s.iterator(s.headChunkID(len(s.mmappedChunks)-1), nil, chunkDiskMapper, nil) _, ok = it2.(*memSafeIterator) require.False(t, ok, "non-last chunk incorrectly wrapped with sample buffer") } @@ -2262,7 +2262,7 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) { require.True(t, ok, "sample append failed") } - it := s.iterator(s.chunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil) + it := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil) _, ok := it.(*memSafeIterator) require.True(t, ok)