From cde42f30e9c91a23458661ee33d47f828fa63b43 Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Fri, 23 Aug 2024 17:10:02 +0100
Subject: [PATCH] TSDB: streamline reading of overlapping head chunks

`getOOOSeriesChunks` was already finding sets of overlapping chunks;
now we store those in a `multiMeta` struct so that `ChunkOrIterable`
can reconstruct an `Iterable` easily and predictably.

We no longer need a `MergeOOO` flag to indicate that this Meta should
be merged with other ones; this is explicit in the `multiMeta`
structure.

We also no longer need `chunkMetaAndChunkDiskMapperRef`.

Add `wrapOOOHeadChunk` to defeat `chunkenc.Pool`: chunks are reset
during compaction, but if we wrap them (as `safeHeadChunk` was doing)
then the reset is skipped.

Signed-off-by: Bryan Boreham
---
 tsdb/chunks/chunks.go      |   3 -
 tsdb/head_read.go          | 101 ++++++-------------------
 tsdb/ooo_head_read.go      | 143 +++++++++++++++++++------------------
 tsdb/ooo_head_read_test.go |  97 ++-----------------------
 4 files changed, 98 insertions(+), 246 deletions(-)

diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go
index 69201c6db..ec0f6d403 100644
--- a/tsdb/chunks/chunks.go
+++ b/tsdb/chunks/chunks.go
@@ -133,9 +133,6 @@ type Meta struct {
     // Time range the data covers.
     // When MaxTime == math.MaxInt64 the chunk is still open and being appended to.
     MinTime, MaxTime int64
-
-    // Flag to indicate that this meta needs merge with OOO data.
-    MergeOOO bool
 }
 
 // ChunkFromSamples requires all samples to have the same type.
diff --git a/tsdb/head_read.go b/tsdb/head_read.go
index 780eaa669..d81ffbb6a 100644
--- a/tsdb/head_read.go
+++ b/tsdb/head_read.go
@@ -366,7 +366,7 @@ func (h *headChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Ch
 // If copyLastChunk is true, then it makes a copy of the head chunk if asked for it.
 // Also returns max time of the chunk.
 func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.Chunk, int64, error) {
-    sid, cid := chunks.HeadChunkRef(meta.Ref).Unpack()
+    sid, cid, isOOO := unpackHeadChunkRef(meta.Ref)
 
     s := h.head.series.getByID(sid)
     // This means that the series has been garbage collected.
@@ -376,11 +376,20 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.
     s.Lock()
     defer s.Unlock()
-    return h.head.chunkFromSeries(s, cid, h.mint, h.maxt, h.isoState, copyLastChunk)
+    return h.head.chunkFromSeries(s, cid, isOOO, h.mint, h.maxt, h.isoState, copyLastChunk)
+}
+
+// wrapOOOHeadChunk defeats chunkenc.Pool: the wrapper is not a recognised chunk type, so the chunk is never reset for reuse.
+type wrapOOOHeadChunk struct {
+    chunkenc.Chunk
 }
 
 // Call with s locked.
-func (h *Head) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, mint, maxt int64, isoState *isolationState, copyLastChunk bool) (chunkenc.Chunk, int64, error) {
+func (h *Head) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, isOOO bool, mint, maxt int64, isoState *isolationState, copyLastChunk bool) (chunkenc.Chunk, int64, error) {
+    if isOOO {
+        chk, maxTime, err := s.oooChunk(cid, h.chunkDiskMapper, &h.memChunkPool)
+        return wrapOOOHeadChunk{chk}, maxTime, err
+    }
     c, headChunk, isOpen, err := s.chunk(cid, h.chunkDiskMapper, &h.memChunkPool)
     if err != nil {
         return nil, 0, err
     }
@@ -481,85 +490,19 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
     return elem, true, offset == 0, nil
 }
 
-// mergedChunks return an iterable over all chunks that overlap the
-// time window [mint,maxt], plus meta.Chunk if populated.
-// If hr is non-nil then in-order chunks are included.
-// This function is not thread safe unless the caller holds a lock.
-// The caller must ensure that s.ooo is not nil.
-func (s *memSeries) mergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, hr *headChunkReader, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (chunkenc.Iterable, error) {
-    // We create a temporary slice of chunk metas to hold the information of all
-    // possible chunks that may overlap with the requested chunk.
-    tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.ooo.oooMmappedChunks)+1)
+// oooChunk returns the chunk for the HeadChunkID by m-mapping it from the disk.
+// It never returns the head OOO chunk.
+func (s *memSeries) oooChunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk chunkenc.Chunk, maxTime int64, err error) {
+    // ix represents the index of chunk in the s.ooo.oooMmappedChunks slice. The chunk id's are
+    // incremented by 1 when new chunk is created, hence (id - firstOOOChunkID) gives the slice index.
+    ix := int(id) - int(s.ooo.firstOOOChunkID)
 
-    for i, c := range s.ooo.oooMmappedChunks {
-        if maxMmapRef != 0 && c.ref > maxMmapRef {
-            break
-        }
-        if c.OverlapsClosedInterval(mint, maxt) {
-            tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
-                meta: chunks.Meta{
-                    MinTime: c.minTime,
-                    MaxTime: c.maxTime,
-                    Ref:     chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i))),
-                },
-                ref: c.ref,
-            })
-        }
-    }
-    // Add in data copied from the head OOO chunk.
-    if meta.Chunk != nil {
-        tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{meta: meta})
+    if ix < 0 || ix >= len(s.ooo.oooMmappedChunks) {
+        return nil, 0, storage.ErrNotFound
     }
-    if hr != nil { // Include in-order chunks.
-        metas := appendSeriesChunks(s, max(meta.MinTime, mint), min(meta.MaxTime, maxt), nil)
-        for _, m := range metas {
-            tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
-                meta: m,
-                ref:  0, // This tells the loop below it's an in-order head chunk.
-            })
-        }
-    }
-
-    // Next we want to sort all the collected chunks by min time so we can find
-    // those that overlap and stop when we know the rest don't.
-    slices.SortFunc(tmpChks, refLessByMinTimeAndMinRef)
-
-    mc := &mergedOOOChunks{}
-    absoluteMax := int64(math.MinInt64)
-    for _, c := range tmpChks {
-        if c.meta.Ref != meta.Ref && (len(mc.chunkIterables) == 0 || c.meta.MinTime > absoluteMax) {
-            continue
-        }
-        var iterable chunkenc.Iterable
-        switch {
-        case c.meta.Chunk != nil:
-            iterable = c.meta.Chunk
-        case c.ref == 0: // This is an in-order head chunk.
-            _, cid := chunks.HeadChunkRef(c.meta.Ref).Unpack()
-            var err error
-            iterable, _, err = hr.head.chunkFromSeries(s, cid, hr.mint, hr.maxt, hr.isoState, false)
-            if err != nil {
-                return nil, fmt.Errorf("invalid head chunk: %w", err)
-            }
-        default:
-            chk, err := cdm.Chunk(c.ref)
-            if err != nil {
-                var cerr *chunks.CorruptionErr
-                if errors.As(err, &cerr) {
-                    return nil, fmt.Errorf("invalid ooo mmapped chunk: %w", err)
-                }
-                return nil, err
-            }
-            iterable = chk
-        }
-        mc.chunkIterables = append(mc.chunkIterables, iterable)
-        if c.meta.MaxTime > absoluteMax {
-            absoluteMax = c.meta.MaxTime
-        }
-    }
-
-    return mc, nil
+    chk, err := chunkDiskMapper.Chunk(s.ooo.oooMmappedChunks[ix].ref)
+    return chk, s.ooo.oooMmappedChunks[ix].maxTime, err
 }
 
 // safeHeadChunk makes sure that the chunk can be accessed without a race condition.
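A note on the `wrapOOOHeadChunk` type added above, since its comment is brief: as the commit message says, chunks are reset when they go back to `chunkenc.Pool` during compaction, and wrapping a chunk in a type the pool does not recognise skips that reset. The sketch below illustrates the idea with a toy pool; `chunk`, `xorChunk`, `wrapped`, and `pool` are hypothetical stand-ins for this example, not the real `chunkenc` types.

package main

import (
	"fmt"
	"sync"
)

// chunk stands in for chunkenc.Chunk in this sketch.
type chunk interface{ Bytes() []byte }

// xorChunk is a concrete chunk type the pool knows how to recycle.
type xorChunk struct{ b []byte }

func (c *xorChunk) Bytes() []byte { return c.b }

// wrapped embeds a chunk but is a different concrete type, so the
// pool's type assertion below fails for it.
type wrapped struct{ chunk }

// pool recycles only the chunk types it recognises.
type pool struct{ xor sync.Pool }

func (p *pool) put(c chunk) {
	// Wrapped chunks fall through untouched: they are neither reset
	// nor recycled, so a reader still holding them is unaffected.
	if xc, ok := c.(*xorChunk); ok {
		xc.b = xc.b[:0] // reset before handing back to the pool
		p.xor.Put(xc)
	}
}

func main() {
	p := &pool{}
	raw := &xorChunk{b: []byte{1, 2, 3}}

	p.put(wrapped{raw})           // not recognised: raw keeps its data
	fmt.Println(len(raw.Bytes())) // 3

	p.put(raw)                    // recognised: reset and pooled
	fmt.Println(len(raw.Bytes())) // 0
}

In the patch, `wrapOOOHeadChunk` plays the role of `wrapped`: `chunkFromSeries` returns OOO head chunks wrapped, so the pool will not reset them while a query may still be iterating them.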
diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go
index a50decd62..7b58ec566 100644
--- a/tsdb/ooo_head_read.go
+++ b/tsdb/ooo_head_read.go
@@ -16,6 +16,7 @@ package tsdb
 import (
     "context"
     "errors"
+    "fmt"
     "math"
     "slices"
 
@@ -91,11 +92,10 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
 
     addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) {
         tmpChks = append(tmpChks, chunks.Meta{
-            MinTime:  minT,
-            MaxTime:  maxT,
-            Ref:      ref,
-            Chunk:    chunk,
-            MergeOOO: true,
+            MinTime: minT,
+            MaxTime: maxT,
+            Ref:     ref,
+            Chunk:   chunk,
         })
     }
 
@@ -140,34 +140,39 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
     // those that overlap.
     slices.SortFunc(tmpChks, lessByMinTimeAndMinRef)
 
-    // Next we want to iterate the sorted collected chunks and only return the
-    // chunks Meta the first chunk that overlaps with others.
+    // Next we want to iterate the sorted collected chunks and return composites for chunks that overlap with others.
     // Example chunks of a series: 5:(100, 200) 6:(500, 600) 7:(150, 250) 8:(550, 650)
-    // In the example 5 overlaps with 7 and 6 overlaps with 8 so we only want to
-    // return chunk Metas for chunk 5 and chunk 6e
-    *chks = append(*chks, tmpChks[0])
-    maxTime := tmpChks[0].MaxTime // Tracks the maxTime of the previous "to be merged chunk".
+    // In the example 5 overlaps with 7 and 6 overlaps with 8 so we will return
+    // [5,7], [6,8].
+    toBeMerged := tmpChks[0]
     for _, c := range tmpChks[1:] {
-        switch {
-        case c.MinTime > maxTime:
-            *chks = append(*chks, c)
-            maxTime = c.MaxTime
-        case c.MaxTime > maxTime:
-            maxTime = c.MaxTime
-            (*chks)[len(*chks)-1].MaxTime = c.MaxTime
-            fallthrough
-        default:
-            // If the head OOO chunk is part of an output chunk, copy the chunk pointer.
-            if c.Chunk != nil {
-                (*chks)[len(*chks)-1].Chunk = c.Chunk
+        if c.MinTime > toBeMerged.MaxTime {
+            // This chunk doesn't overlap. Send current toBeMerged to output and start a new one.
+            *chks = append(*chks, toBeMerged)
+            toBeMerged = c
+        } else {
+            // Merge this chunk with existing toBeMerged.
+            if mm, ok := toBeMerged.Chunk.(*multiMeta); ok {
+                mm.metas = append(mm.metas, c)
+            } else {
+                toBeMerged.Chunk = &multiMeta{metas: []chunks.Meta{toBeMerged, c}}
+            }
+            if toBeMerged.MaxTime < c.MaxTime {
+                toBeMerged.MaxTime = c.MaxTime
             }
-            (*chks)[len(*chks)-1].MergeOOO = (*chks)[len(*chks)-1].MergeOOO || c.MergeOOO
         }
     }
+    *chks = append(*chks, toBeMerged)
     return nil
 }
 
+// Fake Chunk object to pass a set of Metas inside Meta.Chunk.
+type multiMeta struct {
+    chunkenc.Chunk // We don't expect any of the methods to be called.
+    metas []chunks.Meta
+}
+
 // LabelValues needs to be overridden from the headIndexReader implementation
 // so we can return labels within either in-order range or ooo range.
 func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
@@ -182,29 +187,6 @@ func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, m
     return labelValuesWithMatchers(ctx, oh, name, matchers...)
 }
 
-type chunkMetaAndChunkDiskMapperRef struct {
-    meta chunks.Meta
-    ref  chunks.ChunkDiskMapperRef
-}
-
-func refLessByMinTimeAndMinRef(a, b chunkMetaAndChunkDiskMapperRef) int {
-    switch {
-    case a.meta.MinTime < b.meta.MinTime:
-        return -1
-    case a.meta.MinTime > b.meta.MinTime:
-        return 1
-    }
-
-    switch {
-    case a.meta.Ref < b.meta.Ref:
-        return -1
-    case a.meta.Ref > b.meta.Ref:
-        return 1
-    default:
-        return 0
-    }
-}
-
 func lessByMinTimeAndMinRef(a, b chunks.Meta) int {
     switch {
     case a.MinTime < b.MinTime:
         return -1
     case a.MinTime > b.MinTime:
         return 1
     }
@@ -243,36 +225,55 @@ func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader,
 }
 
 func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
-    sid, _, _ := unpackHeadChunkRef(meta.Ref)
-    if !meta.MergeOOO {
-        return cr.cr.ChunkOrIterable(meta)
-    }
-
-    s := cr.head.series.getByID(sid)
-    // This means that the series has been garbage collected.
-    if s == nil {
-        return nil, nil, storage.ErrNotFound
-    }
-
-    s.Lock()
-    if s.ooo == nil { // Must have s.ooo non-nil to call mergedChunks().
-        s.Unlock()
-        return cr.cr.ChunkOrIterable(meta)
-    }
-    mc, err := s.mergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef)
-    s.Unlock()
-
-    return nil, mc, err
+    c, it, _, err := cr.chunkOrIterable(meta, false)
+    return c, it, err
 }
 
 // ChunkOrIterableWithCopy implements ChunkReaderWithCopy. The special Copy
 // behaviour is only implemented for the in-order head chunk.
 func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
-    if !meta.MergeOOO {
-        return cr.cr.ChunkOrIterableWithCopy(meta)
+    return cr.chunkOrIterable(meta, true)
+}
+
+func (cr *HeadAndOOOChunkReader) chunkOrIterable(meta chunks.Meta, copyLastChunk bool) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
+    sid, cid, isOOO := unpackHeadChunkRef(meta.Ref)
+    s := cr.head.series.getByID(sid)
+    // This means that the series has been garbage collected.
+    if s == nil {
+        return nil, nil, 0, storage.ErrNotFound
     }
-    chk, iter, err := cr.ChunkOrIterable(meta)
-    return chk, iter, 0, err
+    var isoState *isolationState
+    if cr.cr != nil {
+        isoState = cr.cr.isoState
+    }
+
+    s.Lock()
+    defer s.Unlock()
+
+    if meta.Chunk == nil {
+        c, maxt, err := cr.head.chunkFromSeries(s, cid, isOOO, meta.MinTime, meta.MaxTime, isoState, copyLastChunk)
+        return c, nil, maxt, err
+    }
+    mm, ok := meta.Chunk.(*multiMeta)
+    if !ok { // Complete chunk was supplied.
+        return meta.Chunk, nil, meta.MaxTime, nil
+    }
+    // We have a composite meta: construct a composite iterable.
+    mc := &mergedOOOChunks{}
+    for _, m := range mm.metas {
+        switch {
+        case m.Chunk != nil:
+            mc.chunkIterables = append(mc.chunkIterables, m.Chunk)
+        default:
+            _, cid, isOOO := unpackHeadChunkRef(m.Ref)
+            iterable, _, err := cr.head.chunkFromSeries(s, cid, isOOO, m.MinTime, m.MaxTime, isoState, copyLastChunk)
+            if err != nil {
+                return nil, nil, 0, fmt.Errorf("invalid head chunk: %w", err)
+            }
+            mc.chunkIterables = append(mc.chunkIterables, iterable)
+        }
+    }
+    return nil, mc, meta.MaxTime, nil
 }
 
 func (cr *HeadAndOOOChunkReader) Close() error {
diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go
index e565933f8..545bc7b6e 100644
--- a/tsdb/ooo_head_read_test.go
+++ b/tsdb/ooo_head_read_test.go
@@ -308,10 +308,9 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
             var expChunks []chunks.Meta
             for _, e := range tc.expChunks {
                 meta := chunks.Meta{
-                    Chunk:    chunkenc.Chunk(nil),
-                    MinTime:  e.mint,
-                    MaxTime:  e.maxt,
-                    MergeOOO: true, // Only OOO chunks are tested here, so we always request merge from OOO head.
+                    Chunk:   chunkenc.Chunk(nil),
+                    MinTime: e.mint,
+                    MaxTime: e.maxt,
                 }
 
                 // Ref to whatever Ref the chunk has, that we refer to by ID
@@ -485,7 +484,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
         cr := NewHeadAndOOOChunkReader(db.head, 0, 1000, nil, nil, 0)
         defer cr.Close()
         c, iterable, err := cr.ChunkOrIterable(chunks.Meta{
-            Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, MergeOOO: true,
+            Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
         })
         require.Nil(t, iterable)
         require.Equal(t, err, fmt.Errorf("not found"))
@@ -1030,94 +1029,6 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
     }
 }
 
-// TestSortByMinTimeAndMinRef tests that the sort function for chunk metas does sort
-// by chunk meta MinTime and in case of same references by the lower reference.
-func TestSortByMinTimeAndMinRef(t *testing.T) {
-    tests := []struct {
-        name  string
-        input []chunkMetaAndChunkDiskMapperRef
-        exp   []chunkMetaAndChunkDiskMapperRef
-    }{
-        {
-            name: "chunks are ordered by min time",
-            input: []chunkMetaAndChunkDiskMapperRef{
-                {
-                    meta: chunks.Meta{
-                        Ref:     0,
-                        MinTime: 0,
-                    },
-                    ref: chunks.ChunkDiskMapperRef(0),
-                },
-                {
-                    meta: chunks.Meta{
-                        Ref:     1,
-                        MinTime: 1,
-                    },
-                    ref: chunks.ChunkDiskMapperRef(1),
-                },
-            },
-            exp: []chunkMetaAndChunkDiskMapperRef{
-                {
-                    meta: chunks.Meta{
-                        Ref:     0,
-                        MinTime: 0,
-                    },
-                    ref: chunks.ChunkDiskMapperRef(0),
-                },
-                {
-                    meta: chunks.Meta{
-                        Ref:     1,
-                        MinTime: 1,
-                    },
-                    ref: chunks.ChunkDiskMapperRef(1),
-                },
-            },
-        },
-        {
-            name: "if same mintime, lower reference goes first",
-            input: []chunkMetaAndChunkDiskMapperRef{
-                {
-                    meta: chunks.Meta{
-                        Ref:     10,
-                        MinTime: 0,
-                    },
-                    ref: chunks.ChunkDiskMapperRef(0),
-                },
-                {
-                    meta: chunks.Meta{
-                        Ref:     5,
-                        MinTime: 0,
-                    },
-                    ref: chunks.ChunkDiskMapperRef(1),
-                },
-            },
-            exp: []chunkMetaAndChunkDiskMapperRef{
-                {
-                    meta: chunks.Meta{
-                        Ref:     5,
-                        MinTime: 0,
-                    },
-                    ref: chunks.ChunkDiskMapperRef(1),
-                },
-                {
-                    meta: chunks.Meta{
-                        Ref:     10,
-                        MinTime: 0,
-                    },
-                    ref: chunks.ChunkDiskMapperRef(0),
-                },
-            },
-        },
-    }
-
-    for _, tc := range tests {
-        t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
-            slices.SortFunc(tc.input, refLessByMinTimeAndMinRef)
-            require.Equal(t, tc.exp, tc.input)
-        })
-    }
-}
-
 // TestSortMetaByMinTimeAndMinRef tests that the sort function for chunk metas does sort
 // by chunk meta MinTime and in case of same references by the lower reference.
 func TestSortMetaByMinTimeAndMinRef(t *testing.T) {