mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Fix select vs populate
When executing promql expressions, the engine first collects all series and chunk metas. The chunk metas need to refer to immutable chunks, otherwise reading the same series twice during evaluation of an expression could yield different values. Head chunks are not immutable as opposed to chunks from blocks or mmap-ed chunks, thus they have to be copied. Before native histograms, the copy of the OOO head could only result in a single chunk, which was attached to chunks.Meta.Chunk field by OOOHeadIndexReader.series method. It would be a major rewrite to make chunks.Meta.Chunk field an array or an Iterable. It also seems hard/impossible and definitely less optimal to generate more chunks.Meta from the OOOHeadIndexReader.series() function as the metas need to be non overlapping (this can be done), but also tell memSeries.oooMergedChunks later what to return from overlapping chunks. Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent
90a41cd937
commit
1851c65f2f
|
@ -29,8 +29,6 @@ import (
|
|||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||
)
|
||||
|
||||
var _ IndexReader = &OOOHeadIndexReader{}
|
||||
|
||||
// OOOHeadIndexReader implements IndexReader so ooo samples in the head can be
|
||||
// accessed.
|
||||
// It also has a reference to headIndexReader so we can leverage on its
|
||||
|
@ -42,8 +40,6 @@ type OOOHeadIndexReader struct {
|
|||
lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef
|
||||
}
|
||||
|
||||
var _ chunkenc.Iterable = &mergedOOOChunks{}
|
||||
|
||||
// mergedOOOChunks holds the list of iterables for overlapping chunks.
|
||||
type mergedOOOChunks struct {
|
||||
chunkIterables []chunkenc.Iterable
|
||||
|
@ -66,6 +62,53 @@ func (oh *OOOHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.Scra
|
|||
return oh.series(ref, builder, chks, oh.lastGarbageCollectedMmapRef, 0)
|
||||
}
|
||||
|
||||
type MultiChunk struct {
|
||||
chunks []chunkenc.Chunk
|
||||
}
|
||||
|
||||
func (c MultiChunk) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
|
||||
switch len(c.chunks) {
|
||||
case 0:
|
||||
return chunkenc.NewNopIterator()
|
||||
case 1:
|
||||
return c.chunks[0].Iterator(it)
|
||||
default:
|
||||
iterators := make([]chunkenc.Iterator, 0, len(c.chunks))
|
||||
for _, chk := range c.chunks {
|
||||
iterators = append(iterators, chk.Iterator(nil))
|
||||
}
|
||||
return storage.ChainSampleIteratorFromIterators(it, iterators)
|
||||
}
|
||||
}
|
||||
|
||||
func (c MultiChunk) Appender() (chunkenc.Appender, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (c MultiChunk) Bytes() []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c MultiChunk) Compact() {
|
||||
// no-op
|
||||
}
|
||||
|
||||
func (c MultiChunk) Encoding() chunkenc.Encoding {
|
||||
return chunkenc.EncNone
|
||||
}
|
||||
|
||||
func (c MultiChunk) NumSamples() int {
|
||||
sum := 0
|
||||
for _, chk := range c.chunks {
|
||||
sum += chk.NumSamples()
|
||||
}
|
||||
return sum
|
||||
}
|
||||
|
||||
func (c MultiChunk) Reset([]byte) {
|
||||
// no-op
|
||||
}
|
||||
|
||||
// lastGarbageCollectedMmapRef gives the last mmap chunk that may be being garbage collected and so
|
||||
// any chunk at or before this ref will not be considered. 0 disables this check.
|
||||
//
|
||||
|
@ -109,14 +152,16 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
|
|||
if c.OverlapsClosedInterval(oh.mint, oh.maxt) && maxMmapRef == 0 {
|
||||
ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
|
||||
if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least.
|
||||
headChunks := MultiChunk{}
|
||||
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime)
|
||||
if err != nil {
|
||||
handleChunkWriteError(err)
|
||||
return nil
|
||||
}
|
||||
for _, chk := range chks {
|
||||
addChunk(chk.minTime, chk.maxTime, ref, chk.chunk)
|
||||
headChunks.chunks = append(headChunks.chunks, chk.chunk)
|
||||
}
|
||||
addChunk(c.minTime, c.maxTime, ref, headChunks)
|
||||
} else {
|
||||
var emptyChunk chunkenc.Chunk
|
||||
addChunk(c.minTime, c.maxTime, ref, emptyChunk)
|
||||
|
|
|
@ -31,6 +31,13 @@ import (
|
|||
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||
)
|
||||
|
||||
// Type assertions.
|
||||
var (
|
||||
_ chunkenc.Chunk = &MultiChunk{}
|
||||
_ chunkenc.Iterable = &mergedOOOChunks{}
|
||||
_ IndexReader = &OOOHeadIndexReader{}
|
||||
)
|
||||
|
||||
type chunkInterval struct {
|
||||
// because we permutate the order of chunks, we cannot determine at test declaration time which chunkRefs we expect in the Output.
|
||||
// This ID matches expected output chunks against test input chunks, the test runner will assert the chunkRef for the matching chunk
|
||||
|
|
Loading…
Reference in a new issue