mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
Make requesting merge with OOO head explicit in chunk.Meta
Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent
41c076196e
commit
183bbc39a2
|
@ -133,6 +133,9 @@ type Meta struct {
|
|||
// Time range the data covers.
|
||||
// When MaxTime == math.MaxInt64 the chunk is still open and being appended to.
|
||||
MinTime, MaxTime int64
|
||||
|
||||
// Flag to indicate that this meta needs merge with OOO data.
|
||||
MergeOOO bool
|
||||
}
|
||||
|
||||
// ChunkFromSamples requires all samples to have the same type.
|
||||
|
|
|
@ -91,10 +91,11 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
|
|||
|
||||
addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) {
|
||||
tmpChks = append(tmpChks, chunks.Meta{
|
||||
MinTime: minT,
|
||||
MaxTime: maxT,
|
||||
Ref: ref,
|
||||
Chunk: chunk,
|
||||
MinTime: minT,
|
||||
MaxTime: maxT,
|
||||
Ref: ref,
|
||||
Chunk: chunk,
|
||||
MergeOOO: true,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -160,6 +161,7 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
|
|||
if c.Chunk != nil {
|
||||
(*chks)[len(*chks)-1].Chunk = c.Chunk
|
||||
}
|
||||
(*chks)[len(*chks)-1].MergeOOO = (*chks)[len(*chks)-1].MergeOOO || c.MergeOOO
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -241,8 +243,8 @@ func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader,
|
|||
}
|
||||
|
||||
func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
|
||||
sid, _, isOOO := unpackHeadChunkRef(meta.Ref)
|
||||
if !isOOO && meta.Chunk == nil { // meta.Chunk can have a copy of OOO head samples, even on non-OOO chunk ID.
|
||||
sid, _, _ := unpackHeadChunkRef(meta.Ref)
|
||||
if !meta.MergeOOO {
|
||||
return cr.cr.ChunkOrIterable(meta)
|
||||
}
|
||||
|
||||
|
@ -266,8 +268,7 @@ func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chu
|
|||
// ChunkOrIterableWithCopy implements ChunkReaderWithCopy. The special Copy
|
||||
// behaviour is only implemented for the in-order head chunk.
|
||||
func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
|
||||
_, _, isOOO := unpackHeadChunkRef(meta.Ref)
|
||||
if !isOOO {
|
||||
if !meta.MergeOOO {
|
||||
return cr.cr.ChunkOrIterableWithCopy(meta)
|
||||
}
|
||||
chk, iter, err := cr.ChunkOrIterable(meta)
|
||||
|
|
|
@ -308,9 +308,10 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
|||
var expChunks []chunks.Meta
|
||||
for _, e := range tc.expChunks {
|
||||
meta := chunks.Meta{
|
||||
Chunk: chunkenc.Chunk(nil),
|
||||
MinTime: e.mint,
|
||||
MaxTime: e.maxt,
|
||||
Chunk: chunkenc.Chunk(nil),
|
||||
MinTime: e.mint,
|
||||
MaxTime: e.maxt,
|
||||
MergeOOO: true, // Only OOO chunks are tested here, so we always request merge from OOO head.
|
||||
}
|
||||
|
||||
// Ref to whatever Ref the chunk has, that we refer to by ID
|
||||
|
@ -484,7 +485,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
|
|||
cr := NewHeadAndOOOChunkReader(db.head, 0, 1000, nil, nil, 0)
|
||||
defer cr.Close()
|
||||
c, iterable, err := cr.ChunkOrIterable(chunks.Meta{
|
||||
Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
|
||||
Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, MergeOOO: true,
|
||||
})
|
||||
require.Nil(t, iterable)
|
||||
require.Equal(t, err, fmt.Errorf("not found"))
|
||||
|
|
Loading…
Reference in a new issue