Merge pull request #14354 from bboreham/ooo-head-together
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (0) (push) Waiting to run
CI / Build Prometheus for common architectures (1) (push) Waiting to run
CI / Build Prometheus for common architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (0) (push) Waiting to run
CI / Build Prometheus for all architectures (1) (push) Waiting to run
CI / Build Prometheus for all architectures (10) (push) Waiting to run
CI / Build Prometheus for all architectures (11) (push) Waiting to run
CI / Build Prometheus for all architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (3) (push) Waiting to run
CI / Build Prometheus for all architectures (4) (push) Waiting to run
CI / Build Prometheus for all architectures (5) (push) Waiting to run
CI / Build Prometheus for all architectures (6) (push) Waiting to run
CI / Build Prometheus for all architectures (7) (push) Waiting to run
CI / Build Prometheus for all architectures (8) (push) Waiting to run
CI / Build Prometheus for all architectures (9) (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run

[PERF] TSDB: Query head and ooo-head together

The current implementation of out-of-order querying runs two queriers which each iterate all series in the head, then runs a merge operation on the output.

This PR adds HeadAndOOOQuerier which iterates just once over series, then where necessary merges chunks from in-order and out-of-order lists.

In order to distinguish in-order from out-of-order chunk references I set bit 23 (i.e. 1<<23) on ooo references; this reduces the maximum number of chunks from 16 million to 8 million.

Note one side-effect of this change is that results may come in a different order - the merge operation done previously required a sort of all series. This only changes where Prometheus does not guarantee the order.

Fixes #11628
This commit is contained in:
Bryan Boreham 2024-08-14 14:39:30 +01:00 committed by GitHub
commit d1ea6eb35d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 419 additions and 312 deletions

View file

@ -20,6 +20,7 @@ import (
"math"
"os"
"runtime"
"slices"
"strings"
"testing"
"time"
@ -152,12 +153,18 @@ func TestTSDBDump(t *testing.T) {
expectedMetrics, err := os.ReadFile(tt.expectedDump)
require.NoError(t, err)
expectedMetrics = normalizeNewLine(expectedMetrics)
// even though in case of one matcher samples are not sorted, the order in the cases above should stay the same.
require.Equal(t, string(expectedMetrics), dumpedMetrics)
// Sort both, because Prometheus does not guarantee the output order.
require.Equal(t, sortLines(string(expectedMetrics)), sortLines(dumpedMetrics))
})
}
}
func sortLines(buf string) string {
lines := strings.Split(buf, "\n")
slices.Sort(lines)
return strings.Join(lines, "\n")
}
func TestTSDBDumpOpenMetrics(t *testing.T) {
storage := promqltest.LoadedStorage(t, `
load 1m
@ -169,7 +176,7 @@ func TestTSDBDumpOpenMetrics(t *testing.T) {
require.NoError(t, err)
expectedMetrics = normalizeNewLine(expectedMetrics)
dumpedMetrics := getDumpedSamples(t, storage.Dir(), math.MinInt64, math.MaxInt64, []string{"{__name__=~'(?s:.*)'}"}, formatSeriesSetOpenMetrics)
require.Equal(t, string(expectedMetrics), dumpedMetrics)
require.Equal(t, sortLines(string(expectedMetrics)), sortLines(dumpedMetrics))
}
func TestTSDBDumpOpenMetricsRoundTrip(t *testing.T) {

View file

@ -2029,7 +2029,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
}
}
blockQueriers := make([]storage.Querier, 0, len(blocks)+2) // +2 to allow for possible in-order and OOO head queriers
blockQueriers := make([]storage.Querier, 0, len(blocks)+1) // +1 to allow for possible head querier.
defer func() {
if err != nil {
@ -2041,10 +2041,12 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
}
}()
if maxt >= db.head.MinTime() {
overlapsOOO := overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime())
var headQuerier storage.Querier
if maxt >= db.head.MinTime() || overlapsOOO {
rh := NewRangeHead(db.head, mint, maxt)
var err error
inOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt)
headQuerier, err = db.blockQuerierFunc(rh, mint, maxt)
if err != nil {
return nil, fmt.Errorf("open block querier for head %s: %w", rh, err)
}
@ -2054,36 +2056,28 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
// won't run into a race later since any truncation that comes after will wait on this querier if it overlaps.
shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt)
if shouldClose {
if err := inOrderHeadQuerier.Close(); err != nil {
if err := headQuerier.Close(); err != nil {
return nil, fmt.Errorf("closing head block querier %s: %w", rh, err)
}
inOrderHeadQuerier = nil
headQuerier = nil
}
if getNew {
rh := NewRangeHead(db.head, newMint, maxt)
inOrderHeadQuerier, err = db.blockQuerierFunc(rh, newMint, maxt)
headQuerier, err = db.blockQuerierFunc(rh, newMint, maxt)
if err != nil {
return nil, fmt.Errorf("open block querier for head while getting new querier %s: %w", rh, err)
}
}
if inOrderHeadQuerier != nil {
blockQueriers = append(blockQueriers, inOrderHeadQuerier)
}
}
if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) {
rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef)
var err error
outOfOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt)
if err != nil {
// If BlockQuerierFunc() failed, make sure to clean up the pending read created by NewOOORangeHead.
rh.isoState.Close()
if overlapsOOO {
// We need to fetch from in-order and out-of-order chunks: wrap the headQuerier.
isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef)
headQuerier = NewHeadAndOOOQuerier(mint, maxt, db.head, isoState, headQuerier)
}
return nil, fmt.Errorf("open block querier for ooo head %s: %w", rh, err)
}
blockQueriers = append(blockQueriers, outOfOrderHeadQuerier)
if headQuerier != nil {
blockQueriers = append(blockQueriers, headQuerier)
}
for _, b := range blocks {
@ -2111,7 +2105,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
}
}
blockQueriers := make([]storage.ChunkQuerier, 0, len(blocks)+2) // +2 to allow for possible in-order and OOO head queriers
blockQueriers := make([]storage.ChunkQuerier, 0, len(blocks)+1) // +1 to allow for possible head querier.
defer func() {
if err != nil {
@ -2123,9 +2117,11 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
}
}()
if maxt >= db.head.MinTime() {
overlapsOOO := overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime())
var headQuerier storage.ChunkQuerier
if maxt >= db.head.MinTime() || overlapsOOO {
rh := NewRangeHead(db.head, mint, maxt)
inOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt)
headQuerier, err = db.blockChunkQuerierFunc(rh, mint, maxt)
if err != nil {
return nil, fmt.Errorf("open querier for head %s: %w", rh, err)
}
@ -2135,35 +2131,28 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
// won't run into a race later since any truncation that comes after will wait on this querier if it overlaps.
shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt)
if shouldClose {
if err := inOrderHeadQuerier.Close(); err != nil {
if err := headQuerier.Close(); err != nil {
return nil, fmt.Errorf("closing head querier %s: %w", rh, err)
}
inOrderHeadQuerier = nil
headQuerier = nil
}
if getNew {
rh := NewRangeHead(db.head, newMint, maxt)
inOrderHeadQuerier, err = db.blockChunkQuerierFunc(rh, newMint, maxt)
headQuerier, err = db.blockChunkQuerierFunc(rh, newMint, maxt)
if err != nil {
return nil, fmt.Errorf("open querier for head while getting new querier %s: %w", rh, err)
}
}
if inOrderHeadQuerier != nil {
blockQueriers = append(blockQueriers, inOrderHeadQuerier)
}
}
if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) {
rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef)
outOfOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt)
if err != nil {
// If NewBlockQuerier() failed, make sure to clean up the pending read created by NewOOORangeHead.
rh.isoState.Close()
if overlapsOOO {
// We need to fetch from in-order and out-of-order chunks: wrap the headQuerier.
isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef)
headQuerier = NewHeadAndOOOChunkQuerier(mint, maxt, db.head, isoState, headQuerier)
}
return nil, fmt.Errorf("open block chunk querier for ooo head %s: %w", rh, err)
}
blockQueriers = append(blockQueriers, outOfOrderHeadQuerier)
if headQuerier != nil {
blockQueriers = append(blockQueriers, headQuerier)
}
for _, b := range blocks {

View file

@ -19,6 +19,7 @@ import (
"fmt"
"math"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/exemplar"
@ -936,7 +937,7 @@ func (a *headAppender) Commit() (err error) {
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, oooCapMax)
ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, oooCapMax, a.head.logger)
if chunkCreated {
r, ok := oooMmapMarkers[series.ref]
if !ok || r != nil {
@ -1083,14 +1084,14 @@ func (a *headAppender) Commit() (err error) {
}
// insert is like append, except it inserts. Used for OOO samples.
func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) {
func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64, logger log.Logger) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) {
if s.ooo == nil {
s.ooo = &memSeriesOOOFields{}
}
c := s.ooo.oooHeadChunk
if c == nil || c.chunk.NumSamples() == int(oooCapMax) {
// Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks.
c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper)
c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper, logger)
chunkCreated = true
}
@ -1444,9 +1445,9 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange
}
// cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk.
// The caller must ensure that s.ooo is not nil.
func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) {
ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper)
// The caller must ensure that s is locked and s.ooo is not nil.
func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper, logger log.Logger) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) {
ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper, logger)
s.ooo.oooHeadChunk = &oooHeadChunk{
chunk: NewOOOChunk(),
@ -1457,7 +1458,8 @@ func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.Chunk
return s.ooo.oooHeadChunk, ref
}
func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) []chunks.ChunkDiskMapperRef {
// s must be locked when calling.
func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper, logger log.Logger) []chunks.ChunkDiskMapperRef {
if s.ooo == nil || s.ooo.oooHeadChunk == nil {
// OOO is not enabled or there is no head chunk, so nothing to m-map here.
return nil
@ -1469,6 +1471,10 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMap
}
chunkRefs := make([]chunks.ChunkDiskMapperRef, 0, 1)
for _, memchunk := range chks {
if len(s.ooo.oooMmappedChunks) >= (oooChunkIDMask - 1) {
level.Error(logger).Log("msg", "Too many OOO chunks, dropping data", "series", s.lset.String())
break
}
chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, memchunk.chunk, true, handleChunkWriteError)
chunkRefs = append(chunkRefs, chunkRef)
s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{

View file

@ -199,13 +199,18 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
defer s.Unlock()
*chks = (*chks)[:0]
*chks = appendSeriesChunks(s, h.mint, h.maxt, *chks)
return nil
}
func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []chunks.Meta {
for i, c := range s.mmappedChunks {
// Do not expose chunks that are outside of the specified range.
if !c.OverlapsClosedInterval(h.mint, h.maxt) {
if !c.OverlapsClosedInterval(mint, maxt) {
continue
}
*chks = append(*chks, chunks.Meta{
chks = append(chks, chunks.Meta{
MinTime: c.minTime,
MaxTime: c.maxTime,
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(i))),
@ -223,8 +228,8 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
} else {
maxTime = chk.maxTime
}
if chk.OverlapsClosedInterval(h.mint, h.maxt) {
*chks = append(*chks, chunks.Meta{
if chk.OverlapsClosedInterval(mint, maxt) {
chks = append(chks, chunks.Meta{
MinTime: chk.minTime,
MaxTime: maxTime,
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)+j))),
@ -233,8 +238,7 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
j++
}
}
return nil
return chks
}
// headChunkID returns the HeadChunkID referred to by the given position.
@ -244,12 +248,20 @@ func (s *memSeries) headChunkID(pos int) chunks.HeadChunkID {
return chunks.HeadChunkID(pos) + s.firstChunkID
}
const oooChunkIDMask = 1 << 23
// oooHeadChunkID returns the HeadChunkID referred to by the given position.
// Only the bottom 24 bits are used. Bit 23 is always 1 for an OOO chunk; for the rest:
// * 0 <= pos < len(s.oooMmappedChunks) refer to s.oooMmappedChunks[pos]
// * pos == len(s.oooMmappedChunks) refers to s.oooHeadChunk
// The caller must ensure that s.ooo is not nil.
func (s *memSeries) oooHeadChunkID(pos int) chunks.HeadChunkID {
return chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID
return (chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID) | oooChunkIDMask
}
func unpackHeadChunkRef(ref chunks.ChunkRef) (seriesID chunks.HeadSeriesRef, chunkID chunks.HeadChunkID, isOOO bool) {
sid, cid := chunks.HeadChunkRef(ref).Unpack()
return sid, (cid & (oooChunkIDMask - 1)), (cid & oooChunkIDMask) != 0
}
// LabelValueFor returns label value for the given label name in the series referred to by ID.
@ -339,10 +351,15 @@ func (h *headChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chu
return chk, nil, err
}
// ChunkWithCopy returns the chunk for the reference number.
// If the chunk is the in-memory chunk, then it makes a copy and returns the copied chunk.
func (h *headChunkReader) ChunkWithCopy(meta chunks.Meta) (chunkenc.Chunk, int64, error) {
return h.chunk(meta, true)
type ChunkReaderWithCopy interface {
ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error)
}
// ChunkOrIterableWithCopy returns the chunk for the reference number.
// If the chunk is the in-memory chunk, then it makes a copy and returns the copied chunk, plus the max time of the chunk.
func (h *headChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
chk, maxTime, err := h.chunk(meta, true)
return chk, nil, maxTime, err
}
// chunk returns the chunk for the reference number.
@ -358,9 +375,14 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.
}
s.Lock()
defer s.Unlock()
return h.chunkFromSeries(s, cid, copyLastChunk)
}
// Call with s locked.
func (h *headChunkReader) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, copyLastChunk bool) (chunkenc.Chunk, int64, error) {
c, headChunk, isOpen, err := s.chunk(cid, h.head.chunkDiskMapper, &h.head.memChunkPool)
if err != nil {
s.Unlock()
return nil, 0, err
}
defer func() {
@ -374,7 +396,6 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.
// This means that the chunk is outside the specified range.
if !c.OverlapsClosedInterval(h.mint, h.maxt) {
s.Unlock()
return nil, 0, storage.ErrNotFound
}
@ -391,7 +412,6 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.
return nil, 0, err
}
}
s.Unlock()
return &safeHeadChunk{
Chunk: chk,
@ -461,14 +481,15 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
return elem, true, offset == 0, nil
}
// oooMergedChunks return an iterable over one or more OOO chunks for the given
// mergedChunks return an iterable over one or more OOO chunks for the given
// chunks.Meta reference from memory or by m-mapping it from the disk. The
// returned iterable will be a merge of all the overlapping chunks, if any,
// amongst all the chunks in the OOOHead.
// If hr is non-nil then in-order chunks are included.
// This function is not thread safe unless the caller holds a lock.
// The caller must ensure that s.ooo is not nil.
func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (*mergedOOOChunks, error) {
_, cid := chunks.HeadChunkRef(meta.Ref).Unpack()
func (s *memSeries) mergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, hr *headChunkReader, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (chunkenc.Iterable, error) {
_, cid, _ := unpackHeadChunkRef(meta.Ref)
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are
// incremented by 1 when new chunk is created, hence (meta - firstChunkID) gives the slice index.
@ -509,6 +530,16 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{meta: meta})
}
if hr != nil { // Include in-order chunks.
metas := appendSeriesChunks(s, max(meta.MinTime, mint), min(meta.MaxTime, maxt), nil)
for _, m := range metas {
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
meta: m,
ref: 0, // This tells the loop below it's an in-order head chunk.
})
}
}
// Next we want to sort all the collected chunks by min time so we can find
// those that overlap and stop when we know the rest don't.
slices.SortFunc(tmpChks, refLessByMinTimeAndMinRef)
@ -520,9 +551,17 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe
continue
}
var iterable chunkenc.Iterable
if c.meta.Chunk != nil {
switch {
case c.meta.Chunk != nil:
iterable = c.meta.Chunk
} else {
case c.ref == 0: // This is an in-order head chunk.
_, cid := chunks.HeadChunkRef(c.meta.Ref).Unpack()
var err error
iterable, _, err = hr.chunkFromSeries(s, cid, false)
if err != nil {
return nil, fmt.Errorf("invalid head chunk: %w", err)
}
default:
chk, err := cdm.Chunk(c.ref)
if err != nil {
var cerr *chunks.CorruptionErr

View file

@ -890,7 +890,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) {
unknownRefs++
continue
}
ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax)
ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger)
if chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()

View file

@ -14,16 +14,10 @@
package tsdb
import (
"fmt"
"sort"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/oklog/ulid"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/tombstones"
)
// OOOChunk maintains samples in time-ascending order.
@ -171,75 +165,3 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
}
return chks, nil
}
var _ BlockReader = &OOORangeHead{}
// OOORangeHead allows querying Head out of order samples via BlockReader
// interface implementation.
type OOORangeHead struct {
head *Head
// mint and maxt are tracked because when a query is handled we only want
// the timerange of the query and having preexisting pointers to the first
// and last timestamp help with that.
mint, maxt int64
isoState *oooIsolationState
}
func NewOOORangeHead(head *Head, mint, maxt int64, minRef chunks.ChunkDiskMapperRef) *OOORangeHead {
isoState := head.oooIso.TrackReadAfter(minRef)
return &OOORangeHead{
head: head,
mint: mint,
maxt: maxt,
isoState: isoState,
}
}
func (oh *OOORangeHead) Index() (IndexReader, error) {
return NewOOOHeadIndexReader(oh.head, oh.mint, oh.maxt, oh.isoState.minRef), nil
}
func (oh *OOORangeHead) Chunks() (ChunkReader, error) {
return NewOOOHeadChunkReader(oh.head, oh.mint, oh.maxt, oh.isoState, 0), nil
}
func (oh *OOORangeHead) Tombstones() (tombstones.Reader, error) {
// As stated in the design doc https://docs.google.com/document/d/1Kppm7qL9C-BJB1j6yb6-9ObG3AbdZnFUBYPNNWwDBYM/edit?usp=sharing
// Tombstones are not supported for out of order metrics.
return tombstones.NewMemTombstones(), nil
}
var oooRangeHeadULID = ulid.MustParse("0000000000XXXX000RANGEHEAD")
func (oh *OOORangeHead) Meta() BlockMeta {
return BlockMeta{
MinTime: oh.mint,
MaxTime: oh.maxt,
ULID: oooRangeHeadULID,
Stats: BlockStats{
NumSeries: oh.head.NumSeries(),
},
}
}
// Size returns the size taken by the Head block.
func (oh *OOORangeHead) Size() int64 {
return oh.head.Size()
}
// String returns an human readable representation of the out of order range
// head. It's important to keep this function in order to avoid the struct dump
// when the head is stringified in errors or logs.
func (oh *OOORangeHead) String() string {
return fmt.Sprintf("ooo range head (mint: %d, maxt: %d)", oh.MinTime(), oh.MaxTime())
}
func (oh *OOORangeHead) MinTime() int64 {
return oh.mint
}
func (oh *OOORangeHead) MaxTime() int64 {
return oh.maxt
}

View file

@ -27,17 +27,12 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/util/annotations"
)
var _ IndexReader = &OOOHeadIndexReader{}
var _ IndexReader = &HeadAndOOOIndexReader{}
// OOOHeadIndexReader implements IndexReader so ooo samples in the head can be
// accessed.
// It also has a reference to headIndexReader so we can leverage on its
// IndexReader implementation for all the methods that remain the same. We
// decided to do this to avoid code duplication.
// The only methods that change are the ones about getting Series and Postings.
type OOOHeadIndexReader struct {
type HeadAndOOOIndexReader struct {
*headIndexReader // A reference to the headIndexReader so we can reuse as many interface implementation as possible.
lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef
}
@ -53,25 +48,16 @@ func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator
return storage.ChainSampleIteratorFromIterables(iterator, o.chunkIterables)
}
func NewOOOHeadIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *OOOHeadIndexReader {
func NewHeadAndOOOIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader {
hr := &headIndexReader{
head: head,
mint: mint,
maxt: maxt,
}
return &OOOHeadIndexReader{hr, lastGarbageCollectedMmapRef}
return &HeadAndOOOIndexReader{hr, lastGarbageCollectedMmapRef}
}
func (oh *OOOHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
return oh.series(ref, builder, chks, oh.lastGarbageCollectedMmapRef, 0)
}
// lastGarbageCollectedMmapRef gives the last mmap chunk that may be being garbage collected and so
// any chunk at or before this ref will not be considered. 0 disables this check.
//
// maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then
// the oooHeadChunk will not be considered.
func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef) error {
func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
s := oh.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
@ -88,10 +74,19 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
defer s.Unlock()
*chks = (*chks)[:0]
if s.ooo == nil {
return nil
if s.ooo != nil {
return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, chks)
}
*chks = appendSeriesChunks(s, oh.mint, oh.maxt, *chks)
return nil
}
// lastGarbageCollectedMmapRef gives the last mmap chunk that may be being garbage collected and so
// any chunk at or before this ref will not be considered. 0 disables this check.
//
// maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then
// the oooHeadChunk will not be considered.
func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, chks *[]chunks.Meta) error {
tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks))
addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) {
@ -106,7 +101,7 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
// Collect all chunks that overlap the query range.
if s.ooo.oooHeadChunk != nil {
c := s.ooo.oooHeadChunk
if c.OverlapsClosedInterval(oh.mint, oh.maxt) && maxMmapRef == 0 {
if c.OverlapsClosedInterval(mint, maxt) && maxMmapRef == 0 {
ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least.
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime)
@ -125,12 +120,16 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
}
for i := len(s.ooo.oooMmappedChunks) - 1; i >= 0; i-- {
c := s.ooo.oooMmappedChunks[i]
if c.OverlapsClosedInterval(oh.mint, oh.maxt) && (maxMmapRef == 0 || maxMmapRef.GreaterThanOrEqualTo(c.ref)) && (lastGarbageCollectedMmapRef == 0 || c.ref.GreaterThan(lastGarbageCollectedMmapRef)) {
if c.OverlapsClosedInterval(mint, maxt) && (maxMmapRef == 0 || maxMmapRef.GreaterThanOrEqualTo(c.ref)) && (lastGarbageCollectedMmapRef == 0 || c.ref.GreaterThan(lastGarbageCollectedMmapRef)) {
ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i)))
addChunk(c.minTime, c.maxTime, ref, nil)
}
}
if includeInOrder {
tmpChks = appendSeriesChunks(s, mint, maxt, tmpChks)
}
// There is nothing to do if we did not collect any chunk.
if len(tmpChks) == 0 {
return nil
@ -167,11 +166,10 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
return nil
}
// LabelValues needs to be overridden from the headIndexReader implementation due
// to the check that happens at the beginning where we make sure that the query
// interval overlaps with the head minooot and maxooot.
func (oh *OOOHeadIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
if oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxOOOTime() {
// LabelValues needs to be overridden from the headIndexReader implementation
// so we can return labels within either in-order range or ooo range.
func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
if oh.maxt < oh.head.MinTime() && oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxTime() && oh.mint > oh.head.MaxOOOTime() {
return []string{}, nil
}
@ -223,41 +221,30 @@ func lessByMinTimeAndMinRef(a, b chunks.Meta) int {
}
}
func (oh *OOOHeadIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
switch len(values) {
case 0:
return index.EmptyPostings(), nil
case 1:
return oh.head.postings.Get(name, values[0]), nil // TODO(ganesh) Also call GetOOOPostings
default:
// TODO(ganesh) We want to only return postings for out of order series.
res := make([]index.Postings, 0, len(values))
for _, value := range values {
res = append(res, oh.head.postings.Get(name, value)) // TODO(ganesh) Also call GetOOOPostings
}
return index.Merge(ctx, res...), nil
type HeadAndOOOChunkReader struct {
head *Head
mint, maxt int64
cr *headChunkReader // If nil, only read OOO chunks.
maxMmapRef chunks.ChunkDiskMapperRef
oooIsoState *oooIsolationState
}
func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader, oooIsoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOChunkReader {
return &HeadAndOOOChunkReader{
head: head,
mint: mint,
maxt: maxt,
cr: cr,
maxMmapRef: maxMmapRef,
oooIsoState: oooIsoState,
}
}
type OOOHeadChunkReader struct {
head *Head
mint, maxt int64
isoState *oooIsolationState
maxMmapRef chunks.ChunkDiskMapperRef
}
func NewOOOHeadChunkReader(head *Head, mint, maxt int64, isoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *OOOHeadChunkReader {
return &OOOHeadChunkReader{
head: head,
mint: mint,
maxt: maxt,
isoState: isoState,
maxMmapRef: maxMmapRef,
func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
sid, _, isOOO := unpackHeadChunkRef(meta.Ref)
if !isOOO {
return cr.cr.ChunkOrIterable(meta)
}
}
func (cr OOOHeadChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
sid, _ := chunks.HeadChunkRef(meta.Ref).Unpack()
s := cr.head.series.getByID(sid)
// This means that the series has been garbage collected.
@ -266,34 +253,35 @@ func (cr OOOHeadChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk,
}
s.Lock()
if s.ooo == nil {
// There is no OOO data for this series.
s.Unlock()
return nil, nil, storage.ErrNotFound
}
mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt, cr.maxMmapRef)
mc, err := s.mergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef)
s.Unlock()
if err != nil {
return nil, nil, err
}
// This means that the query range did not overlap with the requested chunk.
if len(mc.chunkIterables) == 0 {
return nil, nil, storage.ErrNotFound
}
return nil, mc, nil
return nil, mc, err
}
func (cr OOOHeadChunkReader) Close() error {
if cr.isoState != nil {
cr.isoState.Close()
// ChunkOrIterableWithCopy: implements ChunkReaderWithCopy. The special Copy behaviour
// is only implemented for the in-order head chunk.
func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
_, _, isOOO := unpackHeadChunkRef(meta.Ref)
if !isOOO {
return cr.cr.ChunkOrIterableWithCopy(meta)
}
chk, iter, err := cr.ChunkOrIterable(meta)
return chk, iter, 0, err
}
func (cr *HeadAndOOOChunkReader) Close() error {
if cr.cr != nil && cr.cr.isoState != nil {
cr.cr.isoState.Close()
}
if cr.oooIsoState != nil {
cr.oooIsoState.Close()
}
return nil
}
type OOOCompactionHead struct {
oooIR *OOOHeadIndexReader
head *Head
lastMmapRef chunks.ChunkDiskMapperRef
lastWBLFile int
postings []storage.SeriesRef
@ -310,6 +298,7 @@ type OOOCompactionHead struct {
// on the sample append latency. So call NewOOOCompactionHead only right before compaction.
func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, error) {
ch := &OOOCompactionHead{
head: head,
chunkRange: head.chunkRange.Load(),
mint: math.MaxInt64,
maxt: math.MinInt64,
@ -323,15 +312,14 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead,
ch.lastWBLFile = lastWBLFile
}
ch.oooIR = NewOOOHeadIndexReader(head, math.MinInt64, math.MaxInt64, 0)
hr := headIndexReader{head: head, mint: ch.mint, maxt: ch.maxt}
n, v := index.AllPostingsKey()
// TODO: verify this gets only ooo samples.
p, err := ch.oooIR.Postings(ctx, n, v)
// TODO: filter to series with OOO samples, before sorting.
p, err := hr.Postings(ctx, n, v)
if err != nil {
return nil, err
}
p = ch.oooIR.SortedPostings(p)
p = hr.SortedPostings(p)
var lastSeq, lastOff int
for p.Next() {
@ -352,7 +340,7 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead,
}
var lastMmapRef chunks.ChunkDiskMapperRef
mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper)
mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper, head.logger)
if len(mmapRefs) == 0 && len(ms.ooo.oooMmappedChunks) > 0 {
// Nothing was m-mapped. So take the mmapRef from the existing slice if it exists.
mmapRefs = []chunks.ChunkDiskMapperRef{ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref}
@ -388,7 +376,7 @@ func (ch *OOOCompactionHead) Index() (IndexReader, error) {
}
func (ch *OOOCompactionHead) Chunks() (ChunkReader, error) {
return NewOOOHeadChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt, nil, ch.lastMmapRef), nil
return NewHeadAndOOOChunkReader(ch.head, ch.mint, ch.maxt, nil, nil, ch.lastMmapRef), nil
}
func (ch *OOOCompactionHead) Tombstones() (tombstones.Reader, error) {
@ -414,12 +402,12 @@ func (ch *OOOCompactionHead) Meta() BlockMeta {
// Only the method of BlockReader interface are valid for the cloned OOOCompactionHead.
func (ch *OOOCompactionHead) CloneForTimeRange(mint, maxt int64) *OOOCompactionHead {
return &OOOCompactionHead{
oooIR: NewOOOHeadIndexReader(ch.oooIR.head, mint, maxt, 0),
head: ch.head,
lastMmapRef: ch.lastMmapRef,
postings: ch.postings,
chunkRange: ch.chunkRange,
mint: ch.mint,
maxt: ch.maxt,
mint: mint,
maxt: maxt,
}
}
@ -439,7 +427,8 @@ func NewOOOCompactionHeadIndexReader(ch *OOOCompactionHead) IndexReader {
}
func (ir *OOOCompactionHeadIndexReader) Symbols() index.StringIter {
return ir.ch.oooIR.Symbols()
hr := headIndexReader{head: ir.ch.head, mint: ir.ch.mint, maxt: ir.ch.maxt}
return hr.Symbols()
}
func (ir *OOOCompactionHeadIndexReader) Postings(_ context.Context, name string, values ...string) (index.Postings, error) {
@ -460,11 +449,28 @@ func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.P
}
func (ir *OOOCompactionHeadIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings {
return ir.ch.oooIR.ShardedPostings(p, shardIndex, shardCount)
hr := headIndexReader{head: ir.ch.head, mint: ir.ch.mint, maxt: ir.ch.maxt}
return hr.ShardedPostings(p, shardIndex, shardCount)
}
func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
return ir.ch.oooIR.series(ref, builder, chks, 0, ir.ch.lastMmapRef)
s := ir.ch.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
ir.ch.head.metrics.seriesNotFound.Inc()
return storage.ErrNotFound
}
builder.Assign(s.labels())
s.Lock()
defer s.Unlock()
*chks = (*chks)[:0]
if s.ooo == nil {
return nil
}
return getOOOSeriesChunks(s, ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, chks)
}
func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
@ -492,5 +498,91 @@ func (ir *OOOCompactionHeadIndexReader) LabelNamesFor(ctx context.Context, posti
}
func (ir *OOOCompactionHeadIndexReader) Close() error {
return ir.ch.oooIR.Close()
return nil
}
// HeadAndOOOQuerier queries both the head and the out-of-order head.
type HeadAndOOOQuerier struct {
mint, maxt int64
head *Head
index IndexReader
chunkr ChunkReader
querier storage.Querier
}
func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier {
cr := &headChunkReader{
head: head,
mint: mint,
maxt: maxt,
isoState: head.iso.State(mint, maxt),
}
return &HeadAndOOOQuerier{
mint: mint,
maxt: maxt,
head: head,
index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef),
chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0),
querier: querier,
}
}
func (q *HeadAndOOOQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return q.querier.LabelValues(ctx, name, hints, matchers...)
}
func (q *HeadAndOOOQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return q.querier.LabelNames(ctx, hints, matchers...)
}
func (q *HeadAndOOOQuerier) Close() error {
q.chunkr.Close()
return q.querier.Close()
}
func (q *HeadAndOOOQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
return selectSeriesSet(ctx, sortSeries, hints, matchers, q.index, q.chunkr, q.head.tombstones, q.mint, q.maxt)
}
// HeadAndOOOChunkQuerier queries both the head and the out-of-order head.
type HeadAndOOOChunkQuerier struct {
mint, maxt int64
head *Head
index IndexReader
chunkr ChunkReader
querier storage.ChunkQuerier
}
func NewHeadAndOOOChunkQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.ChunkQuerier) storage.ChunkQuerier {
cr := &headChunkReader{
head: head,
mint: mint,
maxt: maxt,
isoState: head.iso.State(mint, maxt),
}
return &HeadAndOOOChunkQuerier{
mint: mint,
maxt: maxt,
head: head,
index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef),
chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0),
querier: querier,
}
}
func (q *HeadAndOOOChunkQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return q.querier.LabelValues(ctx, name, hints, matchers...)
}
func (q *HeadAndOOOChunkQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return q.querier.LabelNames(ctx, hints, matchers...)
}
func (q *HeadAndOOOChunkQuerier) Close() error {
q.chunkr.Close()
return q.querier.Close()
}
func (q *HeadAndOOOChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
return selectChunkSeriesSet(ctx, sortSeries, hints, matchers, rangeHeadULID, q.index, q.chunkr, q.head.tombstones, q.mint, q.maxt)
}

View file

@ -316,7 +316,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
// Ref to whatever Ref the chunk has, that we refer to by ID
for ref, c := range intervals {
if c.ID == e.ID {
meta.Ref = chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), chunks.HeadChunkID(ref)))
meta.Ref = chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), s1.oooHeadChunkID(ref)))
break
}
}
@ -341,7 +341,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
})
}
ir := NewOOOHeadIndexReader(h, tc.queryMinT, tc.queryMaxT, 0)
ir := NewHeadAndOOOIndexReader(h, tc.queryMinT, tc.queryMaxT, 0)
var chks []chunks.Meta
var b labels.ScratchBuilder
@ -421,17 +421,17 @@ func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenari
name: "LabelValues calls with ooo head query range not overlapping out-of-order data",
queryMinT: 100,
queryMaxT: 100,
expValues1: []string{},
expValues2: []string{},
expValues3: []string{},
expValues4: []string{},
expValues1: []string{"bar1"},
expValues2: nil,
expValues3: []string{"bar1", "bar2"},
expValues4: []string{"bar1", "bar2"},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
// We first want to test using a head index reader that covers the biggest query interval
oh := NewOOOHeadIndexReader(head, tc.queryMinT, tc.queryMaxT, 0)
oh := NewHeadAndOOOIndexReader(head, tc.queryMinT, tc.queryMaxT, 0)
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")}
values, err := oh.LabelValues(ctx, "foo", matchers...)
sort.Strings(values)
@ -481,10 +481,10 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) {
db := newTestDBWithOpts(t, opts)
cr := NewOOOHeadChunkReader(db.head, 0, 1000, nil, 0)
cr := NewHeadAndOOOChunkReader(db.head, 0, 1000, nil, nil, 0)
defer cr.Close()
c, iterable, err := cr.ChunkOrIterable(chunks.Meta{
Ref: 0x1000000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
})
require.Nil(t, iterable)
require.Equal(t, err, fmt.Errorf("not found"))
@ -832,14 +832,14 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
// The Series method populates the chunk metas, taking a copy of the
// head OOO chunk if necessary. These are then used by the ChunkReader.
ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
var chks []chunks.Meta
var b labels.ScratchBuilder
err = ir.Series(s1Ref, &b, &chks)
require.NoError(t, err)
require.Equal(t, len(tc.expChunksSamples), len(chks))
cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0)
cr := NewHeadAndOOOChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, nil, 0)
defer cr.Close()
for i := 0; i < len(chks); i++ {
c, iterable, err := cr.ChunkOrIterable(chks[i])
@ -997,7 +997,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
// The Series method populates the chunk metas, taking a copy of the
// head OOO chunk if necessary. These are then used by the ChunkReader.
ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
var chks []chunks.Meta
var b labels.ScratchBuilder
err = ir.Series(s1Ref, &b, &chks)
@ -1013,7 +1013,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
}
require.NoError(t, app.Commit())
cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0)
cr := NewHeadAndOOOChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, nil, 0)
defer cr.Close()
for i := 0; i < len(chks); i++ {
c, iterable, err := cr.ChunkOrIterable(chks[i])

View file

@ -115,20 +115,24 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
}
func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
mint := q.mint
maxt := q.maxt
return selectSeriesSet(ctx, sortSeries, hints, ms, q.index, q.chunks, q.tombstones, q.mint, q.maxt)
}
func selectSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher,
index IndexReader, chunks ChunkReader, tombstones tombstones.Reader, mint, maxt int64,
) storage.SeriesSet {
disableTrimming := false
sharded := hints != nil && hints.ShardCount > 0
p, err := PostingsForMatchers(ctx, q.index, ms...)
p, err := PostingsForMatchers(ctx, index, ms...)
if err != nil {
return storage.ErrSeriesSet(err)
}
if sharded {
p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
p = index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
}
if sortSeries {
p = q.index.SortedPostings(p)
p = index.SortedPostings(p)
}
if hints != nil {
@ -137,11 +141,11 @@ func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *stora
disableTrimming = hints.DisableTrimming
if hints.Func == "series" {
// When you're only looking up metadata (for example series API), you don't need to load any chunks.
return newBlockSeriesSet(q.index, newNopChunkReader(), q.tombstones, p, mint, maxt, disableTrimming)
return newBlockSeriesSet(index, newNopChunkReader(), tombstones, p, mint, maxt, disableTrimming)
}
}
return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming)
return newBlockSeriesSet(index, chunks, tombstones, p, mint, maxt, disableTrimming)
}
// blockChunkQuerier provides chunk querying access to a single block database.
@ -159,8 +163,12 @@ func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier
}
func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
mint := q.mint
maxt := q.maxt
return selectChunkSeriesSet(ctx, sortSeries, hints, ms, q.blockID, q.index, q.chunks, q.tombstones, q.mint, q.maxt)
}
func selectChunkSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher,
blockID ulid.ULID, index IndexReader, chunks ChunkReader, tombstones tombstones.Reader, mint, maxt int64,
) storage.ChunkSeriesSet {
disableTrimming := false
sharded := hints != nil && hints.ShardCount > 0
@ -169,17 +177,17 @@ func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *
maxt = hints.End
disableTrimming = hints.DisableTrimming
}
p, err := PostingsForMatchers(ctx, q.index, ms...)
p, err := PostingsForMatchers(ctx, index, ms...)
if err != nil {
return storage.ErrChunkSeriesSet(err)
}
if sharded {
p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
p = index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
}
if sortSeries {
p = q.index.SortedPostings(p)
p = index.SortedPostings(p)
}
return NewBlockChunkSeriesSet(q.blockID, q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming)
return NewBlockChunkSeriesSet(blockID, index, chunks, tombstones, p, mint, maxt, disableTrimming)
}
// PostingsForMatchers assembles a single postings iterator against the index reader
@ -633,14 +641,16 @@ func (p *populateWithDelGenericSeriesIterator) next(copyHeadChunk bool) bool {
}
}
hcr, ok := p.cr.(*headChunkReader)
hcr, ok := p.cr.(ChunkReaderWithCopy)
var iterable chunkenc.Iterable
if ok && copyHeadChunk && len(p.bufIter.Intervals) == 0 {
// ChunkWithCopy will copy the head chunk.
// ChunkOrIterableWithCopy will copy the head chunk, if it can.
var maxt int64
p.currMeta.Chunk, maxt, p.err = hcr.ChunkWithCopy(p.currMeta)
// For the in-memory head chunk the index reader sets maxt as MaxInt64. We fix it here.
p.currMeta.MaxTime = maxt
p.currMeta.Chunk, iterable, maxt, p.err = hcr.ChunkOrIterableWithCopy(p.currMeta)
if p.currMeta.Chunk != nil {
// For the in-memory head chunk the index reader sets maxt as MaxInt64. We fix it here.
p.currMeta.MaxTime = maxt
}
} else {
p.currMeta.Chunk, iterable, p.err = p.cr.ChunkOrIterable(p.currMeta)
}

View file

@ -20,6 +20,7 @@ import (
"testing"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/stretchr/testify/require"
@ -254,56 +255,98 @@ func BenchmarkMergedStringIter(b *testing.B) {
b.ReportAllocs()
}
func BenchmarkQuerierSelect(b *testing.B) {
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = b.TempDir()
h, err := NewHead(nil, nil, nil, nil, opts, nil)
func createHeadForBenchmarkSelect(b *testing.B, numSeries int, addSeries func(app storage.Appender, i int)) (*Head, *DB) {
dir := b.TempDir()
opts := DefaultOptions()
opts.OutOfOrderCapMax = 255
opts.OutOfOrderTimeWindow = 1000
db, err := Open(dir, nil, nil, opts, nil)
require.NoError(b, err)
defer h.Close()
b.Cleanup(func() {
require.NoError(b, db.Close())
})
h := db.Head()
app := h.Appender(context.Background())
numSeries := 1000000
for i := 0; i < numSeries; i++ {
app.Append(0, labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix)), int64(i), 0)
addSeries(app, i)
}
require.NoError(b, app.Commit())
return h, db
}
bench := func(b *testing.B, br BlockReader, sorted bool) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")
for s := 1; s <= numSeries; s *= 10 {
b.Run(fmt.Sprintf("%dof%d", s, numSeries), func(b *testing.B) {
q, err := NewBlockQuerier(br, 0, int64(s-1))
require.NoError(b, err)
func benchmarkSelect(b *testing.B, queryable storage.Queryable, numSeries int, sorted bool) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")
b.ResetTimer()
for s := 1; s <= numSeries; s *= 10 {
b.Run(fmt.Sprintf("%dof%d", s, numSeries), func(b *testing.B) {
q, err := queryable.Querier(0, int64(s-1))
require.NoError(b, err)
b.ResetTimer()
for i := 0; i < b.N; i++ {
ss := q.Select(context.Background(), sorted, nil, matcher)
for ss.Next() {
}
require.NoError(b, ss.Err())
b.ResetTimer()
for i := 0; i < b.N; i++ {
ss := q.Select(context.Background(), sorted, nil, matcher)
for ss.Next() {
}
q.Close()
})
}
require.NoError(b, ss.Err())
}
q.Close()
})
}
}
func BenchmarkQuerierSelect(b *testing.B) {
numSeries := 1000000
h, db := createHeadForBenchmarkSelect(b, numSeries, func(app storage.Appender, i int) {
_, err := app.Append(0, labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix)), int64(i), 0)
if err != nil {
b.Fatal(err)
}
})
b.Run("Head", func(b *testing.B) {
bench(b, h, false)
benchmarkSelect(b, db, numSeries, false)
})
b.Run("SortedHead", func(b *testing.B) {
bench(b, h, true)
benchmarkSelect(b, db, numSeries, true)
})
tmpdir := b.TempDir()
blockdir := createBlockFromHead(b, tmpdir, h)
block, err := OpenBlock(nil, blockdir, nil)
require.NoError(b, err)
defer func() {
require.NoError(b, block.Close())
}()
b.Run("Block", func(b *testing.B) {
bench(b, block, false)
tmpdir := b.TempDir()
blockdir := createBlockFromHead(b, tmpdir, h)
block, err := OpenBlock(nil, blockdir, nil)
require.NoError(b, err)
defer func() {
require.NoError(b, block.Close())
}()
benchmarkSelect(b, (*queryableBlock)(block), numSeries, false)
})
}
// Type wrapper to let a Block be a Queryable in benchmarkSelect().
type queryableBlock Block
func (pb *queryableBlock) Querier(mint, maxt int64) (storage.Querier, error) {
return NewBlockQuerier((*Block)(pb), mint, maxt)
}
func BenchmarkQuerierSelectWithOutOfOrder(b *testing.B) {
numSeries := 1000000
_, db := createHeadForBenchmarkSelect(b, numSeries, func(app storage.Appender, i int) {
l := labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix))
ref, err := app.Append(0, l, int64(i+1), 0)
if err != nil {
b.Fatal(err)
}
_, err = app.Append(ref, l, int64(i), 1) // Out of order sample
if err != nil {
b.Fatal(err)
}
})
b.Run("Head", func(b *testing.B) {
benchmarkSelect(b, db, numSeries, false)
})
}

View file

@ -3169,12 +3169,11 @@ func BenchmarkQueries(b *testing.B) {
qHead, err := NewBlockQuerier(NewRangeHead(head, 1, nSamples), 1, nSamples)
require.NoError(b, err)
qOOOHead, err := NewBlockQuerier(NewOOORangeHead(head, 1, nSamples, 0), 1, nSamples)
require.NoError(b, err)
isoState := head.oooIso.TrackReadAfter(0)
qOOOHead := NewHeadAndOOOQuerier(1, nSamples, head, isoState, qHead)
queryTypes = append(queryTypes, qt{
fmt.Sprintf("_Head_oooPercent:%d", oooPercentage),
storage.NewMergeQuerier([]storage.Querier{qHead, qOOOHead}, nil, storage.ChainedSeriesMerge),
fmt.Sprintf("_Head_oooPercent:%d", oooPercentage), qOOOHead,
})
}