TSDB: streamline reading of overlapping head chunks

`getOOOSeriesChunks` was already finding sets of overlapping chunks; we
store those in a `multiMeta` struct so that `ChunkOrIterable` can
reconstruct an `Iterable` easily and predictably.

We no longer need a `MergeOOO` flag to indicate that this Meta should
be merged with other ones; this is explicit in the `multiMeta` structure.

We also no longer need `chunkMetaAndChunkDiskMapperRef`.

Add `wrapOOOHeadChunk` to defeat `chunkenc.Pool` - chunks are reset
during compaction, but if we wrap them (like `safeHeadChunk` was doing
then this is skipped) .

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2024-08-23 17:10:02 +01:00
parent 838e49e7b8
commit cde42f30e9
4 changed files with 98 additions and 246 deletions

View file

@ -133,9 +133,6 @@ type Meta struct {
// Time range the data covers.
// When MaxTime == math.MaxInt64 the chunk is still open and being appended to.
MinTime, MaxTime int64
// Flag to indicate that this meta needs merge with OOO data.
MergeOOO bool
}
// ChunkFromSamples requires all samples to have the same type.

View file

@ -366,7 +366,7 @@ func (h *headChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Ch
// If copyLastChunk is true, then it makes a copy of the head chunk if asked for it.
// Also returns max time of the chunk.
func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.Chunk, int64, error) {
sid, cid := chunks.HeadChunkRef(meta.Ref).Unpack()
sid, cid, isOOO := unpackHeadChunkRef(meta.Ref)
s := h.head.series.getByID(sid)
// This means that the series has been garbage collected.
@ -376,11 +376,20 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.
s.Lock()
defer s.Unlock()
return h.head.chunkFromSeries(s, cid, h.mint, h.maxt, h.isoState, copyLastChunk)
return h.head.chunkFromSeries(s, cid, isOOO, h.mint, h.maxt, h.isoState, copyLastChunk)
}
// Dumb thing to defeat chunk pool.
type wrapOOOHeadChunk struct {
chunkenc.Chunk
}
// Call with s locked.
func (h *Head) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, mint, maxt int64, isoState *isolationState, copyLastChunk bool) (chunkenc.Chunk, int64, error) {
func (h *Head) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, isOOO bool, mint, maxt int64, isoState *isolationState, copyLastChunk bool) (chunkenc.Chunk, int64, error) {
if isOOO {
chk, maxTime, err := s.oooChunk(cid, h.chunkDiskMapper, &h.memChunkPool)
return wrapOOOHeadChunk{chk}, maxTime, err
}
c, headChunk, isOpen, err := s.chunk(cid, h.chunkDiskMapper, &h.memChunkPool)
if err != nil {
return nil, 0, err
@ -481,85 +490,19 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
return elem, true, offset == 0, nil
}
// mergedChunks return an iterable over all chunks that overlap the
// time window [mint,maxt], plus meta.Chunk if populated.
// If hr is non-nil then in-order chunks are included.
// This function is not thread safe unless the caller holds a lock.
// The caller must ensure that s.ooo is not nil.
func (s *memSeries) mergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, hr *headChunkReader, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (chunkenc.Iterable, error) {
// We create a temporary slice of chunk metas to hold the information of all
// possible chunks that may overlap with the requested chunk.
tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.ooo.oooMmappedChunks)+1)
// oooChunk returns the chunk for the HeadChunkID by m-mapping it from the disk.
// It never returns the head OOO chunk.
func (s *memSeries) oooChunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk chunkenc.Chunk, maxTime int64, err error) {
// ix represents the index of chunk in the s.ooo.oooMmappedChunks slice. The chunk id's are
// incremented by 1 when new chunk is created, hence (id - firstOOOChunkID) gives the slice index.
ix := int(id) - int(s.ooo.firstOOOChunkID)
for i, c := range s.ooo.oooMmappedChunks {
if maxMmapRef != 0 && c.ref > maxMmapRef {
break
}
if c.OverlapsClosedInterval(mint, maxt) {
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
meta: chunks.Meta{
MinTime: c.minTime,
MaxTime: c.maxTime,
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i))),
},
ref: c.ref,
})
}
}
// Add in data copied from the head OOO chunk.
if meta.Chunk != nil {
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{meta: meta})
if ix < 0 || ix >= len(s.ooo.oooMmappedChunks) {
return nil, 0, storage.ErrNotFound
}
if hr != nil { // Include in-order chunks.
metas := appendSeriesChunks(s, max(meta.MinTime, mint), min(meta.MaxTime, maxt), nil)
for _, m := range metas {
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
meta: m,
ref: 0, // This tells the loop below it's an in-order head chunk.
})
}
}
// Next we want to sort all the collected chunks by min time so we can find
// those that overlap and stop when we know the rest don't.
slices.SortFunc(tmpChks, refLessByMinTimeAndMinRef)
mc := &mergedOOOChunks{}
absoluteMax := int64(math.MinInt64)
for _, c := range tmpChks {
if c.meta.Ref != meta.Ref && (len(mc.chunkIterables) == 0 || c.meta.MinTime > absoluteMax) {
continue
}
var iterable chunkenc.Iterable
switch {
case c.meta.Chunk != nil:
iterable = c.meta.Chunk
case c.ref == 0: // This is an in-order head chunk.
_, cid := chunks.HeadChunkRef(c.meta.Ref).Unpack()
var err error
iterable, _, err = hr.head.chunkFromSeries(s, cid, hr.mint, hr.maxt, hr.isoState, false)
if err != nil {
return nil, fmt.Errorf("invalid head chunk: %w", err)
}
default:
chk, err := cdm.Chunk(c.ref)
if err != nil {
var cerr *chunks.CorruptionErr
if errors.As(err, &cerr) {
return nil, fmt.Errorf("invalid ooo mmapped chunk: %w", err)
}
return nil, err
}
iterable = chk
}
mc.chunkIterables = append(mc.chunkIterables, iterable)
if c.meta.MaxTime > absoluteMax {
absoluteMax = c.meta.MaxTime
}
}
return mc, nil
chk, err := chunkDiskMapper.Chunk(s.ooo.oooMmappedChunks[ix].ref)
return chk, s.ooo.oooMmappedChunks[ix].maxTime, err
}
// safeHeadChunk makes sure that the chunk can be accessed without a race condition.

