clarify HeadChunkID type and usage (#9726)

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>
Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>

Dieter Plaetinck 2021-11-17 08:05:10 -05:00 committed by GitHub
parent d140df2335
commit 067efc3725
5 changed files with 67 additions and 38 deletions


@@ -66,20 +66,35 @@ type HeadSeriesRef uint64
// The HeadSeriesRef and ChunkID may not exceed 5 and 3 bytes respectively.
type HeadChunkRef uint64
func NewHeadChunkRef(hsr HeadSeriesRef, chunkID uint64) HeadChunkRef {
func NewHeadChunkRef(hsr HeadSeriesRef, chunkID HeadChunkID) HeadChunkRef {
if hsr > (1<<40)-1 {
panic("series ID exceeds 5 bytes")
}
if chunkID > (1<<24)-1 {
panic("chunk ID exceeds 3 bytes")
}
return HeadChunkRef(uint64(hsr<<24) | chunkID)
return HeadChunkRef(uint64(hsr<<24) | uint64(chunkID))
}
func (p HeadChunkRef) Unpack() (HeadSeriesRef, uint64) {
return HeadSeriesRef(p >> 24), uint64(p<<40) >> 40
func (p HeadChunkRef) Unpack() (HeadSeriesRef, HeadChunkID) {
return HeadSeriesRef(p >> 24), HeadChunkID(p<<40) >> 40
}
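
For readers who want to experiment with the packing outside the Prometheus tree, here is a minimal self-contained sketch of the 5-byte/3-byte layout implemented by `NewHeadChunkRef` and `Unpack` above. The types are redeclared locally and the functions are lower-cased to make clear this is an illustration, not the library code; the mask in `unpack` is equivalent to the shift-left/shift-right used in the real implementation.

```go
package main

import "fmt"

type HeadSeriesRef uint64
type HeadChunkID uint64
type HeadChunkRef uint64

func newHeadChunkRef(hsr HeadSeriesRef, cid HeadChunkID) HeadChunkRef {
	if hsr > (1<<40)-1 {
		panic("series ID exceeds 5 bytes")
	}
	if cid > (1<<24)-1 {
		panic("chunk ID exceeds 3 bytes")
	}
	// Series ref in the upper 5 bytes, chunk ID in the lower 3 bytes.
	return HeadChunkRef(uint64(hsr)<<24 | uint64(cid))
}

func (p HeadChunkRef) unpack() (HeadSeriesRef, HeadChunkID) {
	return HeadSeriesRef(p >> 24), HeadChunkID(p & ((1 << 24) - 1))
}

func main() {
	ref := newHeadChunkRef(42, 12)
	sref, cid := ref.unpack()
	fmt.Println(sref, cid) // 42 12
}
```
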
// HeadChunkID refers to a specific chunk in a series (memSeries) in the Head.
// Each memSeries has its own monotonically increasing number to refer to its chunks.
// If the HeadChunkID value is...
// * memSeries.firstChunkID+len(memSeries.mmappedChunks), it's the head chunk.
// * less than the above, but >= memSeries.firstChunkID, then it's
// memSeries.mmappedChunks[i] where i = HeadChunkID - memSeries.firstChunkID.
// Example:
// assume a memSeries.firstChunkID=7 and memSeries.mmappedChunks=[p5,p6,p7,p8,p9].
// | HeadChunkID value | refers to ... |
// |-------------------|----------------------------------------------------------------------------------------|
// | 0-6 | chunks that have been compacted to blocks, these won't return data for queries in Head |
// | 7-11 | memSeries.mmappedChunks[i] where i is 0 to 4. |
// | 12 | memSeries.headChunk |
type HeadChunkID uint64
// BlockChunkRef refers to a chunk within a persisted block.
// The upper 4 bytes are for the segment index and
// the lower 4 bytes are for the segment offset where the data starts for this chunk.
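
To make the table above concrete, the following hedged sketch walks through the same example (firstChunkID=7, five mmapped chunks). `resolve` is a made-up helper, but the index arithmetic mirrors what `memSeries.chunk` does later in this commit.

```go
package main

import "fmt"

type HeadChunkID uint64

// resolve reproduces the table above: IDs below firstChunkID were compacted
// into a block, IDs inside the slice are mmapped chunks, and the ID just past
// the end of the slice is the head chunk.
func resolve(id, firstChunkID HeadChunkID, numMmapped int) string {
	ix := int(id) - int(firstChunkID)
	switch {
	case ix < 0:
		return "compacted to a block (no data for Head queries)"
	case ix < numMmapped:
		return fmt.Sprintf("memSeries.mmappedChunks[%d]", ix)
	case ix == numMmapped:
		return "memSeries.headChunk"
	default:
		return "not created yet"
	}
}

func main() {
	const firstChunkID, numMmapped = 7, 5 // mmappedChunks has 5 entries
	for _, id := range []HeadChunkID{3, 7, 11, 12} {
		fmt.Printf("HeadChunkID %d -> %s\n", id, resolve(id, firstChunkID, numMmapped))
	}
}
```
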


@@ -1,9 +1,5 @@
# An overview of different Series and Chunk reference types
## Used internally in TSDB
* `ChunkDiskMapperRef`: to load mmapped chunks from disk.
## Used by callers of TSDB
| Location | Series access | Chunk access |
@@ -60,7 +56,7 @@ Note: we cover the implementations as used in Prometheus. Other projects may us
A `HeadChunkRef` is an 8 byte integer that packs together:
* 5 Bytes for `HeadSeriesRef`.
* 3 Bytes for `ChunkID` (uint64). This is simply an index into a slice of mmappedChunks for a given series
* 3 Bytes for `HeadChunkID` (uint64) (see below).
There are two implications here:
@@ -85,3 +81,11 @@ The `ChunkRef` types allow retrieving the chunk data as efficiently as possible.
Hence we need to pack the `HeadSeriesRef` to get to the series.
* In persistent blocks, the chunk files are separated from the index and static. Hence you only need the coordinates within the `chunks` directory
to get to the chunk, so there is no need for a `BlockSeriesRef`.
## Used internally in TSDB
* [`HeadChunkID`](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab/tsdb/chunks#HeadChunkID) references a chunk of a `memSeries` (either an `mmappedChunk` or `headChunk`).
If a caller has, for whatever reason, an "old" `HeadChunkID` that refers to a chunk that has been compacted into a block, querying the memSeries for it will not return any data.
* [`ChunkDiskMapperRef`](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab/tsdb/chunks#ChunkDiskMapperRef) is an 8 Byte integer.
4 Bytes are used to refer to the chunk file number and 4 Bytes serve as the byte offset within that file (similar to `BlockChunkRef`). `mmappedChunk` provides this value so that callers can load the mmapped chunk from disk.
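
A short, hedged sketch of the 4-byte/4-byte layout described for `ChunkDiskMapperRef`; `diskRef`, `newDiskRef`, and `unpack` are illustrative stand-ins, not the actual API in `tsdb/chunks`.

```go
package main

import "fmt"

// diskRef is a hypothetical stand-in: file (segment) number in the upper
// 4 bytes, byte offset within that file in the lower 4 bytes.
type diskRef uint64

func newDiskRef(fileNo, offset uint32) diskRef {
	return diskRef(uint64(fileNo)<<32 | uint64(offset))
}

func (r diskRef) unpack() (fileNo, offset uint32) {
	return uint32(r >> 32), uint32(r)
}

func main() {
	r := newDiskRef(3, 4096)
	f, o := r.unpack()
	fmt.Println(f, o) // 3 4096
}
```
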


@@ -1471,11 +1471,22 @@ type memSeries struct {
ref chunks.HeadSeriesRef
lset labels.Labels
mmappedChunks []*mmappedChunk // Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps.
// Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps.
// When compaction runs, chunks get moved into a block and all pointers are shifted like so:
//
// /------- let's say these 2 chunks get stored into a block
// | |
// before compaction: mmappedChunks=[p5,p6,p7,p8,p9] firstChunkID=5
// after compaction: mmappedChunks=[p7,p8,p9] firstChunkID=7
//
// pN is the pointer to the mmappedChunk referred to by HeadChunkID=N
mmappedChunks []*mmappedChunk
mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay.
headChunk *memChunk // Most recent chunk in memory that's still being built.
chunkRange int64
firstChunkID int
firstChunkID chunks.HeadChunkID // HeadChunkID for mmappedChunks[0]
nextAt int64 // Timestamp at which to cut the next chunk.
@@ -1535,7 +1546,7 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
if s.headChunk != nil && s.headChunk.maxTime < mint {
// If head chunk is truncated, we can truncate all mmapped chunks.
removed = 1 + len(s.mmappedChunks)
s.firstChunkID += removed
s.firstChunkID += chunks.HeadChunkID(removed)
s.headChunk = nil
s.mmappedChunks = nil
return removed
@@ -1548,7 +1559,7 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
removed = i + 1
}
s.mmappedChunks = append(s.mmappedChunks[:0], s.mmappedChunks[removed:]...)
s.firstChunkID += removed
s.firstChunkID += chunks.HeadChunkID(removed)
}
return removed
}
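
The following self-contained sketch illustrates the invariant the new comment describes: dropping the k oldest mmapped chunks must advance `firstChunkID` by k, so that previously handed-out `HeadChunkID`s keep resolving to the same chunks. `toySeries`, `dropOldest`, and `chunkFor` are hypothetical names used only for this illustration.

```go
package main

import "fmt"

type HeadChunkID uint64

// toySeries is a stand-in for memSeries, reduced to the two fields that
// matter for the firstChunkID bookkeeping shown above.
type toySeries struct {
	mmappedChunks []string // stand-ins for *mmappedChunk: "p5", "p6", ...
	firstChunkID  HeadChunkID
}

// dropOldest removes the k oldest mmapped chunks, as compaction or
// truncateChunksBefore would, and advances firstChunkID by the same amount.
func (s *toySeries) dropOldest(k int) {
	s.mmappedChunks = append(s.mmappedChunks[:0], s.mmappedChunks[k:]...)
	s.firstChunkID += HeadChunkID(k)
}

// chunkFor resolves a HeadChunkID to an mmapped chunk using the same
// id - firstChunkID arithmetic as memSeries.chunk.
func (s *toySeries) chunkFor(id HeadChunkID) (string, bool) {
	ix := int(id) - int(s.firstChunkID)
	if ix < 0 || ix >= len(s.mmappedChunks) {
		return "", false
	}
	return s.mmappedChunks[ix], true
}

func main() {
	s := &toySeries{mmappedChunks: []string{"p5", "p6", "p7", "p8", "p9"}, firstChunkID: 5}
	fmt.Println(s.chunkFor(7)) // p7 true
	s.dropOldest(2)            // a block now owns the data of p5 and p6
	fmt.Println(s.chunkFor(7)) // p7 true -- the same ID still points at the same chunk
	fmt.Println(s.chunkFor(5)) // "" false -- compacted away
}
```
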


@@ -161,24 +161,23 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, lbls *labels.Labels, chk
*chks = append(*chks, chunks.Meta{
MinTime: c.minTime,
MaxTime: c.maxTime,
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, uint64(s.chunkID(i)))),
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(i))),
})
}
if s.headChunk != nil && s.headChunk.OverlapsClosedInterval(h.mint, h.maxt) {
*chks = append(*chks, chunks.Meta{
MinTime: s.headChunk.minTime,
MaxTime: math.MaxInt64, // Set the head chunks as open (being appended to).
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, uint64(s.chunkID(len(s.mmappedChunks))))),
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)))),
})
}
return nil
}
// chunkID returns the ID corresponding to .mmappedChunks[pos]
// (head chunk if pos==len(mmappedChunks))
func (s *memSeries) chunkID(pos int) int {
return pos + s.firstChunkID
// headChunkID returns the HeadChunkID corresponding to .mmappedChunks[pos]
func (s *memSeries) headChunkID(pos int) chunks.HeadChunkID {
return chunks.HeadChunkID(pos) + s.firstChunkID
}
// LabelValueFor returns label value for the given label name in the series referred to by ID.
@@ -261,7 +260,7 @@ func (h *headChunkReader) Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) {
}
s.Lock()
c, garbageCollect, err := s.chunk(int(cid), h.head.chunkDiskMapper)
c, garbageCollect, err := s.chunk(cid, h.head.chunkDiskMapper)
if err != nil {
s.Unlock()
return nil, err
@@ -284,21 +283,21 @@ func (h *headChunkReader) Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) {
return &safeChunk{
Chunk: c.chunk,
s: s,
cid: int(cid),
cid: cid,
isoState: h.isoState,
chunkDiskMapper: h.head.chunkDiskMapper,
}, nil
}
// chunk returns the chunk for the chunkID from memory or by m-mapping it from the disk.
// chunk returns the chunk for the HeadChunkID from memory or by m-mapping it from the disk.
// If garbageCollect is true, it means that the returned *memChunk
// (and not the chunkenc.Chunk inside it) can be garbage collected after its usage.
func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) {
func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) {
// ix represents the index of the chunk in the s.mmappedChunks slice. Chunk IDs are
// incremented by 1 when a new chunk is created, hence (id - firstChunkID) gives the slice index.
// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix
// is len(s.mmappedChunks), it represents the next chunk, which is the head chunk.
ix := id - s.firstChunkID
ix := int(id) - int(s.firstChunkID)
if ix < 0 || ix > len(s.mmappedChunks) {
return nil, false, storage.ErrNotFound
}
@@ -325,7 +324,7 @@ func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chun
type safeChunk struct {
chunkenc.Chunk
s *memSeries
cid int
cid chunks.HeadChunkID
isoState *isolationState
chunkDiskMapper *chunks.ChunkDiskMapper
}
@@ -337,9 +336,9 @@ func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
return it
}
// iterator returns a chunk iterator for the requested chunkID.
// iterator returns a chunk iterator for the requested chunkID, or a NopIterator if the requested ID is out of range.
// It is unsafe to call this concurrently with s.append(...) without holding the series lock.
func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator {
func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator {
c, garbageCollect, err := s.chunk(id, chunkDiskMapper)
// TODO(fabxc): Work around! An error will be returned when a querier has retrieved a pointer to a
// series's chunk, which then got garbage collected before it got
@@ -357,7 +356,7 @@ func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *
}
}()
ix := id - s.firstChunkID
ix := int(id) - int(s.firstChunkID)
numSamples := c.chunk.NumSamples()
stopAfter := numSamples
@@ -404,7 +403,7 @@ func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *
return chunkenc.NewNopIterator()
}
if id-s.firstChunkID < len(s.mmappedChunks) {
if int(id)-int(s.firstChunkID) < len(s.mmappedChunks) {
if stopAfter == numSamples {
return c.chunk.Iterator(it)
}
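
As a quick, hedged illustration of the renamed helper, the sketch below goes the other direction, from a position in `mmappedChunks` to a `HeadChunkID` (with position `len(mmappedChunks)` addressing the open head chunk), reusing the firstChunkID=7 example from earlier in the diff; the names are local stand-ins, not the real `tsdb` types.

```go
package main

import "fmt"

type HeadChunkID uint64

// headChunkID mirrors the helper above: the position in mmappedChunks
// (or len(mmappedChunks) for the open head chunk) plus firstChunkID.
func headChunkID(pos int, firstChunkID HeadChunkID) HeadChunkID {
	return HeadChunkID(pos) + firstChunkID
}

func main() {
	const firstChunkID, numMmapped = 7, 5
	fmt.Println(headChunkID(0, firstChunkID))            // 7: oldest mmapped chunk still in the Head
	fmt.Println(headChunkID(numMmapped-1, firstChunkID)) // 11: newest mmapped chunk
	fmt.Println(headChunkID(numMmapped, firstChunkID))   // 12: the head chunk, as Series uses above
}
```
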


@@ -561,7 +561,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
// Check that truncate removes half of the chunks, and that afterwards
// the ID of the last chunk still gives us the same chunk.
countBefore := len(s.mmappedChunks) + 1 // +1 for the head chunk.
lastID := s.chunkID(countBefore - 1)
lastID := s.headChunkID(countBefore - 1)
lastChunk, _, err := s.chunk(lastID, chunkDiskMapper)
require.NoError(t, err)
require.NotNil(t, lastChunk)
@@ -582,11 +582,11 @@ func TestMemSeries_truncateChunks(t *testing.T) {
// Validate that the series' sample buffer is applied correctly to the last chunk
// after truncation.
it1 := s.iterator(s.chunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil)
it1 := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil)
_, ok := it1.(*memSafeIterator)
require.True(t, ok)
it2 := s.iterator(s.chunkID(len(s.mmappedChunks)-1), nil, chunkDiskMapper, nil)
it2 := s.iterator(s.headChunkID(len(s.mmappedChunks)-1), nil, chunkDiskMapper, nil)
_, ok = it2.(*memSafeIterator)
require.False(t, ok, "non-last chunk incorrectly wrapped with sample buffer")
}
@@ -2262,7 +2262,7 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) {
require.True(t, ok, "sample append failed")
}
it := s.iterator(s.chunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil)
it := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil)
_, ok := it.(*memSafeIterator)
require.True(t, ok)