mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 22:37:27 -08:00
Merge pull request #14729 from bboreham/multimeta
TSDB: streamline reading of overlapping head chunks
This commit is contained in:
commit
5359567264
|
@ -133,9 +133,6 @@ type Meta struct {
|
|||
// Time range the data covers.
|
||||
// When MaxTime == math.MaxInt64 the chunk is still open and being appended to.
|
||||
MinTime, MaxTime int64
|
||||
|
||||
// Flag to indicate that this meta needs merge with OOO data.
|
||||
MergeOOO bool
|
||||
}
|
||||
|
||||
// ChunkFromSamples requires all samples to have the same type.
|
||||
|
|
|
@ -366,7 +366,7 @@ func (h *headChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Ch
|
|||
// If copyLastChunk is true, then it makes a copy of the head chunk if asked for it.
|
||||
// Also returns max time of the chunk.
|
||||
func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.Chunk, int64, error) {
|
||||
sid, cid := chunks.HeadChunkRef(meta.Ref).Unpack()
|
||||
sid, cid, isOOO := unpackHeadChunkRef(meta.Ref)
|
||||
|
||||
s := h.head.series.getByID(sid)
|
||||
// This means that the series has been garbage collected.
|
||||
|
@ -376,12 +376,21 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.
|
|||
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
return h.chunkFromSeries(s, cid, copyLastChunk)
|
||||
return h.head.chunkFromSeries(s, cid, isOOO, h.mint, h.maxt, h.isoState, copyLastChunk)
|
||||
}
|
||||
|
||||
// Dumb thing to defeat chunk pool.
|
||||
type wrapOOOHeadChunk struct {
|
||||
chunkenc.Chunk
|
||||
}
|
||||
|
||||
// Call with s locked.
|
||||
func (h *headChunkReader) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, copyLastChunk bool) (chunkenc.Chunk, int64, error) {
|
||||
c, headChunk, isOpen, err := s.chunk(cid, h.head.chunkDiskMapper, &h.head.memChunkPool)
|
||||
func (h *Head) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, isOOO bool, mint, maxt int64, isoState *isolationState, copyLastChunk bool) (chunkenc.Chunk, int64, error) {
|
||||
if isOOO {
|
||||
chk, maxTime, err := s.oooChunk(cid, h.chunkDiskMapper, &h.memChunkPool)
|
||||
return wrapOOOHeadChunk{chk}, maxTime, err
|
||||
}
|
||||
c, headChunk, isOpen, err := s.chunk(cid, h.chunkDiskMapper, &h.memChunkPool)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
@ -390,12 +399,12 @@ func (h *headChunkReader) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID,
|
|||
// Set this to nil so that Go GC can collect it after it has been used.
|
||||
c.chunk = nil
|
||||
c.prev = nil
|
||||
h.head.memChunkPool.Put(c)
|
||||
h.memChunkPool.Put(c)
|
||||
}
|
||||
}()
|
||||
|
||||
// This means that the chunk is outside the specified range.
|
||||
if !c.OverlapsClosedInterval(h.mint, h.maxt) {
|
||||
if !c.OverlapsClosedInterval(mint, maxt) {
|
||||
return nil, 0, storage.ErrNotFound
|
||||
}
|
||||
|
||||
|
@ -407,7 +416,7 @@ func (h *headChunkReader) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID,
|
|||
newB := make([]byte, len(b))
|
||||
copy(newB, b) // TODO(codesome): Use bytes.Clone() when we upgrade to Go 1.20.
|
||||
// TODO(codesome): Put back in the pool (non-trivial).
|
||||
chk, err = h.head.opts.ChunkPool.Get(s.headChunks.chunk.Encoding(), newB)
|
||||
chk, err = h.opts.ChunkPool.Get(s.headChunks.chunk.Encoding(), newB)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
@ -417,7 +426,7 @@ func (h *headChunkReader) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID,
|
|||
Chunk: chk,
|
||||
s: s,
|
||||
cid: cid,
|
||||
isoState: h.isoState,
|
||||
isoState: isoState,
|
||||
}, maxTime, nil
|
||||
}
|
||||
|
||||
|
@ -481,85 +490,19 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
|
|||
return elem, true, offset == 0, nil
|
||||
}
|
||||
|
||||
// mergedChunks return an iterable over all chunks that overlap the
|
||||
// time window [mint,maxt], plus meta.Chunk if populated.
|
||||
// If hr is non-nil then in-order chunks are included.
|
||||
// This function is not thread safe unless the caller holds a lock.
|
||||
// The caller must ensure that s.ooo is not nil.
|
||||
func (s *memSeries) mergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, hr *headChunkReader, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (chunkenc.Iterable, error) {
|
||||
// We create a temporary slice of chunk metas to hold the information of all
|
||||
// possible chunks that may overlap with the requested chunk.
|
||||
tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.ooo.oooMmappedChunks)+1)
|
||||
// oooChunk returns the chunk for the HeadChunkID by m-mapping it from the disk.
|
||||
// It never returns the head OOO chunk.
|
||||
func (s *memSeries) oooChunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk chunkenc.Chunk, maxTime int64, err error) {
|
||||
// ix represents the index of chunk in the s.ooo.oooMmappedChunks slice. The chunk id's are
|
||||
// incremented by 1 when new chunk is created, hence (id - firstOOOChunkID) gives the slice index.
|
||||
ix := int(id) - int(s.ooo.firstOOOChunkID)
|
||||
|
||||
for i, c := range s.ooo.oooMmappedChunks {
|
||||
if maxMmapRef != 0 && c.ref > maxMmapRef {
|
||||
break
|
||||
}
|
||||
if c.OverlapsClosedInterval(mint, maxt) {
|
||||
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
|
||||
meta: chunks.Meta{
|
||||
MinTime: c.minTime,
|
||||
MaxTime: c.maxTime,
|
||||
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i))),
|
||||
},
|
||||
ref: c.ref,
|
||||
})
|
||||
}
|
||||
}
|
||||
// Add in data copied from the head OOO chunk.
|
||||
if meta.Chunk != nil {
|
||||
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{meta: meta})
|
||||
if ix < 0 || ix >= len(s.ooo.oooMmappedChunks) {
|
||||
return nil, 0, storage.ErrNotFound
|
||||
}
|
||||
|
||||
if hr != nil { // Include in-order chunks.
|
||||
metas := appendSeriesChunks(s, max(meta.MinTime, mint), min(meta.MaxTime, maxt), nil)
|
||||
for _, m := range metas {
|
||||
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
|
||||
meta: m,
|
||||
ref: 0, // This tells the loop below it's an in-order head chunk.
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Next we want to sort all the collected chunks by min time so we can find
|
||||
// those that overlap and stop when we know the rest don't.
|
||||
slices.SortFunc(tmpChks, refLessByMinTimeAndMinRef)
|
||||
|
||||
mc := &mergedOOOChunks{}
|
||||
absoluteMax := int64(math.MinInt64)
|
||||
for _, c := range tmpChks {
|
||||
if c.meta.Ref != meta.Ref && (len(mc.chunkIterables) == 0 || c.meta.MinTime > absoluteMax) {
|
||||
continue
|
||||
}
|
||||
var iterable chunkenc.Iterable
|
||||
switch {
|
||||
case c.meta.Chunk != nil:
|
||||
iterable = c.meta.Chunk
|
||||
case c.ref == 0: // This is an in-order head chunk.
|
||||
_, cid := chunks.HeadChunkRef(c.meta.Ref).Unpack()
|
||||
var err error
|
||||
iterable, _, err = hr.chunkFromSeries(s, cid, false)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid head chunk: %w", err)
|
||||
}
|
||||
default:
|
||||
chk, err := cdm.Chunk(c.ref)
|
||||
if err != nil {
|
||||
var cerr *chunks.CorruptionErr
|
||||
if errors.As(err, &cerr) {
|
||||
return nil, fmt.Errorf("invalid ooo mmapped chunk: %w", err)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
iterable = chk
|
||||
}
|
||||
mc.chunkIterables = append(mc.chunkIterables, iterable)
|
||||
if c.meta.MaxTime > absoluteMax {
|
||||
absoluteMax = c.meta.MaxTime
|
||||
}
|
||||
}
|
||||
|
||||
return mc, nil
|
||||
chk, err := chunkDiskMapper.Chunk(s.ooo.oooMmappedChunks[ix].ref)
|
||||
return chk, s.ooo.oooMmappedChunks[ix].maxTime, err
|
||||
}
|
||||
|
||||
// safeHeadChunk makes sure that the chunk can be accessed without a race condition.
|
||||
|
|
|
@ -16,6 +16,7 @@ package tsdb
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"slices"
|
||||
|
||||
|
@ -91,11 +92,10 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
|
|||
|
||||
addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) {
|
||||
tmpChks = append(tmpChks, chunks.Meta{
|
||||
MinTime: minT,
|
||||
MaxTime: maxT,
|
||||
Ref: ref,
|
||||
Chunk: chunk,
|
||||
MergeOOO: true,
|
||||
MinTime: minT,
|
||||
MaxTime: maxT,
|
||||
Ref: ref,
|
||||
Chunk: chunk,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -140,34 +140,39 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
|
|||
// those that overlap.
|
||||
slices.SortFunc(tmpChks, lessByMinTimeAndMinRef)
|
||||
|
||||
// Next we want to iterate the sorted collected chunks and only return the
|
||||
// chunks Meta the first chunk that overlaps with others.
|
||||
// Next we want to iterate the sorted collected chunks and return composites for chunks that overlap with others.
|
||||
// Example chunks of a series: 5:(100, 200) 6:(500, 600) 7:(150, 250) 8:(550, 650)
|
||||
// In the example 5 overlaps with 7 and 6 overlaps with 8 so we only want to
|
||||
// return chunk Metas for chunk 5 and chunk 6e
|
||||
*chks = append(*chks, tmpChks[0])
|
||||
maxTime := tmpChks[0].MaxTime // Tracks the maxTime of the previous "to be merged chunk".
|
||||
// In the example 5 overlaps with 7 and 6 overlaps with 8 so we will return
|
||||
// [5,7], [6,8].
|
||||
toBeMerged := tmpChks[0]
|
||||
for _, c := range tmpChks[1:] {
|
||||
switch {
|
||||
case c.MinTime > maxTime:
|
||||
*chks = append(*chks, c)
|
||||
maxTime = c.MaxTime
|
||||
case c.MaxTime > maxTime:
|
||||
maxTime = c.MaxTime
|
||||
(*chks)[len(*chks)-1].MaxTime = c.MaxTime
|
||||
fallthrough
|
||||
default:
|
||||
// If the head OOO chunk is part of an output chunk, copy the chunk pointer.
|
||||
if c.Chunk != nil {
|
||||
(*chks)[len(*chks)-1].Chunk = c.Chunk
|
||||
if c.MinTime > toBeMerged.MaxTime {
|
||||
// This chunk doesn't overlap. Send current toBeMerged to output and start a new one.
|
||||
*chks = append(*chks, toBeMerged)
|
||||
toBeMerged = c
|
||||
} else {
|
||||
// Merge this chunk with existing toBeMerged.
|
||||
if mm, ok := toBeMerged.Chunk.(*multiMeta); ok {
|
||||
mm.metas = append(mm.metas, c)
|
||||
} else {
|
||||
toBeMerged.Chunk = &multiMeta{metas: []chunks.Meta{toBeMerged, c}}
|
||||
}
|
||||
if toBeMerged.MaxTime < c.MaxTime {
|
||||
toBeMerged.MaxTime = c.MaxTime
|
||||
}
|
||||
(*chks)[len(*chks)-1].MergeOOO = (*chks)[len(*chks)-1].MergeOOO || c.MergeOOO
|
||||
}
|
||||
}
|
||||
*chks = append(*chks, toBeMerged)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Fake Chunk object to pass a set of Metas inside Meta.Chunk.
|
||||
type multiMeta struct {
|
||||
chunkenc.Chunk // We don't expect any of the methods to be called.
|
||||
metas []chunks.Meta
|
||||
}
|
||||
|
||||
// LabelValues needs to be overridden from the headIndexReader implementation
|
||||
// so we can return labels within either in-order range or ooo range.
|
||||
func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
|
@ -182,29 +187,6 @@ func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, m
|
|||
return labelValuesWithMatchers(ctx, oh, name, matchers...)
|
||||
}
|
||||
|
||||
type chunkMetaAndChunkDiskMapperRef struct {
|
||||
meta chunks.Meta
|
||||
ref chunks.ChunkDiskMapperRef
|
||||
}
|
||||
|
||||
func refLessByMinTimeAndMinRef(a, b chunkMetaAndChunkDiskMapperRef) int {
|
||||
switch {
|
||||
case a.meta.MinTime < b.meta.MinTime:
|
||||
return -1
|
||||
case a.meta.MinTime > b.meta.MinTime:
|
||||
return 1
|
||||
}
|
||||
|
||||
switch {
|
||||
case a.meta.Ref < b.meta.Ref:
|
||||
return -1
|
||||
case a.meta.Ref > b.meta.Ref:
|
||||
return 1
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
func lessByMinTimeAndMinRef(a, b chunks.Meta) int {
|
||||
switch {
|
||||
case a.MinTime < b.MinTime:
|
||||
|
@ -243,36 +225,55 @@ func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader,
|
|||
}
|
||||
|
||||
func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
|
||||
sid, _, _ := unpackHeadChunkRef(meta.Ref)
|
||||
if !meta.MergeOOO {
|
||||
return cr.cr.ChunkOrIterable(meta)
|
||||
}
|
||||
|
||||
s := cr.head.series.getByID(sid)
|
||||
// This means that the series has been garbage collected.
|
||||
if s == nil {
|
||||
return nil, nil, storage.ErrNotFound
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
if s.ooo == nil { // Must have s.ooo non-nil to call mergedChunks().
|
||||
s.Unlock()
|
||||
return cr.cr.ChunkOrIterable(meta)
|
||||
}
|
||||
mc, err := s.mergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef)
|
||||
s.Unlock()
|
||||
|
||||
return nil, mc, err
|
||||
c, it, _, err := cr.chunkOrIterable(meta, false)
|
||||
return c, it, err
|
||||
}
|
||||
|
||||
// ChunkOrIterableWithCopy implements ChunkReaderWithCopy. The special Copy
|
||||
// behaviour is only implemented for the in-order head chunk.
|
||||
func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
|
||||
if !meta.MergeOOO {
|
||||
return cr.cr.ChunkOrIterableWithCopy(meta)
|
||||
return cr.chunkOrIterable(meta, true)
|
||||
}
|
||||
|
||||
func (cr *HeadAndOOOChunkReader) chunkOrIterable(meta chunks.Meta, copyLastChunk bool) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
|
||||
sid, cid, isOOO := unpackHeadChunkRef(meta.Ref)
|
||||
s := cr.head.series.getByID(sid)
|
||||
// This means that the series has been garbage collected.
|
||||
if s == nil {
|
||||
return nil, nil, 0, storage.ErrNotFound
|
||||
}
|
||||
chk, iter, err := cr.ChunkOrIterable(meta)
|
||||
return chk, iter, 0, err
|
||||
var isoState *isolationState
|
||||
if cr.cr != nil {
|
||||
isoState = cr.cr.isoState
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
if meta.Chunk == nil {
|
||||
c, maxt, err := cr.head.chunkFromSeries(s, cid, isOOO, meta.MinTime, meta.MaxTime, isoState, copyLastChunk)
|
||||
return c, nil, maxt, err
|
||||
}
|
||||
mm, ok := meta.Chunk.(*multiMeta)
|
||||
if !ok { // Complete chunk was supplied.
|
||||
return meta.Chunk, nil, meta.MaxTime, nil
|
||||
}
|
||||
// We have a composite meta: construct a composite iterable.
|
||||
mc := &mergedOOOChunks{}
|
||||
for _, m := range mm.metas {
|
||||
switch {
|
||||
case m.Chunk != nil:
|
||||
mc.chunkIterables = append(mc.chunkIterables, m.Chunk)
|
||||
default:
|
||||
_, cid, isOOO := unpackHeadChunkRef(m.Ref)
|
||||
iterable, _, err := cr.head.chunkFromSeries(s, cid, isOOO, m.MinTime, m.MaxTime, isoState, copyLastChunk)
|
||||
if err != nil {
|
||||
return nil, nil, 0, fmt.Errorf("invalid head chunk: %w", err)
|
||||
}
|
||||
mc.chunkIterables = append(mc.chunkIterables, iterable)
|
||||
}
|
||||
}
|
||||
return nil, mc, meta.MaxTime, nil
|
||||
}
|
||||
|
||||
func (cr *HeadAndOOOChunkReader) Close() error {
|
||||
|
|
|
@ -39,6 +39,11 @@ type chunkInterval struct {
|
|||
maxt int64
|
||||
}
|
||||
|
||||
type expChunk struct {
|
||||
c chunkInterval
|
||||
m []chunkInterval
|
||||
}
|
||||
|
||||
// permutateChunkIntervals returns all possible orders of the given chunkIntervals.
|
||||
func permutateChunkIntervals(in []chunkInterval, out [][]chunkInterval, left, right int) [][]chunkInterval {
|
||||
if left == right {
|
||||
|
@ -65,7 +70,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
|||
queryMinT int64
|
||||
queryMaxT int64
|
||||
inputChunkIntervals []chunkInterval
|
||||
expChunks []chunkInterval
|
||||
expChunks []expChunk
|
||||
}{
|
||||
{
|
||||
name: "Empty result and no error when head is empty",
|
||||
|
@ -107,8 +112,8 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
|||
// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
|
||||
// Query Interval [-----------------------------------------------------------]
|
||||
// Chunk 0: [---------------------------------------]
|
||||
expChunks: []chunkInterval{
|
||||
{0, 150, 350},
|
||||
expChunks: []expChunk{
|
||||
{c: chunkInterval{0, 150, 350}},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -121,8 +126,8 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
|||
// ts 0 100 150 200 250 300 350 400 450 500 550 600 650 700
|
||||
// Query Interval: [---------------------------------------]
|
||||
// Chunk 0: [-----------------------------------------------------------]
|
||||
expChunks: []chunkInterval{
|
||||
{0, 100, 400},
|
||||
expChunks: []expChunk{
|
||||
{c: chunkInterval{0, 100, 400}},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -142,9 +147,9 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
|||
// Chunk 2: [-------------------]
|
||||
// Chunk 3: [-------------------]
|
||||
// Output Graphically [-----------------------------] [-----------------------------]
|
||||
expChunks: []chunkInterval{
|
||||
{0, 100, 250},
|
||||
{1, 500, 650},
|
||||
expChunks: []expChunk{
|
||||
{c: chunkInterval{0, 100, 250}, m: []chunkInterval{{0, 100, 200}, {2, 150, 250}}},
|
||||
{c: chunkInterval{1, 500, 650}, m: []chunkInterval{{1, 500, 600}, {3, 550, 650}}},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -164,8 +169,8 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
|||
// Chunk 2: [-------------------]
|
||||
// Chunk 3: [------------------]
|
||||
// Output Graphically [------------------------------------------------------------------------------]
|
||||
expChunks: []chunkInterval{
|
||||
{0, 100, 500},
|
||||
expChunks: []expChunk{
|
||||
{c: chunkInterval{0, 100, 500}, m: []chunkInterval{{0, 100, 200}, {1, 200, 300}, {2, 300, 400}, {3, 400, 500}}},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -185,11 +190,11 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
|||
// Chunk 2: [------------------]
|
||||
// Chunk 3: [------------------]
|
||||
// Output Graphically [------------------][------------------][------------------][------------------]
|
||||
expChunks: []chunkInterval{
|
||||
{0, 100, 199},
|
||||
{1, 200, 299},
|
||||
{2, 300, 399},
|
||||
{3, 400, 499},
|
||||
expChunks: []expChunk{
|
||||
{c: chunkInterval{0, 100, 199}},
|
||||
{c: chunkInterval{1, 200, 299}},
|
||||
{c: chunkInterval{2, 300, 399}},
|
||||
{c: chunkInterval{3, 400, 499}},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -209,8 +214,8 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
|||
// Chunk 2: [------------------]
|
||||
// Chunk 3: [------------------]
|
||||
// Output Graphically [-----------------------------------------------]
|
||||
expChunks: []chunkInterval{
|
||||
{0, 100, 350},
|
||||
expChunks: []expChunk{
|
||||
{c: chunkInterval{0, 100, 350}, m: []chunkInterval{{0, 100, 200}, {1, 150, 300}, {2, 250, 350}}},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -228,8 +233,8 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
|||
// Chunk 1: [-----------------------------]
|
||||
// Chunk 2: [------------------------------]
|
||||
// Output Graphically [-----------------------------------------------------------------------------------------]
|
||||
expChunks: []chunkInterval{
|
||||
{1, 0, 500},
|
||||
expChunks: []expChunk{
|
||||
{c: chunkInterval{1, 0, 500}, m: []chunkInterval{{1, 0, 200}, {2, 150, 300}, {0, 250, 500}}},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -251,9 +256,9 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
|||
// Chunk 3: [-------------------]
|
||||
// Chunk 4: [---------------------------------------]
|
||||
// Output Graphically [---------------------------------------] [------------------------------------------------]
|
||||
expChunks: []chunkInterval{
|
||||
{0, 100, 300},
|
||||
{4, 600, 850},
|
||||
expChunks: []expChunk{
|
||||
{c: chunkInterval{0, 100, 300}, m: []chunkInterval{{0, 100, 300}, {2, 150, 250}}},
|
||||
{c: chunkInterval{4, 600, 850}, m: []chunkInterval{{4, 600, 800}, {3, 650, 750}, {1, 770, 850}}},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -271,10 +276,10 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
|||
// Chunk 1: [----------]
|
||||
// Chunk 2: [--------]
|
||||
// Output Graphically [-------] [--------] [----------]
|
||||
expChunks: []chunkInterval{
|
||||
{0, 100, 150},
|
||||
{1, 300, 350},
|
||||
{2, 200, 250},
|
||||
expChunks: []expChunk{
|
||||
{c: chunkInterval{0, 100, 150}},
|
||||
{c: chunkInterval{2, 200, 250}},
|
||||
{c: chunkInterval{1, 300, 350}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -305,25 +310,38 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
|||
s1.ooo = &memSeriesOOOFields{}
|
||||
|
||||
// define our expected chunks, by looking at the expected ChunkIntervals and setting...
|
||||
// Ref to whatever Ref the chunk has, that we refer to by ID
|
||||
findID := func(id int) chunks.ChunkRef {
|
||||
for ref, c := range intervals {
|
||||
if c.ID == id {
|
||||
return chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), s1.oooHeadChunkID(ref)))
|
||||
}
|
||||
}
|
||||
return 0
|
||||
}
|
||||
var expChunks []chunks.Meta
|
||||
for _, e := range tc.expChunks {
|
||||
meta := chunks.Meta{
|
||||
Chunk: chunkenc.Chunk(nil),
|
||||
MinTime: e.mint,
|
||||
MaxTime: e.maxt,
|
||||
MergeOOO: true, // Only OOO chunks are tested here, so we always request merge from OOO head.
|
||||
}
|
||||
|
||||
// Ref to whatever Ref the chunk has, that we refer to by ID
|
||||
for ref, c := range intervals {
|
||||
if c.ID == e.ID {
|
||||
meta.Ref = chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), s1.oooHeadChunkID(ref)))
|
||||
break
|
||||
var chunk chunkenc.Chunk
|
||||
if len(e.m) > 0 {
|
||||
mm := &multiMeta{}
|
||||
for _, x := range e.m {
|
||||
meta := chunks.Meta{
|
||||
MinTime: x.mint,
|
||||
MaxTime: x.maxt,
|
||||
Ref: findID(x.ID),
|
||||
}
|
||||
mm.metas = append(mm.metas, meta)
|
||||
}
|
||||
chunk = mm
|
||||
}
|
||||
meta := chunks.Meta{
|
||||
Chunk: chunk,
|
||||
MinTime: e.c.mint,
|
||||
MaxTime: e.c.maxt,
|
||||
Ref: findID(e.c.ID),
|
||||
}
|
||||
expChunks = append(expChunks, meta)
|
||||
}
|
||||
slices.SortFunc(expChunks, lessByMinTimeAndMinRef) // We always want the chunks to come back sorted by minTime asc.
|
||||
|
||||
if headChunk && len(intervals) > 0 {
|
||||
// Put the last interval in the head chunk
|
||||
|
@ -485,7 +503,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
|
|||
cr := NewHeadAndOOOChunkReader(db.head, 0, 1000, nil, nil, 0)
|
||||
defer cr.Close()
|
||||
c, iterable, err := cr.ChunkOrIterable(chunks.Meta{
|
||||
Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, MergeOOO: true,
|
||||
Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
|
||||
})
|
||||
require.Nil(t, iterable)
|
||||
require.Equal(t, err, fmt.Errorf("not found"))
|
||||
|
@ -498,6 +516,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
|
|||
queryMaxT int64
|
||||
firstInOrderSampleAt int64
|
||||
inputSamples []testValue
|
||||
expSingleChunks bool
|
||||
expChunkError bool
|
||||
expChunksSamples []chunks.SampleSlice
|
||||
}{
|
||||
|
@ -510,7 +529,8 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
|
|||
{Ts: minutes(30), V: 0},
|
||||
{Ts: minutes(40), V: 0},
|
||||
},
|
||||
expChunkError: false,
|
||||
expChunkError: false,
|
||||
expSingleChunks: true,
|
||||
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
|
||||
// Query Interval [------------------------------------------------------------------------------------------]
|
||||
// Chunk 0: Current Head [--------] (With 2 samples)
|
||||
|
@ -690,7 +710,8 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
|
|||
{Ts: minutes(40), V: 3},
|
||||
{Ts: minutes(42), V: 3},
|
||||
},
|
||||
expChunkError: false,
|
||||
expChunkError: false,
|
||||
expSingleChunks: true,
|
||||
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
|
||||
// Query Interval [------------------------------------------------------------------------------------------]
|
||||
// Chunk 0 [-------]
|
||||
|
@ -845,9 +866,13 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
|
|||
for i := 0; i < len(chks); i++ {
|
||||
c, iterable, err := cr.ChunkOrIterable(chks[i])
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, c)
|
||||
|
||||
it := iterable.Iterator(nil)
|
||||
var it chunkenc.Iterator
|
||||
if tc.expSingleChunks {
|
||||
it = c.Iterator(nil)
|
||||
} else {
|
||||
require.Nil(t, c)
|
||||
it = iterable.Iterator(nil)
|
||||
}
|
||||
resultSamples, err := storage.ExpandSamples(it, nil)
|
||||
require.NoError(t, err)
|
||||
requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true)
|
||||
|
@ -1030,94 +1055,6 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
|
|||
}
|
||||
}
|
||||
|
||||
// TestSortByMinTimeAndMinRef tests that the sort function for chunk metas does sort
|
||||
// by chunk meta MinTime and in case of same references by the lower reference.
|
||||
func TestSortByMinTimeAndMinRef(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input []chunkMetaAndChunkDiskMapperRef
|
||||
exp []chunkMetaAndChunkDiskMapperRef
|
||||
}{
|
||||
{
|
||||
name: "chunks are ordered by min time",
|
||||
input: []chunkMetaAndChunkDiskMapperRef{
|
||||
{
|
||||
meta: chunks.Meta{
|
||||
Ref: 0,
|
||||
MinTime: 0,
|
||||
},
|
||||
ref: chunks.ChunkDiskMapperRef(0),
|
||||
},
|
||||
{
|
||||
meta: chunks.Meta{
|
||||
Ref: 1,
|
||||
MinTime: 1,
|
||||
},
|
||||
ref: chunks.ChunkDiskMapperRef(1),
|
||||
},
|
||||
},
|
||||
exp: []chunkMetaAndChunkDiskMapperRef{
|
||||
{
|
||||
meta: chunks.Meta{
|
||||
Ref: 0,
|
||||
MinTime: 0,
|
||||
},
|
||||
ref: chunks.ChunkDiskMapperRef(0),
|
||||
},
|
||||
{
|
||||
meta: chunks.Meta{
|
||||
Ref: 1,
|
||||
MinTime: 1,
|
||||
},
|
||||
ref: chunks.ChunkDiskMapperRef(1),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "if same mintime, lower reference goes first",
|
||||
input: []chunkMetaAndChunkDiskMapperRef{
|
||||
{
|
||||
meta: chunks.Meta{
|
||||
Ref: 10,
|
||||
MinTime: 0,
|
||||
},
|
||||
ref: chunks.ChunkDiskMapperRef(0),
|
||||
},
|
||||
{
|
||||
meta: chunks.Meta{
|
||||
Ref: 5,
|
||||
MinTime: 0,
|
||||
},
|
||||
ref: chunks.ChunkDiskMapperRef(1),
|
||||
},
|
||||
},
|
||||
exp: []chunkMetaAndChunkDiskMapperRef{
|
||||
{
|
||||
meta: chunks.Meta{
|
||||
Ref: 5,
|
||||
MinTime: 0,
|
||||
},
|
||||
ref: chunks.ChunkDiskMapperRef(1),
|
||||
},
|
||||
{
|
||||
meta: chunks.Meta{
|
||||
Ref: 10,
|
||||
MinTime: 0,
|
||||
},
|
||||
ref: chunks.ChunkDiskMapperRef(0),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
|
||||
slices.SortFunc(tc.input, refLessByMinTimeAndMinRef)
|
||||
require.Equal(t, tc.exp, tc.input)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestSortMetaByMinTimeAndMinRef tests that the sort function for chunk metas does sort
|
||||
// by chunk meta MinTime and in case of same references by the lower reference.
|
||||
func TestSortMetaByMinTimeAndMinRef(t *testing.T) {
|
||||
|
|
Loading…
Reference in a new issue