From 838e49e7b8dfc05967bf4ddc95025550b80cd5cd Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 23 Aug 2024 16:57:43 +0100 Subject: [PATCH 1/3] [REFACTOR] TSDB: move chunkFromSeries from headChunkReader to head Signed-off-by: Bryan Boreham --- tsdb/head_read.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 24d75f9c72..780eaa6690 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -376,12 +376,12 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc. s.Lock() defer s.Unlock() - return h.chunkFromSeries(s, cid, copyLastChunk) + return h.head.chunkFromSeries(s, cid, h.mint, h.maxt, h.isoState, copyLastChunk) } // Call with s locked. -func (h *headChunkReader) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, copyLastChunk bool) (chunkenc.Chunk, int64, error) { - c, headChunk, isOpen, err := s.chunk(cid, h.head.chunkDiskMapper, &h.head.memChunkPool) +func (h *Head) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, mint, maxt int64, isoState *isolationState, copyLastChunk bool) (chunkenc.Chunk, int64, error) { + c, headChunk, isOpen, err := s.chunk(cid, h.chunkDiskMapper, &h.memChunkPool) if err != nil { return nil, 0, err } @@ -390,12 +390,12 @@ func (h *headChunkReader) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, // Set this to nil so that Go GC can collect it after it has been used. c.chunk = nil c.prev = nil - h.head.memChunkPool.Put(c) + h.memChunkPool.Put(c) } }() // This means that the chunk is outside the specified range. - if !c.OverlapsClosedInterval(h.mint, h.maxt) { + if !c.OverlapsClosedInterval(mint, maxt) { return nil, 0, storage.ErrNotFound } @@ -407,7 +407,7 @@ func (h *headChunkReader) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, newB := make([]byte, len(b)) copy(newB, b) // TODO(codesome): Use bytes.Clone() when we upgrade to Go 1.20. // TODO(codesome): Put back in the pool (non-trivial). - chk, err = h.head.opts.ChunkPool.Get(s.headChunks.chunk.Encoding(), newB) + chk, err = h.opts.ChunkPool.Get(s.headChunks.chunk.Encoding(), newB) if err != nil { return nil, 0, err } @@ -417,7 +417,7 @@ func (h *headChunkReader) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, Chunk: chk, s: s, cid: cid, - isoState: h.isoState, + isoState: isoState, }, maxTime, nil } @@ -538,7 +538,7 @@ func (s *memSeries) mergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, case c.ref == 0: // This is an in-order head chunk. _, cid := chunks.HeadChunkRef(c.meta.Ref).Unpack() var err error - iterable, _, err = hr.chunkFromSeries(s, cid, false) + iterable, _, err = hr.head.chunkFromSeries(s, cid, hr.mint, hr.maxt, hr.isoState, false) if err != nil { return nil, fmt.Errorf("invalid head chunk: %w", err) } From cde42f30e9c91a23458661ee33d47f828fa63b43 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 23 Aug 2024 17:10:02 +0100 Subject: [PATCH 2/3] TSDB: streamline reading of overlapping head chunks `getOOOSeriesChunks` was already finding sets of overlapping chunks; we store those in a `multiMeta` struct so that `ChunkOrIterable` can reconstruct an `Iterable` easily and predictably. We no longer need a `MergeOOO` flag to indicate that this Meta should be merged with other ones; this is explicit in the `multiMeta` structure. We also no longer need `chunkMetaAndChunkDiskMapperRef`. Add `wrapOOOHeadChunk` to defeat `chunkenc.Pool` - chunks are reset during compaction, but if we wrap them (like `safeHeadChunk` was doing then this is skipped) . Signed-off-by: Bryan Boreham --- tsdb/chunks/chunks.go | 3 - tsdb/head_read.go | 101 ++++++-------------------- tsdb/ooo_head_read.go | 143 +++++++++++++++++++------------------ tsdb/ooo_head_read_test.go | 97 ++----------------------- 4 files changed, 98 insertions(+), 246 deletions(-) diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index 69201c6db7..ec0f6d4036 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -133,9 +133,6 @@ type Meta struct { // Time range the data covers. // When MaxTime == math.MaxInt64 the chunk is still open and being appended to. MinTime, MaxTime int64 - - // Flag to indicate that this meta needs merge with OOO data. - MergeOOO bool } // ChunkFromSamples requires all samples to have the same type. diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 780eaa6690..d81ffbb6a0 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -366,7 +366,7 @@ func (h *headChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Ch // If copyLastChunk is true, then it makes a copy of the head chunk if asked for it. // Also returns max time of the chunk. func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.Chunk, int64, error) { - sid, cid := chunks.HeadChunkRef(meta.Ref).Unpack() + sid, cid, isOOO := unpackHeadChunkRef(meta.Ref) s := h.head.series.getByID(sid) // This means that the series has been garbage collected. @@ -376,11 +376,20 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc. s.Lock() defer s.Unlock() - return h.head.chunkFromSeries(s, cid, h.mint, h.maxt, h.isoState, copyLastChunk) + return h.head.chunkFromSeries(s, cid, isOOO, h.mint, h.maxt, h.isoState, copyLastChunk) +} + +// Dumb thing to defeat chunk pool. +type wrapOOOHeadChunk struct { + chunkenc.Chunk } // Call with s locked. -func (h *Head) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, mint, maxt int64, isoState *isolationState, copyLastChunk bool) (chunkenc.Chunk, int64, error) { +func (h *Head) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, isOOO bool, mint, maxt int64, isoState *isolationState, copyLastChunk bool) (chunkenc.Chunk, int64, error) { + if isOOO { + chk, maxTime, err := s.oooChunk(cid, h.chunkDiskMapper, &h.memChunkPool) + return wrapOOOHeadChunk{chk}, maxTime, err + } c, headChunk, isOpen, err := s.chunk(cid, h.chunkDiskMapper, &h.memChunkPool) if err != nil { return nil, 0, err @@ -481,85 +490,19 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi return elem, true, offset == 0, nil } -// mergedChunks return an iterable over all chunks that overlap the -// time window [mint,maxt], plus meta.Chunk if populated. -// If hr is non-nil then in-order chunks are included. -// This function is not thread safe unless the caller holds a lock. -// The caller must ensure that s.ooo is not nil. -func (s *memSeries) mergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, hr *headChunkReader, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (chunkenc.Iterable, error) { - // We create a temporary slice of chunk metas to hold the information of all - // possible chunks that may overlap with the requested chunk. - tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.ooo.oooMmappedChunks)+1) +// oooChunk returns the chunk for the HeadChunkID by m-mapping it from the disk. +// It never returns the head OOO chunk. +func (s *memSeries) oooChunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk chunkenc.Chunk, maxTime int64, err error) { + // ix represents the index of chunk in the s.ooo.oooMmappedChunks slice. The chunk id's are + // incremented by 1 when new chunk is created, hence (id - firstOOOChunkID) gives the slice index. + ix := int(id) - int(s.ooo.firstOOOChunkID) - for i, c := range s.ooo.oooMmappedChunks { - if maxMmapRef != 0 && c.ref > maxMmapRef { - break - } - if c.OverlapsClosedInterval(mint, maxt) { - tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{ - meta: chunks.Meta{ - MinTime: c.minTime, - MaxTime: c.maxTime, - Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i))), - }, - ref: c.ref, - }) - } - } - // Add in data copied from the head OOO chunk. - if meta.Chunk != nil { - tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{meta: meta}) + if ix < 0 || ix >= len(s.ooo.oooMmappedChunks) { + return nil, 0, storage.ErrNotFound } - if hr != nil { // Include in-order chunks. - metas := appendSeriesChunks(s, max(meta.MinTime, mint), min(meta.MaxTime, maxt), nil) - for _, m := range metas { - tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{ - meta: m, - ref: 0, // This tells the loop below it's an in-order head chunk. - }) - } - } - - // Next we want to sort all the collected chunks by min time so we can find - // those that overlap and stop when we know the rest don't. - slices.SortFunc(tmpChks, refLessByMinTimeAndMinRef) - - mc := &mergedOOOChunks{} - absoluteMax := int64(math.MinInt64) - for _, c := range tmpChks { - if c.meta.Ref != meta.Ref && (len(mc.chunkIterables) == 0 || c.meta.MinTime > absoluteMax) { - continue - } - var iterable chunkenc.Iterable - switch { - case c.meta.Chunk != nil: - iterable = c.meta.Chunk - case c.ref == 0: // This is an in-order head chunk. - _, cid := chunks.HeadChunkRef(c.meta.Ref).Unpack() - var err error - iterable, _, err = hr.head.chunkFromSeries(s, cid, hr.mint, hr.maxt, hr.isoState, false) - if err != nil { - return nil, fmt.Errorf("invalid head chunk: %w", err) - } - default: - chk, err := cdm.Chunk(c.ref) - if err != nil { - var cerr *chunks.CorruptionErr - if errors.As(err, &cerr) { - return nil, fmt.Errorf("invalid ooo mmapped chunk: %w", err) - } - return nil, err - } - iterable = chk - } - mc.chunkIterables = append(mc.chunkIterables, iterable) - if c.meta.MaxTime > absoluteMax { - absoluteMax = c.meta.MaxTime - } - } - - return mc, nil + chk, err := chunkDiskMapper.Chunk(s.ooo.oooMmappedChunks[ix].ref) + return chk, s.ooo.oooMmappedChunks[ix].maxTime, err } // safeHeadChunk makes sure that the chunk can be accessed without a race condition. diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index a50decd623..7b58ec566f 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -16,6 +16,7 @@ package tsdb import ( "context" "errors" + "fmt" "math" "slices" @@ -91,11 +92,10 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) { tmpChks = append(tmpChks, chunks.Meta{ - MinTime: minT, - MaxTime: maxT, - Ref: ref, - Chunk: chunk, - MergeOOO: true, + MinTime: minT, + MaxTime: maxT, + Ref: ref, + Chunk: chunk, }) } @@ -140,34 +140,39 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap // those that overlap. slices.SortFunc(tmpChks, lessByMinTimeAndMinRef) - // Next we want to iterate the sorted collected chunks and only return the - // chunks Meta the first chunk that overlaps with others. + // Next we want to iterate the sorted collected chunks and return composites for chunks that overlap with others. // Example chunks of a series: 5:(100, 200) 6:(500, 600) 7:(150, 250) 8:(550, 650) - // In the example 5 overlaps with 7 and 6 overlaps with 8 so we only want to - // return chunk Metas for chunk 5 and chunk 6e - *chks = append(*chks, tmpChks[0]) - maxTime := tmpChks[0].MaxTime // Tracks the maxTime of the previous "to be merged chunk". + // In the example 5 overlaps with 7 and 6 overlaps with 8 so we will return + // [5,7], [6,8]. + toBeMerged := tmpChks[0] for _, c := range tmpChks[1:] { - switch { - case c.MinTime > maxTime: - *chks = append(*chks, c) - maxTime = c.MaxTime - case c.MaxTime > maxTime: - maxTime = c.MaxTime - (*chks)[len(*chks)-1].MaxTime = c.MaxTime - fallthrough - default: - // If the head OOO chunk is part of an output chunk, copy the chunk pointer. - if c.Chunk != nil { - (*chks)[len(*chks)-1].Chunk = c.Chunk + if c.MinTime > toBeMerged.MaxTime { + // This chunk doesn't overlap. Send current toBeMerged to output and start a new one. + *chks = append(*chks, toBeMerged) + toBeMerged = c + } else { + // Merge this chunk with existing toBeMerged. + if mm, ok := toBeMerged.Chunk.(*multiMeta); ok { + mm.metas = append(mm.metas, c) + } else { + toBeMerged.Chunk = &multiMeta{metas: []chunks.Meta{toBeMerged, c}} + } + if toBeMerged.MaxTime < c.MaxTime { + toBeMerged.MaxTime = c.MaxTime } - (*chks)[len(*chks)-1].MergeOOO = (*chks)[len(*chks)-1].MergeOOO || c.MergeOOO } } + *chks = append(*chks, toBeMerged) return nil } +// Fake Chunk object to pass a set of Metas inside Meta.Chunk. +type multiMeta struct { + chunkenc.Chunk // We don't expect any of the methods to be called. + metas []chunks.Meta +} + // LabelValues needs to be overridden from the headIndexReader implementation // so we can return labels within either in-order range or ooo range. func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { @@ -182,29 +187,6 @@ func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, m return labelValuesWithMatchers(ctx, oh, name, matchers...) } -type chunkMetaAndChunkDiskMapperRef struct { - meta chunks.Meta - ref chunks.ChunkDiskMapperRef -} - -func refLessByMinTimeAndMinRef(a, b chunkMetaAndChunkDiskMapperRef) int { - switch { - case a.meta.MinTime < b.meta.MinTime: - return -1 - case a.meta.MinTime > b.meta.MinTime: - return 1 - } - - switch { - case a.meta.Ref < b.meta.Ref: - return -1 - case a.meta.Ref > b.meta.Ref: - return 1 - default: - return 0 - } -} - func lessByMinTimeAndMinRef(a, b chunks.Meta) int { switch { case a.MinTime < b.MinTime: @@ -243,36 +225,55 @@ func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader, } func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { - sid, _, _ := unpackHeadChunkRef(meta.Ref) - if !meta.MergeOOO { - return cr.cr.ChunkOrIterable(meta) - } - - s := cr.head.series.getByID(sid) - // This means that the series has been garbage collected. - if s == nil { - return nil, nil, storage.ErrNotFound - } - - s.Lock() - if s.ooo == nil { // Must have s.ooo non-nil to call mergedChunks(). - s.Unlock() - return cr.cr.ChunkOrIterable(meta) - } - mc, err := s.mergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef) - s.Unlock() - - return nil, mc, err + c, it, _, err := cr.chunkOrIterable(meta, false) + return c, it, err } // ChunkOrIterableWithCopy implements ChunkReaderWithCopy. The special Copy // behaviour is only implemented for the in-order head chunk. func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) { - if !meta.MergeOOO { - return cr.cr.ChunkOrIterableWithCopy(meta) + return cr.chunkOrIterable(meta, true) +} + +func (cr *HeadAndOOOChunkReader) chunkOrIterable(meta chunks.Meta, copyLastChunk bool) (chunkenc.Chunk, chunkenc.Iterable, int64, error) { + sid, cid, isOOO := unpackHeadChunkRef(meta.Ref) + s := cr.head.series.getByID(sid) + // This means that the series has been garbage collected. + if s == nil { + return nil, nil, 0, storage.ErrNotFound } - chk, iter, err := cr.ChunkOrIterable(meta) - return chk, iter, 0, err + var isoState *isolationState + if cr.cr != nil { + isoState = cr.cr.isoState + } + + s.Lock() + defer s.Unlock() + + if meta.Chunk == nil { + c, maxt, err := cr.head.chunkFromSeries(s, cid, isOOO, meta.MinTime, meta.MaxTime, isoState, copyLastChunk) + return c, nil, maxt, err + } + mm, ok := meta.Chunk.(*multiMeta) + if !ok { // Complete chunk was supplied. + return meta.Chunk, nil, meta.MaxTime, nil + } + // We have a composite meta: construct a composite iterable. + mc := &mergedOOOChunks{} + for _, m := range mm.metas { + switch { + case m.Chunk != nil: + mc.chunkIterables = append(mc.chunkIterables, m.Chunk) + default: + _, cid, isOOO := unpackHeadChunkRef(m.Ref) + iterable, _, err := cr.head.chunkFromSeries(s, cid, isOOO, m.MinTime, m.MaxTime, isoState, copyLastChunk) + if err != nil { + return nil, nil, 0, fmt.Errorf("invalid head chunk: %w", err) + } + mc.chunkIterables = append(mc.chunkIterables, iterable) + } + } + return nil, mc, meta.MaxTime, nil } func (cr *HeadAndOOOChunkReader) Close() error { diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index e565933f83..545bc7b6eb 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -308,10 +308,9 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { var expChunks []chunks.Meta for _, e := range tc.expChunks { meta := chunks.Meta{ - Chunk: chunkenc.Chunk(nil), - MinTime: e.mint, - MaxTime: e.maxt, - MergeOOO: true, // Only OOO chunks are tested here, so we always request merge from OOO head. + Chunk: chunkenc.Chunk(nil), + MinTime: e.mint, + MaxTime: e.maxt, } // Ref to whatever Ref the chunk has, that we refer to by ID @@ -485,7 +484,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { cr := NewHeadAndOOOChunkReader(db.head, 0, 1000, nil, nil, 0) defer cr.Close() c, iterable, err := cr.ChunkOrIterable(chunks.Meta{ - Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, MergeOOO: true, + Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, }) require.Nil(t, iterable) require.Equal(t, err, fmt.Errorf("not found")) @@ -1030,94 +1029,6 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( } } -// TestSortByMinTimeAndMinRef tests that the sort function for chunk metas does sort -// by chunk meta MinTime and in case of same references by the lower reference. -func TestSortByMinTimeAndMinRef(t *testing.T) { - tests := []struct { - name string - input []chunkMetaAndChunkDiskMapperRef - exp []chunkMetaAndChunkDiskMapperRef - }{ - { - name: "chunks are ordered by min time", - input: []chunkMetaAndChunkDiskMapperRef{ - { - meta: chunks.Meta{ - Ref: 0, - MinTime: 0, - }, - ref: chunks.ChunkDiskMapperRef(0), - }, - { - meta: chunks.Meta{ - Ref: 1, - MinTime: 1, - }, - ref: chunks.ChunkDiskMapperRef(1), - }, - }, - exp: []chunkMetaAndChunkDiskMapperRef{ - { - meta: chunks.Meta{ - Ref: 0, - MinTime: 0, - }, - ref: chunks.ChunkDiskMapperRef(0), - }, - { - meta: chunks.Meta{ - Ref: 1, - MinTime: 1, - }, - ref: chunks.ChunkDiskMapperRef(1), - }, - }, - }, - { - name: "if same mintime, lower reference goes first", - input: []chunkMetaAndChunkDiskMapperRef{ - { - meta: chunks.Meta{ - Ref: 10, - MinTime: 0, - }, - ref: chunks.ChunkDiskMapperRef(0), - }, - { - meta: chunks.Meta{ - Ref: 5, - MinTime: 0, - }, - ref: chunks.ChunkDiskMapperRef(1), - }, - }, - exp: []chunkMetaAndChunkDiskMapperRef{ - { - meta: chunks.Meta{ - Ref: 5, - MinTime: 0, - }, - ref: chunks.ChunkDiskMapperRef(1), - }, - { - meta: chunks.Meta{ - Ref: 10, - MinTime: 0, - }, - ref: chunks.ChunkDiskMapperRef(0), - }, - }, - }, - } - - for _, tc := range tests { - t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) { - slices.SortFunc(tc.input, refLessByMinTimeAndMinRef) - require.Equal(t, tc.exp, tc.input) - }) - } -} - // TestSortMetaByMinTimeAndMinRef tests that the sort function for chunk metas does sort // by chunk meta MinTime and in case of same references by the lower reference. func TestSortMetaByMinTimeAndMinRef(t *testing.T) { From 1f38ae7bca8471e90a6fa227cd4cd567e25952c4 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sat, 24 Aug 2024 16:50:15 +0100 Subject: [PATCH 3/3] [TESTS] TSDB: fix up OOO tests for new Series behaviour Signed-off-by: Bryan Boreham --- tsdb/ooo_head_read_test.go | 112 +++++++++++++++++++++++-------------- 1 file changed, 69 insertions(+), 43 deletions(-) diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index 545bc7b6eb..40e37043b8 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -39,6 +39,11 @@ type chunkInterval struct { maxt int64 } +type expChunk struct { + c chunkInterval + m []chunkInterval +} + // permutateChunkIntervals returns all possible orders of the given chunkIntervals. func permutateChunkIntervals(in []chunkInterval, out [][]chunkInterval, left, right int) [][]chunkInterval { if left == right { @@ -65,7 +70,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { queryMinT int64 queryMaxT int64 inputChunkIntervals []chunkInterval - expChunks []chunkInterval + expChunks []expChunk }{ { name: "Empty result and no error when head is empty", @@ -107,8 +112,8 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700 // Query Interval [-----------------------------------------------------------] // Chunk 0: [---------------------------------------] - expChunks: []chunkInterval{ - {0, 150, 350}, + expChunks: []expChunk{ + {c: chunkInterval{0, 150, 350}}, }, }, { @@ -121,8 +126,8 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700 // Query Interval: [---------------------------------------] // Chunk 0: [-----------------------------------------------------------] - expChunks: []chunkInterval{ - {0, 100, 400}, + expChunks: []expChunk{ + {c: chunkInterval{0, 100, 400}}, }, }, { @@ -142,9 +147,9 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // Chunk 2: [-------------------] // Chunk 3: [-------------------] // Output Graphically [-----------------------------] [-----------------------------] - expChunks: []chunkInterval{ - {0, 100, 250}, - {1, 500, 650}, + expChunks: []expChunk{ + {c: chunkInterval{0, 100, 250}, m: []chunkInterval{{0, 100, 200}, {2, 150, 250}}}, + {c: chunkInterval{1, 500, 650}, m: []chunkInterval{{1, 500, 600}, {3, 550, 650}}}, }, }, { @@ -164,8 +169,8 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // Chunk 2: [-------------------] // Chunk 3: [------------------] // Output Graphically [------------------------------------------------------------------------------] - expChunks: []chunkInterval{ - {0, 100, 500}, + expChunks: []expChunk{ + {c: chunkInterval{0, 100, 500}, m: []chunkInterval{{0, 100, 200}, {1, 200, 300}, {2, 300, 400}, {3, 400, 500}}}, }, }, { @@ -185,11 +190,11 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // Chunk 2: [------------------] // Chunk 3: [------------------] // Output Graphically [------------------][------------------][------------------][------------------] - expChunks: []chunkInterval{ - {0, 100, 199}, - {1, 200, 299}, - {2, 300, 399}, - {3, 400, 499}, + expChunks: []expChunk{ + {c: chunkInterval{0, 100, 199}}, + {c: chunkInterval{1, 200, 299}}, + {c: chunkInterval{2, 300, 399}}, + {c: chunkInterval{3, 400, 499}}, }, }, { @@ -209,8 +214,8 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // Chunk 2: [------------------] // Chunk 3: [------------------] // Output Graphically [-----------------------------------------------] - expChunks: []chunkInterval{ - {0, 100, 350}, + expChunks: []expChunk{ + {c: chunkInterval{0, 100, 350}, m: []chunkInterval{{0, 100, 200}, {1, 150, 300}, {2, 250, 350}}}, }, }, { @@ -228,8 +233,8 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // Chunk 1: [-----------------------------] // Chunk 2: [------------------------------] // Output Graphically [-----------------------------------------------------------------------------------------] - expChunks: []chunkInterval{ - {1, 0, 500}, + expChunks: []expChunk{ + {c: chunkInterval{1, 0, 500}, m: []chunkInterval{{1, 0, 200}, {2, 150, 300}, {0, 250, 500}}}, }, }, { @@ -251,9 +256,9 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // Chunk 3: [-------------------] // Chunk 4: [---------------------------------------] // Output Graphically [---------------------------------------] [------------------------------------------------] - expChunks: []chunkInterval{ - {0, 100, 300}, - {4, 600, 850}, + expChunks: []expChunk{ + {c: chunkInterval{0, 100, 300}, m: []chunkInterval{{0, 100, 300}, {2, 150, 250}}}, + {c: chunkInterval{4, 600, 850}, m: []chunkInterval{{4, 600, 800}, {3, 650, 750}, {1, 770, 850}}}, }, }, { @@ -271,10 +276,10 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // Chunk 1: [----------] // Chunk 2: [--------] // Output Graphically [-------] [--------] [----------] - expChunks: []chunkInterval{ - {0, 100, 150}, - {1, 300, 350}, - {2, 200, 250}, + expChunks: []expChunk{ + {c: chunkInterval{0, 100, 150}}, + {c: chunkInterval{2, 200, 250}}, + {c: chunkInterval{1, 300, 350}}, }, }, } @@ -305,24 +310,38 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { s1.ooo = &memSeriesOOOFields{} // define our expected chunks, by looking at the expected ChunkIntervals and setting... + // Ref to whatever Ref the chunk has, that we refer to by ID + findID := func(id int) chunks.ChunkRef { + for ref, c := range intervals { + if c.ID == id { + return chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), s1.oooHeadChunkID(ref))) + } + } + return 0 + } var expChunks []chunks.Meta for _, e := range tc.expChunks { - meta := chunks.Meta{ - Chunk: chunkenc.Chunk(nil), - MinTime: e.mint, - MaxTime: e.maxt, - } - - // Ref to whatever Ref the chunk has, that we refer to by ID - for ref, c := range intervals { - if c.ID == e.ID { - meta.Ref = chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), s1.oooHeadChunkID(ref))) - break + var chunk chunkenc.Chunk + if len(e.m) > 0 { + mm := &multiMeta{} + for _, x := range e.m { + meta := chunks.Meta{ + MinTime: x.mint, + MaxTime: x.maxt, + Ref: findID(x.ID), + } + mm.metas = append(mm.metas, meta) } + chunk = mm + } + meta := chunks.Meta{ + Chunk: chunk, + MinTime: e.c.mint, + MaxTime: e.c.maxt, + Ref: findID(e.c.ID), } expChunks = append(expChunks, meta) } - slices.SortFunc(expChunks, lessByMinTimeAndMinRef) // We always want the chunks to come back sorted by minTime asc. if headChunk && len(intervals) > 0 { // Put the last interval in the head chunk @@ -497,6 +516,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { queryMaxT int64 firstInOrderSampleAt int64 inputSamples []testValue + expSingleChunks bool expChunkError bool expChunksSamples []chunks.SampleSlice }{ @@ -509,7 +529,8 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { {Ts: minutes(30), V: 0}, {Ts: minutes(40), V: 0}, }, - expChunkError: false, + expChunkError: false, + expSingleChunks: true, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 // Query Interval [------------------------------------------------------------------------------------------] // Chunk 0: Current Head [--------] (With 2 samples) @@ -689,7 +710,8 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { {Ts: minutes(40), V: 3}, {Ts: minutes(42), V: 3}, }, - expChunkError: false, + expChunkError: false, + expSingleChunks: true, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 // Query Interval [------------------------------------------------------------------------------------------] // Chunk 0 [-------] @@ -844,9 +866,13 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { for i := 0; i < len(chks); i++ { c, iterable, err := cr.ChunkOrIterable(chks[i]) require.NoError(t, err) - require.Nil(t, c) - - it := iterable.Iterator(nil) + var it chunkenc.Iterator + if tc.expSingleChunks { + it = c.Iterator(nil) + } else { + require.Nil(t, c) + it = iterable.Iterator(nil) + } resultSamples, err := storage.ExpandSamples(it, nil) require.NoError(t, err) requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true)