From 1851c65f2f5199628d40570e61e8da873af0ed20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Sun, 4 Aug 2024 15:30:43 +0200 Subject: [PATCH] Fix select vs populate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When executing promql expressions, the engine first collects all series and chunk metas. The chunk metas need to refer to immutable chunks, otherwise reading the same series twice during evaluation of an expression could yield different values. Head chunks are not immutable as opposed to chunks from blocks or mmap-ed chunks, thus they have to be copied. Before native histograms, the copy of the OOO head could only result in a single chunk, which was attached to chunks.Meta.Chunk field by OOOHeadIndexReader.series method. It would be a major rewrite to make chunks.Meta.Chunk field an array or an Iterable. It also seems hard/impossible and definitely less optimal to generate more chunks.Meta from the OOOHeadIndexReader.series() function as the metas need to be non overlapping (this can be done), but also tell memSeries.oooMergedChunks later what to return from overlapping chunks. Signed-off-by: György Krajcsovits --- tsdb/ooo_head_read.go | 55 ++++++++++++++++++++++++++++++++++---- tsdb/ooo_head_read_test.go | 7 +++++ 2 files changed, 57 insertions(+), 5 deletions(-) diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index dd45a9ced1..59e33111e4 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -29,8 +29,6 @@ import ( "github.com/prometheus/prometheus/tsdb/tombstones" ) -var _ IndexReader = &OOOHeadIndexReader{} - // OOOHeadIndexReader implements IndexReader so ooo samples in the head can be // accessed. // It also has a reference to headIndexReader so we can leverage on its @@ -42,8 +40,6 @@ type OOOHeadIndexReader struct { lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef } -var _ chunkenc.Iterable = &mergedOOOChunks{} - // mergedOOOChunks holds the list of iterables for overlapping chunks. type mergedOOOChunks struct { chunkIterables []chunkenc.Iterable @@ -66,6 +62,53 @@ func (oh *OOOHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.Scra return oh.series(ref, builder, chks, oh.lastGarbageCollectedMmapRef, 0) } +type MultiChunk struct { + chunks []chunkenc.Chunk +} + +func (c MultiChunk) Iterator(it chunkenc.Iterator) chunkenc.Iterator { + switch len(c.chunks) { + case 0: + return chunkenc.NewNopIterator() + case 1: + return c.chunks[0].Iterator(it) + default: + iterators := make([]chunkenc.Iterator, 0, len(c.chunks)) + for _, chk := range c.chunks { + iterators = append(iterators, chk.Iterator(nil)) + } + return storage.ChainSampleIteratorFromIterators(it, iterators) + } +} + +func (c MultiChunk) Appender() (chunkenc.Appender, error) { + return nil, errors.New("not implemented") +} + +func (c MultiChunk) Bytes() []byte { + return nil +} + +func (c MultiChunk) Compact() { + // no-op +} + +func (c MultiChunk) Encoding() chunkenc.Encoding { + return chunkenc.EncNone +} + +func (c MultiChunk) NumSamples() int { + sum := 0 + for _, chk := range c.chunks { + sum += chk.NumSamples() + } + return sum +} + +func (c MultiChunk) Reset([]byte) { + // no-op +} + // lastGarbageCollectedMmapRef gives the last mmap chunk that may be being garbage collected and so // any chunk at or before this ref will not be considered. 0 disables this check. // @@ -109,14 +152,16 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra if c.OverlapsClosedInterval(oh.mint, oh.maxt) && maxMmapRef == 0 { ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks)))) if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least. + headChunks := MultiChunk{} chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime) if err != nil { handleChunkWriteError(err) return nil } for _, chk := range chks { - addChunk(chk.minTime, chk.maxTime, ref, chk.chunk) + headChunks.chunks = append(headChunks.chunks, chk.chunk) } + addChunk(c.minTime, c.maxTime, ref, headChunks) } else { var emptyChunk chunkenc.Chunk addChunk(c.minTime, c.maxTime, ref, emptyChunk) diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index ec73a59f8b..f8ce8ecb82 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -31,6 +31,13 @@ import ( "github.com/prometheus/prometheus/tsdb/wlog" ) +// Type assertions. +var ( + _ chunkenc.Chunk = &MultiChunk{} + _ chunkenc.Iterable = &mergedOOOChunks{} + _ IndexReader = &OOOHeadIndexReader{} +) + type chunkInterval struct { // because we permutate the order of chunks, we cannot determine at test declaration time which chunkRefs we expect in the Output. // This ID matches expected output chunks against test input chunks, the test runner will assert the chunkRef for the matching chunk