View file

@ -16,6 +16,7 @@ package tsdb
import (
"context"
"errors"
"fmt"
"math"
"slices"
@ -91,11 +92,10 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) {
tmpChks = append(tmpChks, chunks.Meta{
MinTime: minT,
MaxTime: maxT,
Ref: ref,
Chunk: chunk,
MergeOOO: true,
MinTime: minT,
MaxTime: maxT,
Ref: ref,
Chunk: chunk,
})
}
@ -140,34 +140,39 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
// those that overlap.
slices.SortFunc(tmpChks, lessByMinTimeAndMinRef)
// Next we want to iterate the sorted collected chunks and only return the
// chunks Meta the first chunk that overlaps with others.
// Next we want to iterate the sorted collected chunks and return composites for chunks that overlap with others.
// Example chunks of a series: 5:(100, 200) 6:(500, 600) 7:(150, 250) 8:(550, 650)
// In the example 5 overlaps with 7 and 6 overlaps with 8 so we only want to
// return chunk Metas for chunk 5 and chunk 6e
*chks = append(*chks, tmpChks[0])
maxTime := tmpChks[0].MaxTime // Tracks the maxTime of the previous "to be merged chunk".
// In the example 5 overlaps with 7 and 6 overlaps with 8 so we will return
// [5,7], [6,8].
toBeMerged := tmpChks[0]
for _, c := range tmpChks[1:] {
switch {
case c.MinTime > maxTime:
*chks = append(*chks, c)
maxTime = c.MaxTime
case c.MaxTime > maxTime:
maxTime = c.MaxTime
(*chks)[len(*chks)-1].MaxTime = c.MaxTime
fallthrough
default:
// If the head OOO chunk is part of an output chunk, copy the chunk pointer.
if c.Chunk != nil {
(*chks)[len(*chks)-1].Chunk = c.Chunk
if c.MinTime > toBeMerged.MaxTime {
// This chunk doesn't overlap. Send current toBeMerged to output and start a new one.
*chks = append(*chks, toBeMerged)
toBeMerged = c
} else {
// Merge this chunk with existing toBeMerged.
if mm, ok := toBeMerged.Chunk.(*multiMeta); ok {
mm.metas = append(mm.metas, c)
} else {
toBeMerged.Chunk = &multiMeta{metas: []chunks.Meta{toBeMerged, c}}
}
if toBeMerged.MaxTime < c.MaxTime {
toBeMerged.MaxTime = c.MaxTime
}
(*chks)[len(*chks)-1].MergeOOO = (*chks)[len(*chks)-1].MergeOOO || c.MergeOOO
}
}
*chks = append(*chks, toBeMerged)
return nil
}
// Fake Chunk object to pass a set of Metas inside Meta.Chunk.
type multiMeta struct {
chunkenc.Chunk // We don't expect any of the methods to be called.
metas []chunks.Meta
}
// LabelValues needs to be overridden from the headIndexReader implementation
// so we can return labels within either in-order range or ooo range.
func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
@ -182,29 +187,6 @@ func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, m
return labelValuesWithMatchers(ctx, oh, name, matchers...)
}
type chunkMetaAndChunkDiskMapperRef struct {
meta chunks.Meta
ref chunks.ChunkDiskMapperRef
}
func refLessByMinTimeAndMinRef(a, b chunkMetaAndChunkDiskMapperRef) int {
switch {
case a.meta.MinTime < b.meta.MinTime:
return -1
case a.meta.MinTime > b.meta.MinTime:
return 1
}
switch {
case a.meta.Ref < b.meta.Ref:
return -1
case a.meta.Ref > b.meta.Ref:
return 1
default:
return 0
}
}
func lessByMinTimeAndMinRef(a, b chunks.Meta) int {
switch {
case a.MinTime < b.MinTime:
@ -243,36 +225,55 @@ func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader,
}
func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
sid, _, _ := unpackHeadChunkRef(meta.Ref)
if !meta.MergeOOO {
return cr.cr.ChunkOrIterable(meta)
}
s := cr.head.series.getByID(sid)
// This means that the series has been garbage collected.
if s == nil {
return nil, nil, storage.ErrNotFound
}
s.Lock()
if s.ooo == nil { // Must have s.ooo non-nil to call mergedChunks().
s.Unlock()
return cr.cr.ChunkOrIterable(meta)
}
mc, err := s.mergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef)
s.Unlock()
return nil, mc, err
c, it, _, err := cr.chunkOrIterable(meta, false)
return c, it, err
}
// ChunkOrIterableWithCopy implements ChunkReaderWithCopy. The special Copy
// behaviour is only implemented for the in-order head chunk.
func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
if !meta.MergeOOO {
return cr.cr.ChunkOrIterableWithCopy(meta)
return cr.chunkOrIterable(meta, true)
}
func (cr *HeadAndOOOChunkReader) chunkOrIterable(meta chunks.Meta, copyLastChunk bool) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
sid, cid, isOOO := unpackHeadChunkRef(meta.Ref)
s := cr.head.series.getByID(sid)
// This means that the series has been garbage collected.
if s == nil {
return nil, nil, 0, storage.ErrNotFound
}
chk, iter, err := cr.ChunkOrIterable(meta)
return chk, iter, 0, err
var isoState *isolationState
if cr.cr != nil {
isoState = cr.cr.isoState
}
s.Lock()
defer s.Unlock()
if meta.Chunk == nil {
c, maxt, err := cr.head.chunkFromSeries(s, cid, isOOO, meta.MinTime, meta.MaxTime, isoState, copyLastChunk)
return c, nil, maxt, err
}
mm, ok := meta.Chunk.(*multiMeta)
if !ok { // Complete chunk was supplied.
return meta.Chunk, nil, meta.MaxTime, nil
}
// We have a composite meta: construct a composite iterable.
mc := &mergedOOOChunks{}
for _, m := range mm.metas {
switch {
case m.Chunk != nil:
mc.chunkIterables = append(mc.chunkIterables, m.Chunk)
default:
_, cid, isOOO := unpackHeadChunkRef(m.Ref)
iterable, _, err := cr.head.chunkFromSeries(s, cid, isOOO, m.MinTime, m.MaxTime, isoState, copyLastChunk)
if err != nil {
return nil, nil, 0, fmt.Errorf("invalid head chunk: %w", err)
}
mc.chunkIterables = append(mc.chunkIterables, iterable)
}
}
return nil, mc, meta.MaxTime, nil
}
func (cr *HeadAndOOOChunkReader) Close() error {

View file

@ -308,10 +308,9 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
var expChunks []chunks.Meta
for _, e := range tc.expChunks {
meta := chunks.Meta{
Chunk: chunkenc.Chunk(nil),
MinTime: e.mint,
MaxTime: e.maxt,
MergeOOO: true, // Only OOO chunks are tested here, so we always request merge from OOO head.
Chunk: chunkenc.Chunk(nil),
MinTime: e.mint,
MaxTime: e.maxt,
}
// Ref to whatever Ref the chunk has, that we refer to by ID
@ -485,7 +484,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
cr := NewHeadAndOOOChunkReader(db.head, 0, 1000, nil, nil, 0)
defer cr.Close()
c, iterable, err := cr.ChunkOrIterable(chunks.Meta{
Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, MergeOOO: true,
Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
})
require.Nil(t, iterable)
require.Equal(t, err, fmt.Errorf("not found"))
@ -1030,94 +1029,6 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
}
}
// TestSortByMinTimeAndMinRef tests that the sort function for chunk metas does sort
// by chunk meta MinTime and in case of same references by the lower reference.
func TestSortByMinTimeAndMinRef(t *testing.T) {
tests := []struct {
name string
input []chunkMetaAndChunkDiskMapperRef
exp []chunkMetaAndChunkDiskMapperRef
}{
{
name: "chunks are ordered by min time",
input: []chunkMetaAndChunkDiskMapperRef{
{
meta: chunks.Meta{
Ref: 0,
MinTime: 0,
},
ref: chunks.ChunkDiskMapperRef(0),
},
{
meta: chunks.Meta{
Ref: 1,
MinTime: 1,
},
ref: chunks.ChunkDiskMapperRef(1),
},
},
exp: []chunkMetaAndChunkDiskMapperRef{
{
meta: chunks.Meta{
Ref: 0,
MinTime: 0,
},
ref: chunks.ChunkDiskMapperRef(0),
},
{
meta: chunks.Meta{
Ref: 1,
MinTime: 1,
},
ref: chunks.ChunkDiskMapperRef(1),
},
},
},
{
name: "if same mintime, lower reference goes first",
input: []chunkMetaAndChunkDiskMapperRef{
{
meta: chunks.Meta{
Ref: 10,
MinTime: 0,
},
ref: chunks.ChunkDiskMapperRef(0),
},
{
meta: chunks.Meta{
Ref: 5,
MinTime: 0,
},
ref: chunks.ChunkDiskMapperRef(1),
},
},
exp: []chunkMetaAndChunkDiskMapperRef{
{
meta: chunks.Meta{
Ref: 5,
MinTime: 0,
},
ref: chunks.ChunkDiskMapperRef(1),
},
{
meta: chunks.Meta{
Ref: 10,
MinTime: 0,
},
ref: chunks.ChunkDiskMapperRef(0),
},
},
},
}
for _, tc := range tests {
t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
slices.SortFunc(tc.input, refLessByMinTimeAndMinRef)
require.Equal(t, tc.exp, tc.input)
})
}
}
// TestSortMetaByMinTimeAndMinRef tests that the sort function for chunk metas does sort
// by chunk meta MinTime and in case of same references by the lower reference.
func TestSortMetaByMinTimeAndMinRef(t *testing.T) {