Fix issue where queries can fail or omit OOO samples if OOO head compaction occurs between creating a querier and reading chunks (#13115)

* Add failing test.

Signed-off-by: Charles Korn <charles.korn@grafana.com>

* Don't run OOO head garbage collection while reads are running.

Signed-off-by: Charles Korn <charles.korn@grafana.com>

* Add further test cases for different order of operations.

Signed-off-by: Charles Korn <charles.korn@grafana.com>

* Ensure all queriers are closed if `DB.blockChunkQuerierForRange()` fails.

Signed-off-by: Charles Korn <charles.korn@grafana.com>

* Ensure all queriers are closed if `DB.Querier()` fails.

Signed-off-by: Charles Korn <charles.korn@grafana.com>

* Invert error handling in `DB.Querier()` and `DB.blockChunkQuerierForRange()` to make it clearer

Signed-off-by: Charles Korn <charles.korn@grafana.com>

* Ensure that queries that touch OOO data can't block OOO head garbage collection forever.

Signed-off-by: Charles Korn <charles.korn@grafana.com>

* Address PR feedback: fix parameter name in comment

Co-authored-by: Jesus Vazquez <jesusvazquez@users.noreply.github.com>
Signed-off-by: Charles Korn <charleskorn@users.noreply.github.com>

* Address PR feedback: use `lastGarbageCollectedMmapRef`

Signed-off-by: Charles Korn <charles.korn@grafana.com>

* Address PR feedback: ensure pending reads are cleaned up if creating an OOO querier fails

Signed-off-by: Charles Korn <charles.korn@grafana.com>

---------

Signed-off-by: Charles Korn <charles.korn@grafana.com>
Signed-off-by: Charles Korn <charleskorn@users.noreply.github.com>
Co-authored-by: Jesus Vazquez <jesusvazquez@users.noreply.github.com>
This commit is contained in:
Charles Korn 2023-11-24 22:38:38 +11:00 committed by GitHub
parent 35a15e8f04
commit 59844498f7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 546 additions and 86 deletions

View file

@ -203,10 +203,14 @@ type DB struct {
compactor Compactor
blocksToDelete BlocksToDeleteFunc
// Mutex for that must be held when modifying the general block layout.
// Mutex for that must be held when modifying the general block layout or lastGarbageCollectedMmapRef.
mtx sync.RWMutex
blocks []*Block
// The last OOO chunk that was compacted and written to disk. New queriers must not read chunks less
// than or equal to this reference, as these chunks could be garbage collected at any time.
lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef
head *Head
compactc chan struct{}
@ -1243,6 +1247,20 @@ func (db *DB) compactOOOHead(ctx context.Context) error {
lastWBLFile, minOOOMmapRef := oooHead.LastWBLFile(), oooHead.LastMmapRef()
if lastWBLFile != 0 || minOOOMmapRef != 0 {
if minOOOMmapRef != 0 {
// Ensure that no more queriers are created that will reference chunks we're about to garbage collect.
// truncateOOO waits for any existing queriers that reference chunks we're about to garbage collect to
// complete before running garbage collection, so we don't need to do that here.
//
// We take mtx to ensure that Querier() and ChunkQuerier() don't miss blocks: without this, they could
// capture the list of blocks before the call to reloadBlocks() above runs, but then capture
// lastGarbageCollectedMmapRef after we update it here, and therefore not query either the blocks we've just
// written or the head chunks those blocks were created from.
db.mtx.Lock()
db.lastGarbageCollectedMmapRef = minOOOMmapRef
db.mtx.Unlock()
}
if err := db.head.truncateOOO(lastWBLFile, minOOOMmapRef); err != nil {
return errors.Wrap(err, "truncate ooo wbl")
}
@ -1869,7 +1887,7 @@ func (db *DB) Snapshot(dir string, withHead bool) error {
}
// Querier returns a new querier over the data partition for the given time range.
func (db *DB) Querier(mint, maxt int64) (storage.Querier, error) {
func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
var blocks []BlockReader
db.mtx.RLock()
@ -1880,11 +1898,23 @@ func (db *DB) Querier(mint, maxt int64) (storage.Querier, error) {
blocks = append(blocks, b)
}
}
var inOrderHeadQuerier storage.Querier
blockQueriers := make([]storage.Querier, 0, len(blocks)+2) // +2 to allow for possible in-order and OOO head queriers
defer func() {
if err != nil {
// If we fail, all previously opened queriers must be closed.
for _, q := range blockQueriers {
// TODO(bwplotka): Handle error.
_ = q.Close()
}
}
}()
if maxt >= db.head.MinTime() {
rh := NewRangeHead(db.head, mint, maxt)
var err error
inOrderHeadQuerier, err = NewBlockQuerier(rh, mint, maxt)
inOrderHeadQuerier, err := NewBlockQuerier(rh, mint, maxt)
if err != nil {
return nil, errors.Wrapf(err, "open block querier for head %s", rh)
}
@ -1906,44 +1936,40 @@ func (db *DB) Querier(mint, maxt int64) (storage.Querier, error) {
return nil, errors.Wrapf(err, "open block querier for head while getting new querier %s", rh)
}
}
}
var outOfOrderHeadQuerier storage.Querier
if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) {
rh := NewOOORangeHead(db.head, mint, maxt)
var err error
outOfOrderHeadQuerier, err = NewBlockQuerier(rh, mint, maxt)
if err != nil {
return nil, errors.Wrapf(err, "open block querier for ooo head %s", rh)
}
}
blockQueriers := make([]storage.Querier, 0, len(blocks))
for _, b := range blocks {
q, err := NewBlockQuerier(b, mint, maxt)
if err == nil {
blockQueriers = append(blockQueriers, q)
continue
}
// If we fail, all previously opened queriers must be closed.
for _, q := range blockQueriers {
// TODO(bwplotka): Handle error.
_ = q.Close()
}
return nil, errors.Wrapf(err, "open querier for block %s", b)
}
if inOrderHeadQuerier != nil {
blockQueriers = append(blockQueriers, inOrderHeadQuerier)
}
if outOfOrderHeadQuerier != nil {
}
if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) {
rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef)
var err error
outOfOrderHeadQuerier, err := NewBlockQuerier(rh, mint, maxt)
if err != nil {
// If NewBlockQuerier() failed, make sure to clean up the pending read created by NewOOORangeHead.
rh.isoState.Close()
return nil, errors.Wrapf(err, "open block querier for ooo head %s", rh)
}
blockQueriers = append(blockQueriers, outOfOrderHeadQuerier)
}
for _, b := range blocks {
q, err := NewBlockQuerier(b, mint, maxt)
if err != nil {
return nil, errors.Wrapf(err, "open querier for block %s", b)
}
blockQueriers = append(blockQueriers, q)
}
return storage.NewMergeQuerier(blockQueriers, nil, storage.ChainedSeriesMerge), nil
}
// blockChunkQuerierForRange returns individual block chunk queriers from the persistent blocks, in-order head block, and the
// out-of-order head block, overlapping with the given time range.
func (db *DB) blockChunkQuerierForRange(mint, maxt int64) ([]storage.ChunkQuerier, error) {
func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuerier, err error) {
var blocks []BlockReader
db.mtx.RLock()
@ -1954,11 +1980,22 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) ([]storage.ChunkQuerie
blocks = append(blocks, b)
}
}
var inOrderHeadQuerier storage.ChunkQuerier
blockQueriers := make([]storage.ChunkQuerier, 0, len(blocks)+2) // +2 to allow for possible in-order and OOO head queriers
defer func() {
if err != nil {
// If we fail, all previously opened queriers must be closed.
for _, q := range blockQueriers {
// TODO(bwplotka): Handle error.
_ = q.Close()
}
}
}()
if maxt >= db.head.MinTime() {
rh := NewRangeHead(db.head, mint, maxt)
var err error
inOrderHeadQuerier, err = NewBlockChunkQuerier(rh, mint, maxt)
inOrderHeadQuerier, err := NewBlockChunkQuerier(rh, mint, maxt)
if err != nil {
return nil, errors.Wrapf(err, "open querier for head %s", rh)
}
@ -1980,39 +2017,30 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) ([]storage.ChunkQuerie
return nil, errors.Wrapf(err, "open querier for head while getting new querier %s", rh)
}
}
}
var outOfOrderHeadQuerier storage.ChunkQuerier
if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) {
rh := NewOOORangeHead(db.head, mint, maxt)
var err error
outOfOrderHeadQuerier, err = NewBlockChunkQuerier(rh, mint, maxt)
if err != nil {
return nil, errors.Wrapf(err, "open block chunk querier for ooo head %s", rh)
}
}
blockQueriers := make([]storage.ChunkQuerier, 0, len(blocks))
for _, b := range blocks {
q, err := NewBlockChunkQuerier(b, mint, maxt)
if err == nil {
blockQueriers = append(blockQueriers, q)
continue
}
// If we fail, all previously opened queriers must be closed.
for _, q := range blockQueriers {
// TODO(bwplotka): Handle error.
_ = q.Close()
}
return nil, errors.Wrapf(err, "open querier for block %s", b)
}
if inOrderHeadQuerier != nil {
blockQueriers = append(blockQueriers, inOrderHeadQuerier)
}
if outOfOrderHeadQuerier != nil {
}
if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) {
rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef)
outOfOrderHeadQuerier, err := NewBlockChunkQuerier(rh, mint, maxt)
if err != nil {
return nil, errors.Wrapf(err, "open block chunk querier for ooo head %s", rh)
}
blockQueriers = append(blockQueriers, outOfOrderHeadQuerier)
}
for _, b := range blocks {
q, err := NewBlockChunkQuerier(b, mint, maxt)
if err != nil {
return nil, errors.Wrapf(err, "open querier for block %s", b)
}
blockQueriers = append(blockQueriers, q)
}
return blockQueriers, nil
}

