mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 22:37:27 -08:00
[PERF] TSDB: Query head and ooo-head together
Add `HeadAndOOOQuerier` which iterates just once over series, then where necessary merges chunks from in-order and out-of-order lists. Add a ChunkQuerier for in-order and ooo together Add copy-last-chunk behaviour to HeadAndOOOChunkReader Out-of-order chunk IDs are distinguished from in-order by setting bit 23. Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
parent
2936ab80d7
commit
e04d137649
54
tsdb/db.go
54
tsdb/db.go
|
@ -2029,7 +2029,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
blockQueriers := make([]storage.Querier, 0, len(blocks)+2) // +2 to allow for possible in-order and OOO head queriers
|
blockQueriers := make([]storage.Querier, 0, len(blocks)+1) // +1 to allow for possible head querier.
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -2041,10 +2041,11 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
var headQuerier storage.Querier
|
||||||
if maxt >= db.head.MinTime() {
|
if maxt >= db.head.MinTime() {
|
||||||
rh := NewRangeHead(db.head, mint, maxt)
|
rh := NewRangeHead(db.head, mint, maxt)
|
||||||
var err error
|
var err error
|
||||||
inOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt)
|
headQuerier, err = db.blockQuerierFunc(rh, mint, maxt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("open block querier for head %s: %w", rh, err)
|
return nil, fmt.Errorf("open block querier for head %s: %w", rh, err)
|
||||||
}
|
}
|
||||||
|
@ -2054,36 +2055,40 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
|
||||||
// won't run into a race later since any truncation that comes after will wait on this querier if it overlaps.
|
// won't run into a race later since any truncation that comes after will wait on this querier if it overlaps.
|
||||||
shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt)
|
shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt)
|
||||||
if shouldClose {
|
if shouldClose {
|
||||||
if err := inOrderHeadQuerier.Close(); err != nil {
|
if err := headQuerier.Close(); err != nil {
|
||||||
return nil, fmt.Errorf("closing head block querier %s: %w", rh, err)
|
return nil, fmt.Errorf("closing head block querier %s: %w", rh, err)
|
||||||
}
|
}
|
||||||
inOrderHeadQuerier = nil
|
headQuerier = nil
|
||||||
}
|
}
|
||||||
if getNew {
|
if getNew {
|
||||||
rh := NewRangeHead(db.head, newMint, maxt)
|
rh := NewRangeHead(db.head, newMint, maxt)
|
||||||
inOrderHeadQuerier, err = db.blockQuerierFunc(rh, newMint, maxt)
|
headQuerier, err = db.blockQuerierFunc(rh, newMint, maxt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("open block querier for head while getting new querier %s: %w", rh, err)
|
return nil, fmt.Errorf("open block querier for head while getting new querier %s: %w", rh, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if inOrderHeadQuerier != nil {
|
|
||||||
blockQueriers = append(blockQueriers, inOrderHeadQuerier)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if headQuerier != nil {
|
||||||
if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) {
|
if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) {
|
||||||
|
// We need to fetch from in-order and out-of-order chunks: wrap the headQuerier.
|
||||||
|
isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef)
|
||||||
|
headQuerier = NewHeadAndOOOQuerier(mint, maxt, db.head, isoState, headQuerier)
|
||||||
|
}
|
||||||
|
} else if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) {
|
||||||
rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef)
|
rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef)
|
||||||
var err error
|
var err error
|
||||||
outOfOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt)
|
headQuerier, err = db.blockQuerierFunc(rh, mint, maxt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If BlockQuerierFunc() failed, make sure to clean up the pending read created by NewOOORangeHead.
|
// If BlockQuerierFunc() failed, make sure to clean up the pending read created by NewOOORangeHead.
|
||||||
rh.isoState.Close()
|
rh.isoState.Close()
|
||||||
|
|
||||||
return nil, fmt.Errorf("open block querier for ooo head %s: %w", rh, err)
|
return nil, fmt.Errorf("open block querier for ooo head %s: %w", rh, err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
blockQueriers = append(blockQueriers, outOfOrderHeadQuerier)
|
if headQuerier != nil {
|
||||||
|
blockQueriers = append(blockQueriers, headQuerier)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, b := range blocks {
|
for _, b := range blocks {
|
||||||
|
@ -2111,7 +2116,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
blockQueriers := make([]storage.ChunkQuerier, 0, len(blocks)+2) // +2 to allow for possible in-order and OOO head queriers
|
blockQueriers := make([]storage.ChunkQuerier, 0, len(blocks)+1) // +1 to allow for possible head querier.
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -2123,9 +2128,10 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
var headQuerier storage.ChunkQuerier
|
||||||
if maxt >= db.head.MinTime() {
|
if maxt >= db.head.MinTime() {
|
||||||
rh := NewRangeHead(db.head, mint, maxt)
|
rh := NewRangeHead(db.head, mint, maxt)
|
||||||
inOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt)
|
headQuerier, err = db.blockChunkQuerierFunc(rh, mint, maxt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("open querier for head %s: %w", rh, err)
|
return nil, fmt.Errorf("open querier for head %s: %w", rh, err)
|
||||||
}
|
}
|
||||||
|
@ -2135,35 +2141,39 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
|
||||||
// won't run into a race later since any truncation that comes after will wait on this querier if it overlaps.
|
// won't run into a race later since any truncation that comes after will wait on this querier if it overlaps.
|
||||||
shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt)
|
shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt)
|
||||||
if shouldClose {
|
if shouldClose {
|
||||||
if err := inOrderHeadQuerier.Close(); err != nil {
|
if err := headQuerier.Close(); err != nil {
|
||||||
return nil, fmt.Errorf("closing head querier %s: %w", rh, err)
|
return nil, fmt.Errorf("closing head querier %s: %w", rh, err)
|
||||||
}
|
}
|
||||||
inOrderHeadQuerier = nil
|
headQuerier = nil
|
||||||
}
|
}
|
||||||
if getNew {
|
if getNew {
|
||||||
rh := NewRangeHead(db.head, newMint, maxt)
|
rh := NewRangeHead(db.head, newMint, maxt)
|
||||||
inOrderHeadQuerier, err = db.blockChunkQuerierFunc(rh, newMint, maxt)
|
headQuerier, err = db.blockChunkQuerierFunc(rh, newMint, maxt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("open querier for head while getting new querier %s: %w", rh, err)
|
return nil, fmt.Errorf("open querier for head while getting new querier %s: %w", rh, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if inOrderHeadQuerier != nil {
|
|
||||||
blockQueriers = append(blockQueriers, inOrderHeadQuerier)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if headQuerier != nil {
|
||||||
if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) {
|
if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) {
|
||||||
|
// We need to fetch from in-order and out-of-order chunks: wrap the headQuerier.
|
||||||
|
isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef)
|
||||||
|
headQuerier = NewHeadAndOOOChunkQuerier(mint, maxt, db.head, isoState, headQuerier)
|
||||||
|
}
|
||||||
|
} else if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) {
|
||||||
rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef)
|
rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef)
|
||||||
outOfOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt)
|
headQuerier, err = db.blockChunkQuerierFunc(rh, mint, maxt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If NewBlockQuerier() failed, make sure to clean up the pending read created by NewOOORangeHead.
|
// If NewBlockQuerier() failed, make sure to clean up the pending read created by NewOOORangeHead.
|
||||||
rh.isoState.Close()
|
rh.isoState.Close()
|
||||||
|
|
||||||
return nil, fmt.Errorf("open block chunk querier for ooo head %s: %w", rh, err)
|
return nil, fmt.Errorf("open block chunk querier for ooo head %s: %w", rh, err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
blockQueriers = append(blockQueriers, outOfOrderHeadQuerier)
|
if headQuerier != nil {
|
||||||
|
blockQueriers = append(blockQueriers, headQuerier)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, b := range blocks {
|
for _, b := range blocks {
|
||||||
|
|
|
@ -248,12 +248,20 @@ func (s *memSeries) headChunkID(pos int) chunks.HeadChunkID {
|
||||||
return chunks.HeadChunkID(pos) + s.firstChunkID
|
return chunks.HeadChunkID(pos) + s.firstChunkID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const oooChunkIDMask = 1 << 23
|
||||||
|
|
||||||
// oooHeadChunkID returns the HeadChunkID referred to by the given position.
|
// oooHeadChunkID returns the HeadChunkID referred to by the given position.
|
||||||
|
// Only the bottom 24 bits are used. Bit 23 is always 1 for an OOO chunk; for the rest:
|
||||||
// * 0 <= pos < len(s.oooMmappedChunks) refer to s.oooMmappedChunks[pos]
|
// * 0 <= pos < len(s.oooMmappedChunks) refer to s.oooMmappedChunks[pos]
|
||||||
// * pos == len(s.oooMmappedChunks) refers to s.oooHeadChunk
|
// * pos == len(s.oooMmappedChunks) refers to s.oooHeadChunk
|
||||||
// The caller must ensure that s.ooo is not nil.
|
// The caller must ensure that s.ooo is not nil.
|
||||||
func (s *memSeries) oooHeadChunkID(pos int) chunks.HeadChunkID {
|
func (s *memSeries) oooHeadChunkID(pos int) chunks.HeadChunkID {
|
||||||
return chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID
|
return (chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID) | oooChunkIDMask
|
||||||
|
}
|
||||||
|
|
||||||
|
func unpackHeadChunkRef(ref chunks.ChunkRef) (chunks.HeadSeriesRef, chunks.HeadChunkID, bool) {
|
||||||
|
sid, cid := chunks.HeadChunkRef(ref).Unpack()
|
||||||
|
return sid, (cid & (oooChunkIDMask - 1)), (cid & oooChunkIDMask) != 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// LabelValueFor returns label value for the given label name in the series referred to by ID.
|
// LabelValueFor returns label value for the given label name in the series referred to by ID.
|
||||||
|
@ -343,10 +351,15 @@ func (h *headChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chu
|
||||||
return chk, nil, err
|
return chk, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// ChunkWithCopy returns the chunk for the reference number.
|
type ChunkReaderWithCopy interface {
|
||||||
// If the chunk is the in-memory chunk, then it makes a copy and returns the copied chunk.
|
ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error)
|
||||||
func (h *headChunkReader) ChunkWithCopy(meta chunks.Meta) (chunkenc.Chunk, int64, error) {
|
}
|
||||||
return h.chunk(meta, true)
|
|
||||||
|
// ChunkOrIterableWithCopy returns the chunk for the reference number.
|
||||||
|
// If the chunk is the in-memory chunk, then it makes a copy and returns the copied chunk, plus the max time of the chunk.
|
||||||
|
func (h *headChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
|
||||||
|
chk, maxTime, err := h.chunk(meta, true)
|
||||||
|
return chk, nil, maxTime, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// chunk returns the chunk for the reference number.
|
// chunk returns the chunk for the reference number.
|
||||||
|
@ -472,10 +485,11 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
|
||||||
// chunks.Meta reference from memory or by m-mapping it from the disk. The
|
// chunks.Meta reference from memory or by m-mapping it from the disk. The
|
||||||
// returned iterable will be a merge of all the overlapping chunks, if any,
|
// returned iterable will be a merge of all the overlapping chunks, if any,
|
||||||
// amongst all the chunks in the OOOHead.
|
// amongst all the chunks in the OOOHead.
|
||||||
|
// If hr is non-nil then in-order chunks are included.
|
||||||
// This function is not thread safe unless the caller holds a lock.
|
// This function is not thread safe unless the caller holds a lock.
|
||||||
// The caller must ensure that s.ooo is not nil.
|
// The caller must ensure that s.ooo is not nil.
|
||||||
func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (*mergedOOOChunks, error) {
|
func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, hr *headChunkReader, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (*mergedOOOChunks, error) {
|
||||||
_, cid := chunks.HeadChunkRef(meta.Ref).Unpack()
|
_, cid, _ := unpackHeadChunkRef(meta.Ref)
|
||||||
|
|
||||||
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are
|
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are
|
||||||
// incremented by 1 when new chunk is created, hence (meta - firstChunkID) gives the slice index.
|
// incremented by 1 when new chunk is created, hence (meta - firstChunkID) gives the slice index.
|
||||||
|
@ -516,6 +530,17 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe
|
||||||
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{meta: meta})
|
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{meta: meta})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if hr != nil { // Include in-order chunks.
|
||||||
|
var metas []chunks.Meta
|
||||||
|
getSeriesChunks(s, max(meta.MinTime, mint), min(meta.MaxTime, maxt), &metas)
|
||||||
|
for _, m := range metas {
|
||||||
|
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
|
||||||
|
meta: m,
|
||||||
|
ref: 0, // This tells the loop below it's an in-order head chunk.
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Next we want to sort all the collected chunks by min time so we can find
|
// Next we want to sort all the collected chunks by min time so we can find
|
||||||
// those that overlap and stop when we know the rest don't.
|
// those that overlap and stop when we know the rest don't.
|
||||||
slices.SortFunc(tmpChks, refLessByMinTimeAndMinRef)
|
slices.SortFunc(tmpChks, refLessByMinTimeAndMinRef)
|
||||||
|
@ -527,9 +552,17 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var iterable chunkenc.Iterable
|
var iterable chunkenc.Iterable
|
||||||
if c.meta.Chunk != nil {
|
switch {
|
||||||
|
case c.meta.Chunk != nil:
|
||||||
iterable = c.meta.Chunk
|
iterable = c.meta.Chunk
|
||||||
} else {
|
case c.ref == 0: // This is an in-order head chunk.
|
||||||
|
_, cid := chunks.HeadChunkRef(c.meta.Ref).Unpack()
|
||||||
|
var err error
|
||||||
|
iterable, _, err = hr.chunkFromSeries(s, cid, false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid head chunk: %w", err)
|
||||||
|
}
|
||||||
|
default:
|
||||||
chk, err := cdm.Chunk(c.ref)
|
chk, err := cdm.Chunk(c.ref)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var cerr *chunks.CorruptionErr
|
var cerr *chunks.CorruptionErr
|
||||||
|
|
|
@ -27,6 +27,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||||
"github.com/prometheus/prometheus/tsdb/index"
|
"github.com/prometheus/prometheus/tsdb/index"
|
||||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||||
|
"github.com/prometheus/prometheus/util/annotations"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ IndexReader = &OOOHeadIndexReader{}
|
var _ IndexReader = &OOOHeadIndexReader{}
|
||||||
|
@ -92,10 +93,10 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return getOOOSeriesChunks(s, oh.mint, oh.maxt, lastGarbageCollectedMmapRef, maxMmapRef, chks)
|
return getOOOSeriesChunks(s, oh.mint, oh.maxt, lastGarbageCollectedMmapRef, maxMmapRef, false, chks)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, chks *[]chunks.Meta) error {
|
func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, chks *[]chunks.Meta) error {
|
||||||
tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks))
|
tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks))
|
||||||
|
|
||||||
addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) {
|
addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) {
|
||||||
|
@ -135,6 +136,10 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if includeInOrder {
|
||||||
|
getSeriesChunks(s, mint, maxt, &tmpChks)
|
||||||
|
}
|
||||||
|
|
||||||
// There is nothing to do if we did not collect any chunk.
|
// There is nothing to do if we did not collect any chunk.
|
||||||
if len(tmpChks) == 0 {
|
if len(tmpChks) == 0 {
|
||||||
return nil
|
return nil
|
||||||
|
@ -275,7 +280,7 @@ func (cr OOOHeadChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk,
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
return nil, nil, storage.ErrNotFound
|
return nil, nil, storage.ErrNotFound
|
||||||
}
|
}
|
||||||
mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt, cr.maxMmapRef)
|
mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, nil, cr.mint, cr.maxt, cr.maxMmapRef)
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
|
@ -498,3 +503,174 @@ func (ir *OOOCompactionHeadIndexReader) LabelNamesFor(ctx context.Context, posti
|
||||||
func (ir *OOOCompactionHeadIndexReader) Close() error {
|
func (ir *OOOCompactionHeadIndexReader) Close() error {
|
||||||
return ir.ch.oooIR.Close()
|
return ir.ch.oooIR.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HeadAndOOOQuerier queries both the head and the out-of-order head.
|
||||||
|
type HeadAndOOOQuerier struct {
|
||||||
|
mint, maxt int64
|
||||||
|
head *Head
|
||||||
|
index IndexReader
|
||||||
|
chunkr ChunkReader
|
||||||
|
querier storage.Querier
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier {
|
||||||
|
isoState := head.iso.State(mint, maxt)
|
||||||
|
return &HeadAndOOOQuerier{
|
||||||
|
mint: mint,
|
||||||
|
maxt: maxt,
|
||||||
|
head: head,
|
||||||
|
index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef),
|
||||||
|
chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, isoState, oooIsoState, 0),
|
||||||
|
querier: querier,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *HeadAndOOOQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||||
|
return q.querier.LabelValues(ctx, name, hints, matchers...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *HeadAndOOOQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||||
|
return q.querier.LabelNames(ctx, hints, matchers...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *HeadAndOOOQuerier) Close() error {
|
||||||
|
q.chunkr.Close()
|
||||||
|
return q.querier.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *HeadAndOOOQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
|
||||||
|
return selectSeriesSet(ctx, sortSeries, hints, matchers, q.index, q.chunkr, q.head.tombstones, q.mint, q.maxt)
|
||||||
|
}
|
||||||
|
|
||||||
|
// HeadAndOOOChunkQuerier queries both the head and the out-of-order head.
|
||||||
|
type HeadAndOOOChunkQuerier struct {
|
||||||
|
mint, maxt int64
|
||||||
|
head *Head
|
||||||
|
index IndexReader
|
||||||
|
chunkr ChunkReader
|
||||||
|
querier storage.ChunkQuerier
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHeadAndOOOChunkQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.ChunkQuerier) storage.ChunkQuerier {
|
||||||
|
isoState := head.iso.State(mint, maxt)
|
||||||
|
return &HeadAndOOOChunkQuerier{
|
||||||
|
mint: mint,
|
||||||
|
maxt: maxt,
|
||||||
|
head: head,
|
||||||
|
index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef),
|
||||||
|
chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, isoState, oooIsoState, 0),
|
||||||
|
querier: querier,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *HeadAndOOOChunkQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||||
|
return q.querier.LabelValues(ctx, name, hints, matchers...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *HeadAndOOOChunkQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||||
|
return q.querier.LabelNames(ctx, hints, matchers...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *HeadAndOOOChunkQuerier) Close() error {
|
||||||
|
q.chunkr.Close()
|
||||||
|
return q.querier.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *HeadAndOOOChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
|
||||||
|
return selectChunkSeriesSet(ctx, sortSeries, hints, matchers, rangeHeadULID, q.index, q.chunkr, q.head.tombstones, q.mint, q.maxt)
|
||||||
|
}
|
||||||
|
|
||||||
|
type HeadAndOOOIndexReader struct {
|
||||||
|
*headIndexReader // A reference to the headIndexReader so we can reuse as many interface implementation as possible.
|
||||||
|
lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHeadAndOOOIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader {
|
||||||
|
hr := &headIndexReader{
|
||||||
|
head: head,
|
||||||
|
mint: mint,
|
||||||
|
maxt: maxt,
|
||||||
|
}
|
||||||
|
return &HeadAndOOOIndexReader{hr, lastGarbageCollectedMmapRef}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
|
||||||
|
s := oh.head.series.getByID(chunks.HeadSeriesRef(ref))
|
||||||
|
if s == nil {
|
||||||
|
oh.head.metrics.seriesNotFound.Inc()
|
||||||
|
return storage.ErrNotFound
|
||||||
|
}
|
||||||
|
builder.Assign(s.lset)
|
||||||
|
|
||||||
|
if chks == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Lock()
|
||||||
|
defer s.Unlock()
|
||||||
|
*chks = (*chks)[:0]
|
||||||
|
|
||||||
|
if s.ooo != nil {
|
||||||
|
return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, chks)
|
||||||
|
}
|
||||||
|
getSeriesChunks(s, oh.mint, oh.maxt, chks)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type HeadAndOOOChunkReader struct {
|
||||||
|
cr headChunkReader
|
||||||
|
maxMmapRef chunks.ChunkDiskMapperRef
|
||||||
|
oooIsoState *oooIsolationState
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, isoState *isolationState, oooIsoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOChunkReader {
|
||||||
|
return &HeadAndOOOChunkReader{
|
||||||
|
cr: headChunkReader{
|
||||||
|
head: head,
|
||||||
|
mint: mint,
|
||||||
|
maxt: maxt,
|
||||||
|
isoState: isoState,
|
||||||
|
},
|
||||||
|
maxMmapRef: maxMmapRef,
|
||||||
|
oooIsoState: oooIsoState,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
|
||||||
|
sid, _, isOOO := unpackHeadChunkRef(meta.Ref)
|
||||||
|
if !isOOO {
|
||||||
|
return cr.cr.ChunkOrIterable(meta)
|
||||||
|
}
|
||||||
|
|
||||||
|
s := cr.cr.head.series.getByID(sid)
|
||||||
|
// This means that the series has been garbage collected.
|
||||||
|
if s == nil {
|
||||||
|
return nil, nil, storage.ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Lock()
|
||||||
|
mc, err := s.oooMergedChunks(meta, cr.cr.head.chunkDiskMapper, &cr.cr, cr.cr.mint, cr.cr.maxt, cr.maxMmapRef)
|
||||||
|
s.Unlock()
|
||||||
|
|
||||||
|
return nil, mc, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pass through special behaviour for current head chunk.
|
||||||
|
func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
|
||||||
|
_, _, isOOO := unpackHeadChunkRef(meta.Ref)
|
||||||
|
if !isOOO {
|
||||||
|
return cr.cr.ChunkOrIterableWithCopy(meta)
|
||||||
|
}
|
||||||
|
chk, iter, err := cr.ChunkOrIterable(meta)
|
||||||
|
return chk, iter, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cr *HeadAndOOOChunkReader) Close() error {
|
||||||
|
if cr.cr.isoState != nil {
|
||||||
|
cr.cr.isoState.Close()
|
||||||
|
}
|
||||||
|
if cr.oooIsoState != nil {
|
||||||
|
cr.oooIsoState.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -316,7 +316,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
||||||
// Ref to whatever Ref the chunk has, that we refer to by ID
|
// Ref to whatever Ref the chunk has, that we refer to by ID
|
||||||
for ref, c := range intervals {
|
for ref, c := range intervals {
|
||||||
if c.ID == e.ID {
|
if c.ID == e.ID {
|
||||||
meta.Ref = chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), chunks.HeadChunkID(ref)))
|
meta.Ref = chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), s1.oooHeadChunkID(ref)))
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -641,14 +641,16 @@ func (p *populateWithDelGenericSeriesIterator) next(copyHeadChunk bool) bool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
hcr, ok := p.cr.(*headChunkReader)
|
hcr, ok := p.cr.(ChunkReaderWithCopy)
|
||||||
var iterable chunkenc.Iterable
|
var iterable chunkenc.Iterable
|
||||||
if ok && copyHeadChunk && len(p.bufIter.Intervals) == 0 {
|
if ok && copyHeadChunk && len(p.bufIter.Intervals) == 0 {
|
||||||
// ChunkWithCopy will copy the head chunk.
|
// ChunkOrIterableWithCopy will copy the head chunk, if it can.
|
||||||
var maxt int64
|
var maxt int64
|
||||||
p.currMeta.Chunk, maxt, p.err = hcr.ChunkWithCopy(p.currMeta)
|
p.currMeta.Chunk, iterable, maxt, p.err = hcr.ChunkOrIterableWithCopy(p.currMeta)
|
||||||
|
if p.currMeta.Chunk != nil {
|
||||||
// For the in-memory head chunk the index reader sets maxt as MaxInt64. We fix it here.
|
// For the in-memory head chunk the index reader sets maxt as MaxInt64. We fix it here.
|
||||||
p.currMeta.MaxTime = maxt
|
p.currMeta.MaxTime = maxt
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
p.currMeta.Chunk, iterable, p.err = p.cr.ChunkOrIterable(p.currMeta)
|
p.currMeta.Chunk, iterable, p.err = p.cr.ChunkOrIterable(p.currMeta)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue