Mirror of https://github.com/prometheus/prometheus.git
[BUGFIX] TSDB: Fix query overlapping in-order and ooo head (#14693)
* tsdb: Unit test query overlapping in order and ooo head

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>

* TSDB: Merge overlapping head chunk

The basic idea is that getOOOSeriesChunks can populate Meta.Chunk, but since it
only returns one Meta per overlapping time-slot, that pointer may end up in a
Meta with a head-chunk ID. So we need HeadAndOOOChunkReader.ChunkOrIterable()
to call mergedChunks in that case.

Previously, mergedChunks was checking that meta.Ref was a valid OOO chunk
reference, but it never actually uses that reference; it just finds all chunks
overlapping in time. So we can delete that code.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
Co-authored-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent 7fad1ec8ee
commit 9a74d53935
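The hunks below change both the test and the read path. As a rough sketch of the dispatch rule the commit message describes — using simplified stand-in types, not the actual Prometheus chunks.Meta API — a Meta whose reference says "in-order" can still carry a Chunk populated from the OOO head, and must then take the merged path:

// Hedged illustration only: stand-in types, not the real TSDB structures.
package main

import "fmt"

// meta mimics the two facts ChunkOrIterable looks at: whether the packed
// chunk reference is an OOO one, and whether a chunk was already attached.
type meta struct {
    isOOO bool // derived from the packed head-chunk reference
    chunk any  // non-nil when the OOO query path pre-populated Meta.Chunk
}

// readPath mirrors the fixed condition `!isOOO && meta.Chunk == nil`:
// only a plain in-order reference with no attached chunk skips the merge.
func readPath(m meta) string {
    if !m.isOOO && m.chunk == nil {
        return "plain in-order head reader"
    }
    return "mergedChunks: merge every chunk overlapping [mint, maxt]"
}

func main() {
    fmt.Println(readPath(meta{isOOO: false}))                    // in-order chunk, nothing attached
    fmt.Println(readPath(meta{isOOO: true}))                     // OOO chunk reference
    fmt.Println(readPath(meta{isOOO: false, chunk: struct{}{}})) // the case this commit fixes
}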
@@ -5042,10 +5042,16 @@ func Test_Querier_OOOQuery(t *testing.T) {
     series1 := labels.FromStrings("foo", "bar1")

     minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }
-    addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample) ([]chunks.Sample, int) {
+    addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample, filter func(int64) bool) ([]chunks.Sample, int) {
+        if filter == nil {
+            filter = func(int64) bool { return true }
+        }
         app := db.Appender(context.Background())
         totalAppended := 0
         for m := fromMins; m <= toMins; m += time.Minute.Milliseconds() {
+            if !filter(m / time.Minute.Milliseconds()) {
+                continue
+            }
             _, err := app.Append(0, series1, m, float64(m))
             if m >= queryMinT && m <= queryMaxT {
                 expSamples = append(expSamples, sample{t: m, f: float64(m)})
@@ -5084,6 +5090,15 @@ func Test_Querier_OOOQuery(t *testing.T) {
             oooMinT:     minutes(0),
             oooMaxT:     minutes(99),
         },
+        {
+            name:        "query overlapping inorder and ooo samples returns all ingested samples",
+            queryMinT:   minutes(0),
+            queryMaxT:   minutes(200),
+            inOrderMinT: minutes(100),
+            inOrderMaxT: minutes(200),
+            oooMinT:     minutes(180 - opts.OutOfOrderCapMax/2), // Make sure to fit into the OOO head.
+            oooMaxT:     minutes(180),
+        },
     }
     for _, tc := range tests {
         t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
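The new test case is built so the in-order head and the OOO head overlap in time: in-order samples span minutes 100–200, while OOO samples end at minute 180 and start OutOfOrderCapMax/2 minutes earlier. A hedged back-of-the-envelope check (capMax below is a placeholder; the value of opts.OutOfOrderCapMax is configured elsewhere in the test and is not part of this excerpt):

package main

import (
    "fmt"
    "time"
)

func main() {
    minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }

    // Placeholder for opts.OutOfOrderCapMax; any small positive value keeps
    // the OOO window inside the OOO head, which is all the test needs.
    const capMax = int64(30)

    inOrderMinT, inOrderMaxT := minutes(100), minutes(200)
    oooMinT, oooMaxT := minutes(180-capMax/2), minutes(180)

    // Closed-interval overlap: the two sample windows share timestamps, so a
    // query over [0m, 200m] has to read both heads for the same series.
    overlap := oooMinT <= inOrderMaxT && inOrderMinT <= oooMaxT
    fmt.Println("in-order window:", inOrderMinT, "-", inOrderMaxT)
    fmt.Println("ooo window:     ", oooMinT, "-", oooMaxT)
    fmt.Println("overlap:", overlap)
}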
@@ -5093,13 +5108,20 @@ func Test_Querier_OOOQuery(t *testing.T) {
                 require.NoError(t, db.Close())
             }()

-            var expSamples []chunks.Sample
+            var (
+                expSamples []chunks.Sample
+                inoSamples int
+            )

-            // Add in-order samples.
-            expSamples, _ = addSample(db, tc.inOrderMinT, tc.inOrderMaxT, tc.queryMinT, tc.queryMaxT, expSamples)
+            // Add in-order samples (at even minutes).
+            expSamples, inoSamples = addSample(db, tc.inOrderMinT, tc.inOrderMaxT, tc.queryMinT, tc.queryMaxT, expSamples, func(t int64) bool { return t%2 == 0 })
+            // Sanity check that filter is not too zealous.
+            require.Positive(t, inoSamples, 0)

-            // Add out-of-order samples.
-            expSamples, oooSamples := addSample(db, tc.oooMinT, tc.oooMaxT, tc.queryMinT, tc.queryMaxT, expSamples)
+            // Add out-of-order samples (at odd minutes).
+            expSamples, oooSamples := addSample(db, tc.oooMinT, tc.oooMaxT, tc.queryMinT, tc.queryMaxT, expSamples, func(t int64) bool { return t%2 == 1 })
+            // Sanity check that filter is not too zealous.
+            require.Positive(t, oooSamples, 0)

             sort.Slice(expSamples, func(i, j int) bool {
                 return expSamples[i].T() < expSamples[j].T()
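A note on the filter design above: passing nil keeps the helper's original accept-everything behavior, so other callers of addSample are unaffected, while this test sends even minutes through the normal in-order appender first and odd minutes later as out-of-order samples. That interleaves the two heads sample-by-sample across the overlap window, which is exactly the shape needed to reproduce the bug fixed in the read-path hunks below.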
@@ -481,31 +481,12 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
     return elem, true, offset == 0, nil
 }

-// mergedChunks return an iterable over one or more OOO chunks for the given
-// chunks.Meta reference from memory or by m-mapping it from the disk. The
-// returned iterable will be a merge of all the overlapping chunks, if any,
-// amongst all the chunks in the OOOHead.
+// mergedChunks return an iterable over all chunks that overlap the
+// time window [mint,maxt], plus meta.Chunk if populated.
 // If hr is non-nil then in-order chunks are included.
 // This function is not thread safe unless the caller holds a lock.
 // The caller must ensure that s.ooo is not nil.
 func (s *memSeries) mergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, hr *headChunkReader, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (chunkenc.Iterable, error) {
-    _, cid, _ := unpackHeadChunkRef(meta.Ref)
-
-    // ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are
-    // incremented by 1 when new chunk is created, hence (meta - firstChunkID) gives the slice index.
-    // The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix
-    // is len(s.mmappedChunks), it represents the next chunk, which is the head chunk.
-    ix := int(cid) - int(s.ooo.firstOOOChunkID)
-    if ix < 0 || ix > len(s.ooo.oooMmappedChunks) {
-        return nil, storage.ErrNotFound
-    }
-
-    if ix == len(s.ooo.oooMmappedChunks) {
-        if s.ooo.oooHeadChunk == nil {
-            return nil, errors.New("invalid ooo head chunk")
-        }
-    }
-
     // We create a temporary slice of chunk metas to hold the information of all
     // possible chunks that may overlap with the requested chunk.
     tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.ooo.oooMmappedChunks)+1)
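With the meta.Ref validation removed, mergedChunks selects chunks purely by time overlap with [mint, maxt] (plus meta.Chunk when the caller populated it). A minimal, hedged sketch of that closed-interval overlap test, using local stand-in types rather than the real OOO m-mapped chunk entries:

package main

import "fmt"

// chunkSpan is a stand-in for an OOO m-mapped chunk's time range; the real
// code keeps min/max times on each chunk and collects every chunk whose
// range intersects the query window.
type chunkSpan struct{ minT, maxT int64 }

// overlapsClosedInterval is the closed-interval check the selection is based
// on: [c.minT, c.maxT] and [mint, maxt] share at least one timestamp.
func (c chunkSpan) overlapsClosedInterval(mint, maxt int64) bool {
    return c.minT <= maxt && mint <= c.maxT
}

func main() {
    mint, maxt := int64(100), int64(200) // query window
    spans := []chunkSpan{
        {0, 99},    // entirely before the window: skipped
        {150, 180}, // inside the window: merged
        {190, 250}, // straddles the right edge: merged
    }
    for _, c := range spans {
        fmt.Printf("%+v overlaps: %v\n", c, c.overlapsClosedInterval(mint, maxt))
    }
}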
@@ -242,7 +242,7 @@ func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader,

 func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
     sid, _, isOOO := unpackHeadChunkRef(meta.Ref)
-    if !isOOO {
+    if !isOOO && meta.Chunk == nil { // meta.Chunk can have a copy of OOO head samples, even on non-OOO chunk ID.
         return cr.cr.ChunkOrIterable(meta)
     }

@@ -253,6 +253,10 @@ func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chu
     }

     s.Lock()
+    if s.ooo == nil { // Must have s.ooo non-nil to call mergedChunks().
+        s.Unlock()
+        return cr.cr.ChunkOrIterable(meta)
+    }
     mc, err := s.mergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef)
     s.Unlock()

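Taken together with the earlier ChunkOrIterable change, the read path now works as follows: a Meta that carries a pre-populated Chunk, or an OOO reference, goes through mergedChunks whenever the series has OOO state; if s.ooo is nil there is nothing to merge, so the reader falls back to the plain in-order path, satisfying mergedChunks' documented requirement that s.ooo be non-nil.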