View file

@ -38,6 +38,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/goleak"
"github.com/prometheus/prometheus/config"
@ -3611,6 +3612,264 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun
}
}
func TestQuerierShouldNotFailIfOOOCompactionOccursAfterRetrievingQuerier(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderTimeWindow = 3 * DefaultBlockDuration
db := openTestDB(t, opts, nil)
defer func() {
require.NoError(t, db.Close())
}()
// Disable compactions so we can control it.
db.DisableCompactions()
metric := labels.FromStrings(labels.MetricName, "test_metric")
ctx := context.Background()
interval := int64(15 * time.Second / time.Millisecond)
ts := int64(0)
samplesWritten := 0
// Capture the first timestamp - this will be the timestamp of the OOO sample we'll append below.
oooTS := ts
ts += interval
// Push samples after the OOO sample we'll write below.
for ; ts < 10*interval; ts += interval {
app := db.Appender(ctx)
_, err := app.Append(0, metric, ts, float64(ts))
require.NoError(t, err)
require.NoError(t, app.Commit())
samplesWritten++
}
// Push a single OOO sample.
app := db.Appender(ctx)
_, err := app.Append(0, metric, oooTS, float64(ts))
require.NoError(t, err)
require.NoError(t, app.Commit())
samplesWritten++
// Get a querier.
querierCreatedBeforeCompaction, err := db.ChunkQuerier(0, math.MaxInt64)
require.NoError(t, err)
// Start OOO head compaction.
compactionComplete := atomic.NewBool(false)
go func() {
defer compactionComplete.Store(true)
require.NoError(t, db.CompactOOOHead(ctx))
require.Equal(t, float64(1), prom_testutil.ToFloat64(db.Head().metrics.chunksRemoved))
}()
// Give CompactOOOHead time to start work.
// If it does not wait for querierCreatedBeforeCompaction to be closed, then the query will return incorrect results or fail.
time.Sleep(time.Second)
require.False(t, compactionComplete.Load(), "compaction completed before reading chunks or closing querier created before compaction")
// Get another querier. This one should only use the compacted blocks from disk and ignore the chunks that will be garbage collected.
querierCreatedAfterCompaction, err := db.ChunkQuerier(0, math.MaxInt64)
require.NoError(t, err)
testQuerier := func(q storage.ChunkQuerier) {
// Query back the series.
hints := &storage.SelectHints{Start: 0, End: math.MaxInt64, Step: interval}
seriesSet := q.Select(ctx, true, hints, labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric"))
// Collect the iterator for the series.
var iterators []chunks.Iterator
for seriesSet.Next() {
iterators = append(iterators, seriesSet.At().Iterator(nil))
}
require.NoError(t, seriesSet.Err())
require.Len(t, iterators, 1)
iterator := iterators[0]
// Check that we can still successfully read all samples.
samplesRead := 0
for iterator.Next() {
samplesRead += iterator.At().Chunk.NumSamples()
}
require.NoError(t, iterator.Err())
require.Equal(t, samplesWritten, samplesRead)
}
testQuerier(querierCreatedBeforeCompaction)
require.False(t, compactionComplete.Load(), "compaction completed before closing querier created before compaction")
require.NoError(t, querierCreatedBeforeCompaction.Close())
require.Eventually(t, compactionComplete.Load, time.Second, 10*time.Millisecond, "compaction should complete after querier created before compaction was closed, and not wait for querier created after compaction")
// Use the querier created after compaction and confirm it returns the expected results (ie. from the disk block created from OOO head and in-order head) without error.
testQuerier(querierCreatedAfterCompaction)
require.NoError(t, querierCreatedAfterCompaction.Close())
}
func TestQuerierShouldNotFailIfOOOCompactionOccursAfterSelecting(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderTimeWindow = 3 * DefaultBlockDuration
db := openTestDB(t, opts, nil)
defer func() {
require.NoError(t, db.Close())
}()
// Disable compactions so we can control it.
db.DisableCompactions()
metric := labels.FromStrings(labels.MetricName, "test_metric")
ctx := context.Background()
interval := int64(15 * time.Second / time.Millisecond)
ts := int64(0)
samplesWritten := 0
// Capture the first timestamp - this will be the timestamp of the OOO sample we'll append below.
oooTS := ts
ts += interval
// Push samples after the OOO sample we'll write below.
for ; ts < 10*interval; ts += interval {
app := db.Appender(ctx)
_, err := app.Append(0, metric, ts, float64(ts))
require.NoError(t, err)
require.NoError(t, app.Commit())
samplesWritten++
}
// Push a single OOO sample.
app := db.Appender(ctx)
_, err := app.Append(0, metric, oooTS, float64(ts))
require.NoError(t, err)
require.NoError(t, app.Commit())
samplesWritten++
// Get a querier.
querier, err := db.ChunkQuerier(0, math.MaxInt64)
require.NoError(t, err)
// Query back the series.
hints := &storage.SelectHints{Start: 0, End: math.MaxInt64, Step: interval}
seriesSet := querier.Select(ctx, true, hints, labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric"))
// Start OOO head compaction.
compactionComplete := atomic.NewBool(false)
go func() {
defer compactionComplete.Store(true)
require.NoError(t, db.CompactOOOHead(ctx))
require.Equal(t, float64(1), prom_testutil.ToFloat64(db.Head().metrics.chunksRemoved))
}()
// Give CompactOOOHead time to start work.
// If it does not wait for the querier to be closed, then the query will return incorrect results or fail.
time.Sleep(time.Second)
require.False(t, compactionComplete.Load(), "compaction completed before reading chunks or closing querier")
// Collect the iterator for the series.
var iterators []chunks.Iterator
for seriesSet.Next() {
iterators = append(iterators, seriesSet.At().Iterator(nil))
}
require.NoError(t, seriesSet.Err())
require.Len(t, iterators, 1)
iterator := iterators[0]
// Check that we can still successfully read all samples.
samplesRead := 0
for iterator.Next() {
samplesRead += iterator.At().Chunk.NumSamples()
}
require.NoError(t, iterator.Err())
require.Equal(t, samplesWritten, samplesRead)
require.False(t, compactionComplete.Load(), "compaction completed before closing querier")
require.NoError(t, querier.Close())
require.Eventually(t, compactionComplete.Load, time.Second, 10*time.Millisecond, "compaction should complete after querier was closed")
}
func TestQuerierShouldNotFailIfOOOCompactionOccursAfterRetrievingIterators(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderTimeWindow = 3 * DefaultBlockDuration
db := openTestDB(t, opts, nil)
defer func() {
require.NoError(t, db.Close())
}()
// Disable compactions so we can control it.
db.DisableCompactions()
metric := labels.FromStrings(labels.MetricName, "test_metric")
ctx := context.Background()
interval := int64(15 * time.Second / time.Millisecond)
ts := int64(0)
samplesWritten := 0
// Capture the first timestamp - this will be the timestamp of the OOO sample we'll append below.
oooTS := ts
ts += interval
// Push samples after the OOO sample we'll write below.
for ; ts < 10*interval; ts += interval {
app := db.Appender(ctx)
_, err := app.Append(0, metric, ts, float64(ts))
require.NoError(t, err)
require.NoError(t, app.Commit())
samplesWritten++
}
// Push a single OOO sample.
app := db.Appender(ctx)
_, err := app.Append(0, metric, oooTS, float64(ts))
require.NoError(t, err)
require.NoError(t, app.Commit())
samplesWritten++
// Get a querier.
querier, err := db.ChunkQuerier(0, math.MaxInt64)
require.NoError(t, err)
// Query back the series.
hints := &storage.SelectHints{Start: 0, End: math.MaxInt64, Step: interval}
seriesSet := querier.Select(ctx, true, hints, labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric"))
// Collect the iterator for the series.
var iterators []chunks.Iterator
for seriesSet.Next() {
iterators = append(iterators, seriesSet.At().Iterator(nil))
}
require.NoError(t, seriesSet.Err())
require.Len(t, iterators, 1)
iterator := iterators[0]
// Start OOO head compaction.
compactionComplete := atomic.NewBool(false)
go func() {
defer compactionComplete.Store(true)
require.NoError(t, db.CompactOOOHead(ctx))
require.Equal(t, float64(1), prom_testutil.ToFloat64(db.Head().metrics.chunksRemoved))
}()
// Give CompactOOOHead time to start work.
// If it does not wait for the querier to be closed, then the query will return incorrect results or fail.
time.Sleep(time.Second)
require.False(t, compactionComplete.Load(), "compaction completed before reading chunks or closing querier")
// Check that we can still successfully read all samples.
samplesRead := 0
for iterator.Next() {
samplesRead += iterator.At().Chunk.NumSamples()
}
require.NoError(t, iterator.Err())
require.Equal(t, samplesWritten, samplesRead)
require.False(t, compactionComplete.Load(), "compaction completed before closing querier")
require.NoError(t, querier.Close())
require.Eventually(t, compactionComplete.Load, time.Second, 10*time.Millisecond, "compaction should complete after querier was closed")
}
func newTestDB(t *testing.T) *DB {
dir := t.TempDir()

View file

@ -106,6 +106,8 @@ type Head struct {
iso *isolation
oooIso *oooIsolation
cardinalityMutex sync.Mutex
cardinalityCache *index.PostingsStats // Posting stats cache which will expire after 30sec.
lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching.
@ -300,6 +302,7 @@ func (h *Head) resetInMemoryState() error {
}
h.iso = newIsolation(h.opts.IsolationDisabled)
h.oooIso = newOOOIsolation()
h.exemplarMetrics = em
h.exemplars = es
@ -1133,6 +1136,14 @@ func (h *Head) WaitForPendingReadersInTimeRange(mint, maxt int64) {
}
}
// WaitForPendingReadersForOOOChunksAtOrBefore is like WaitForPendingReadersInTimeRange, except it waits for
// queries touching OOO chunks less than or equal to chunk to finish querying.
func (h *Head) WaitForPendingReadersForOOOChunksAtOrBefore(chunk chunks.ChunkDiskMapperRef) {
for h.oooIso.HasOpenReadsAtOrBefore(chunk) {
time.Sleep(500 * time.Millisecond)
}
}
// WaitForAppendersOverlapping waits for appends overlapping maxt to finish.
func (h *Head) WaitForAppendersOverlapping(maxt int64) {
for maxt >= h.iso.lowestAppendTime() {
@ -1271,13 +1282,19 @@ func (h *Head) truncateWAL(mint int64) error {
}
// truncateOOO
// - waits for any pending reads that potentially touch chunks less than or equal to newMinOOOMmapRef
// - truncates the OOO WBL files whose index is strictly less than lastWBLFile.
// - garbage collects all the m-map chunks from the memory that are less than or equal to minOOOMmapRef
// - garbage collects all the m-map chunks from the memory that are less than or equal to newMinOOOMmapRef
// and then deletes the series that do not have any data anymore.
func (h *Head) truncateOOO(lastWBLFile int, minOOOMmapRef chunks.ChunkDiskMapperRef) error {
//
// The caller is responsible for ensuring that no further queriers will be created that reference chunks less
// than or equal to newMinOOOMmapRef before calling truncateOOO.
func (h *Head) truncateOOO(lastWBLFile int, newMinOOOMmapRef chunks.ChunkDiskMapperRef) error {
curMinOOOMmapRef := chunks.ChunkDiskMapperRef(h.minOOOMmapRef.Load())
if minOOOMmapRef.GreaterThan(curMinOOOMmapRef) {
h.minOOOMmapRef.Store(uint64(minOOOMmapRef))
if newMinOOOMmapRef.GreaterThan(curMinOOOMmapRef) {
h.WaitForPendingReadersForOOOChunksAtOrBefore(newMinOOOMmapRef)
h.minOOOMmapRef.Store(uint64(newMinOOOMmapRef))
if err := h.truncateSeriesAndChunkDiskMapper("truncateOOO"); err != nil {
return err
}

View file

@ -20,6 +20,7 @@ import (
"github.com/oklog/ulid"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/tombstones"
)
@ -113,22 +114,27 @@ type OOORangeHead struct {
// the timerange of the query and having preexisting pointers to the first
// and last timestamp help with that.
mint, maxt int64
isoState *oooIsolationState
}
func NewOOORangeHead(head *Head, mint, maxt int64) *OOORangeHead {
func NewOOORangeHead(head *Head, mint, maxt int64, minRef chunks.ChunkDiskMapperRef) *OOORangeHead {
isoState := head.oooIso.TrackReadAfter(minRef)
return &OOORangeHead{
head: head,
mint: mint,
maxt: maxt,
isoState: isoState,
}
}
func (oh *OOORangeHead) Index() (IndexReader, error) {
return NewOOOHeadIndexReader(oh.head, oh.mint, oh.maxt), nil
return NewOOOHeadIndexReader(oh.head, oh.mint, oh.maxt, oh.isoState.minRef), nil
}
func (oh *OOORangeHead) Chunks() (ChunkReader, error) {
return NewOOOHeadChunkReader(oh.head, oh.mint, oh.maxt), nil
return NewOOOHeadChunkReader(oh.head, oh.mint, oh.maxt, oh.isoState), nil
}
func (oh *OOORangeHead) Tombstones() (tombstones.Reader, error) {

View file

@ -39,25 +39,28 @@ var _ IndexReader = &OOOHeadIndexReader{}
// The only methods that change are the ones about getting Series and Postings.
type OOOHeadIndexReader struct {
*headIndexReader // A reference to the headIndexReader so we can reuse as many interface implementation as possible.
lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef
}
func NewOOOHeadIndexReader(head *Head, mint, maxt int64) *OOOHeadIndexReader {
func NewOOOHeadIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *OOOHeadIndexReader {
hr := &headIndexReader{
head: head,
mint: mint,
maxt: maxt,
}
return &OOOHeadIndexReader{hr}
return &OOOHeadIndexReader{hr, lastGarbageCollectedMmapRef}
}
func (oh *OOOHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
return oh.series(ref, builder, chks, 0)
return oh.series(ref, builder, chks, oh.lastGarbageCollectedMmapRef, 0)
}
// The passed lastMmapRef tells upto what max m-map chunk that we can consider.
// If it is 0, it means all chunks need to be considered.
// If it is non-0, then the oooHeadChunk must not be considered.
func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta, lastMmapRef chunks.ChunkDiskMapperRef) error {
// lastGarbageCollectedMmapRef gives the last mmap chunk that may be being garbage collected and so
// any chunk at or before this ref will not be considered. 0 disables this check.
//
// maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then
// the oooHeadChunk will not be considered.
func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef) error {
s := oh.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
@ -112,14 +115,14 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
// so we can set the correct markers.
if s.ooo.oooHeadChunk != nil {
c := s.ooo.oooHeadChunk
if c.OverlapsClosedInterval(oh.mint, oh.maxt) && lastMmapRef == 0 {
if c.OverlapsClosedInterval(oh.mint, oh.maxt) && maxMmapRef == 0 {
ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
addChunk(c.minTime, c.maxTime, ref)
}
}
for i := len(s.ooo.oooMmappedChunks) - 1; i >= 0; i-- {
c := s.ooo.oooMmappedChunks[i]
if c.OverlapsClosedInterval(oh.mint, oh.maxt) && (lastMmapRef == 0 || lastMmapRef.GreaterThanOrEqualTo(c.ref)) {
if c.OverlapsClosedInterval(oh.mint, oh.maxt) && (maxMmapRef == 0 || maxMmapRef.GreaterThanOrEqualTo(c.ref)) && (lastGarbageCollectedMmapRef == 0 || c.ref.GreaterThan(lastGarbageCollectedMmapRef)) {
ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i)))
addChunk(c.minTime, c.maxTime, ref)
}
@ -232,13 +235,15 @@ func (oh *OOOHeadIndexReader) Postings(ctx context.Context, name string, values
type OOOHeadChunkReader struct {
head *Head
mint, maxt int64
isoState *oooIsolationState
}
func NewOOOHeadChunkReader(head *Head, mint, maxt int64) *OOOHeadChunkReader {
func NewOOOHeadChunkReader(head *Head, mint, maxt int64, isoState *oooIsolationState) *OOOHeadChunkReader {
return &OOOHeadChunkReader{
head: head,
mint: mint,
maxt: maxt,
isoState: isoState,
}
}
@ -272,6 +277,9 @@ func (cr OOOHeadChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) {
}
func (cr OOOHeadChunkReader) Close() error {
if cr.isoState != nil {
cr.isoState.Close()
}
return nil
}
@ -306,7 +314,7 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead,
ch.lastWBLFile = lastWBLFile
}
ch.oooIR = NewOOOHeadIndexReader(head, math.MinInt64, math.MaxInt64)
ch.oooIR = NewOOOHeadIndexReader(head, math.MinInt64, math.MaxInt64, 0)
n, v := index.AllPostingsKey()
// TODO: verify this gets only ooo samples.
@ -365,7 +373,7 @@ func (ch *OOOCompactionHead) Index() (IndexReader, error) {
}
func (ch *OOOCompactionHead) Chunks() (ChunkReader, error) {
return NewOOOHeadChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt), nil
return NewOOOHeadChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt, nil), nil
}
func (ch *OOOCompactionHead) Tombstones() (tombstones.Reader, error) {
@ -391,7 +399,7 @@ func (ch *OOOCompactionHead) Meta() BlockMeta {
// Only the method of BlockReader interface are valid for the cloned OOOCompactionHead.
func (ch *OOOCompactionHead) CloneForTimeRange(mint, maxt int64) *OOOCompactionHead {
return &OOOCompactionHead{
oooIR: NewOOOHeadIndexReader(ch.oooIR.head, mint, maxt),
oooIR: NewOOOHeadIndexReader(ch.oooIR.head, mint, maxt, 0),
lastMmapRef: ch.lastMmapRef,
postings: ch.postings,
chunkRange: ch.chunkRange,
@ -433,7 +441,7 @@ func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.P
}
func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
return ir.ch.oooIR.series(ref, builder, chks, ir.ch.lastMmapRef)
return ir.ch.oooIR.series(ref, builder, chks, 0, ir.ch.lastMmapRef)
}
func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {

View file

@ -356,7 +356,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
})
}
ir := NewOOOHeadIndexReader(h, tc.queryMinT, tc.queryMaxT)
ir := NewOOOHeadIndexReader(h, tc.queryMinT, tc.queryMaxT, 0)
var chks []chunks.Meta
var b labels.ScratchBuilder
@ -437,7 +437,7 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
// We first want to test using a head index reader that covers the biggest query interval
oh := NewOOOHeadIndexReader(head, tc.queryMinT, tc.queryMaxT)
oh := NewOOOHeadIndexReader(head, tc.queryMinT, tc.queryMaxT, 0)
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")}
values, err := oh.LabelValues(ctx, "foo", matchers...)
sort.Strings(values)
@ -484,7 +484,8 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) {
db := newTestDBWithOpts(t, opts)
cr := NewOOOHeadChunkReader(db.head, 0, 1000)
cr := NewOOOHeadChunkReader(db.head, 0, 1000, nil)
defer cr.Close()
c, err := cr.Chunk(chunks.Meta{
Ref: 0x1000000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
})
@ -842,14 +843,15 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
// The Series method is the one that populates the chunk meta OOO
// markers like OOOLastRef. These are then used by the ChunkReader.
ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT)
ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
var chks []chunks.Meta
var b labels.ScratchBuilder
err := ir.Series(s1Ref, &b, &chks)
require.NoError(t, err)
require.Equal(t, len(tc.expChunksSamples), len(chks))
cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT)
cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil)
defer cr.Close()
for i := 0; i < len(chks); i++ {
c, err := cr.Chunk(chks[i])
require.NoError(t, err)
@ -1005,7 +1007,7 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
// The Series method is the one that populates the chunk meta OOO
// markers like OOOLastRef. These are then used by the ChunkReader.
ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT)
ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
var chks []chunks.Meta
var b labels.ScratchBuilder
err := ir.Series(s1Ref, &b, &chks)
@ -1020,7 +1022,8 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
}
require.NoError(t, app.Commit())
cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT)
cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil)
defer cr.Close()
for i := 0; i < len(chks); i++ {
c, err := cr.Chunk(chks[i])
require.NoError(t, err)

79
tsdb/ooo_isolation.go Normal file
View file

@ -0,0 +1,79 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"container/list"
"sync"
"github.com/prometheus/prometheus/tsdb/chunks"
)
type oooIsolation struct {
mtx sync.RWMutex
openReads *list.List
}
type oooIsolationState struct {
i *oooIsolation
e *list.Element
minRef chunks.ChunkDiskMapperRef
}
func newOOOIsolation() *oooIsolation {
return &oooIsolation{
openReads: list.New(),
}
}
// HasOpenReadsAtOrBefore returns true if this oooIsolation is aware of any reads that use
// chunks with reference at or before ref.
func (i *oooIsolation) HasOpenReadsAtOrBefore(ref chunks.ChunkDiskMapperRef) bool {
i.mtx.RLock()
defer i.mtx.RUnlock()
for e := i.openReads.Front(); e != nil; e = e.Next() {
s := e.Value.(*oooIsolationState)
if ref.GreaterThan(s.minRef) {
return true
}
}
return false
}
// TrackReadAfter records a read that uses chunks with reference after minRef.
//
// The caller must ensure that the returned oooIsolationState is eventually closed when
// the read is complete.
func (i *oooIsolation) TrackReadAfter(minRef chunks.ChunkDiskMapperRef) *oooIsolationState {
s := &oooIsolationState{
i: i,
minRef: minRef,
}
i.mtx.Lock()
s.e = i.openReads.PushBack(s)
i.mtx.Unlock()
return s
}
func (s oooIsolationState) Close() {
s.i.mtx.Lock()
s.i.openReads.Remove(s.e)
s.i.mtx.Unlock()
}

View file

@ -0,0 +1,60 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestOOOIsolation(t *testing.T) {
i := newOOOIsolation()
// Empty state shouldn't have any open reads.
require.False(t, i.HasOpenReadsAtOrBefore(0))
require.False(t, i.HasOpenReadsAtOrBefore(1))
require.False(t, i.HasOpenReadsAtOrBefore(2))
require.False(t, i.HasOpenReadsAtOrBefore(3))
// Add a read.
read1 := i.TrackReadAfter(1)
require.False(t, i.HasOpenReadsAtOrBefore(0))
require.False(t, i.HasOpenReadsAtOrBefore(1))
require.True(t, i.HasOpenReadsAtOrBefore(2))
// Add another overlapping read.
read2 := i.TrackReadAfter(0)
require.False(t, i.HasOpenReadsAtOrBefore(0))
require.True(t, i.HasOpenReadsAtOrBefore(1))
require.True(t, i.HasOpenReadsAtOrBefore(2))
// Close the second read, should now only report open reads for the first read's ref.
read2.Close()
require.False(t, i.HasOpenReadsAtOrBefore(0))
require.False(t, i.HasOpenReadsAtOrBefore(1))
require.True(t, i.HasOpenReadsAtOrBefore(2))
// Close the second read again: this should do nothing and ensures we can safely call Close() multiple times.
read2.Close()
require.False(t, i.HasOpenReadsAtOrBefore(0))
require.False(t, i.HasOpenReadsAtOrBefore(1))
require.True(t, i.HasOpenReadsAtOrBefore(2))
// Closing the first read should indicate no further open reads.
read1.Close()
require.False(t, i.HasOpenReadsAtOrBefore(0))
require.False(t, i.HasOpenReadsAtOrBefore(1))
require.False(t, i.HasOpenReadsAtOrBefore(2))
}

View file

@ -2803,7 +2803,7 @@ func BenchmarkQueries(b *testing.B) {
qHead, err := NewBlockQuerier(NewRangeHead(head, 1, nSamples), 1, nSamples)
require.NoError(b, err)
qOOOHead, err := NewBlockQuerier(NewOOORangeHead(head, 1, nSamples), 1, nSamples)
qOOOHead, err := NewBlockQuerier(NewOOORangeHead(head, 1, nSamples, 0), 1, nSamples)
require.NoError(b, err)
queryTypes = append(queryTypes, qt{