diff --git a/tsdb/db.go b/tsdb/db.go index c4c05e390..2e3801a9e 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -203,10 +203,14 @@ type DB struct { compactor Compactor blocksToDelete BlocksToDeleteFunc - // Mutex for that must be held when modifying the general block layout. + // Mutex for that must be held when modifying the general block layout or lastGarbageCollectedMmapRef. mtx sync.RWMutex blocks []*Block + // The last OOO chunk that was compacted and written to disk. New queriers must not read chunks less + // than or equal to this reference, as these chunks could be garbage collected at any time. + lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef + head *Head compactc chan struct{} @@ -1243,6 +1247,20 @@ func (db *DB) compactOOOHead(ctx context.Context) error { lastWBLFile, minOOOMmapRef := oooHead.LastWBLFile(), oooHead.LastMmapRef() if lastWBLFile != 0 || minOOOMmapRef != 0 { + if minOOOMmapRef != 0 { + // Ensure that no more queriers are created that will reference chunks we're about to garbage collect. + // truncateOOO waits for any existing queriers that reference chunks we're about to garbage collect to + // complete before running garbage collection, so we don't need to do that here. + // + // We take mtx to ensure that Querier() and ChunkQuerier() don't miss blocks: without this, they could + // capture the list of blocks before the call to reloadBlocks() above runs, but then capture + // lastGarbageCollectedMmapRef after we update it here, and therefore not query either the blocks we've just + // written or the head chunks those blocks were created from. + db.mtx.Lock() + db.lastGarbageCollectedMmapRef = minOOOMmapRef + db.mtx.Unlock() + } + if err := db.head.truncateOOO(lastWBLFile, minOOOMmapRef); err != nil { return errors.Wrap(err, "truncate ooo wbl") } @@ -1869,7 +1887,7 @@ func (db *DB) Snapshot(dir string, withHead bool) error { } // Querier returns a new querier over the data partition for the given time range. -func (db *DB) Querier(mint, maxt int64) (storage.Querier, error) { +func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { var blocks []BlockReader db.mtx.RLock() @@ -1880,11 +1898,23 @@ func (db *DB) Querier(mint, maxt int64) (storage.Querier, error) { blocks = append(blocks, b) } } - var inOrderHeadQuerier storage.Querier + + blockQueriers := make([]storage.Querier, 0, len(blocks)+2) // +2 to allow for possible in-order and OOO head queriers + + defer func() { + if err != nil { + // If we fail, all previously opened queriers must be closed. + for _, q := range blockQueriers { + // TODO(bwplotka): Handle error. 
+ _ = q.Close() + } + } + }() + if maxt >= db.head.MinTime() { rh := NewRangeHead(db.head, mint, maxt) var err error - inOrderHeadQuerier, err = NewBlockQuerier(rh, mint, maxt) + inOrderHeadQuerier, err := NewBlockQuerier(rh, mint, maxt) if err != nil { return nil, errors.Wrapf(err, "open block querier for head %s", rh) } @@ -1906,44 +1936,40 @@ func (db *DB) Querier(mint, maxt int64) (storage.Querier, error) { return nil, errors.Wrapf(err, "open block querier for head while getting new querier %s", rh) } } + + if inOrderHeadQuerier != nil { + blockQueriers = append(blockQueriers, inOrderHeadQuerier) + } } - var outOfOrderHeadQuerier storage.Querier if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { - rh := NewOOORangeHead(db.head, mint, maxt) + rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef) var err error - outOfOrderHeadQuerier, err = NewBlockQuerier(rh, mint, maxt) + outOfOrderHeadQuerier, err := NewBlockQuerier(rh, mint, maxt) if err != nil { + // If NewBlockQuerier() failed, make sure to clean up the pending read created by NewOOORangeHead. + rh.isoState.Close() + return nil, errors.Wrapf(err, "open block querier for ooo head %s", rh) } - } - blockQueriers := make([]storage.Querier, 0, len(blocks)) - for _, b := range blocks { - q, err := NewBlockQuerier(b, mint, maxt) - if err == nil { - blockQueriers = append(blockQueriers, q) - continue - } - // If we fail, all previously opened queriers must be closed. - for _, q := range blockQueriers { - // TODO(bwplotka): Handle error. - _ = q.Close() - } - return nil, errors.Wrapf(err, "open querier for block %s", b) - } - if inOrderHeadQuerier != nil { - blockQueriers = append(blockQueriers, inOrderHeadQuerier) - } - if outOfOrderHeadQuerier != nil { blockQueriers = append(blockQueriers, outOfOrderHeadQuerier) } + + for _, b := range blocks { + q, err := NewBlockQuerier(b, mint, maxt) + if err != nil { + return nil, errors.Wrapf(err, "open querier for block %s", b) + } + blockQueriers = append(blockQueriers, q) + } + return storage.NewMergeQuerier(blockQueriers, nil, storage.ChainedSeriesMerge), nil } // blockChunkQuerierForRange returns individual block chunk queriers from the persistent blocks, in-order head block, and the // out-of-order head block, overlapping with the given time range. -func (db *DB) blockChunkQuerierForRange(mint, maxt int64) ([]storage.ChunkQuerier, error) { +func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuerier, err error) { var blocks []BlockReader db.mtx.RLock() @@ -1954,11 +1980,22 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) ([]storage.ChunkQuerie blocks = append(blocks, b) } } - var inOrderHeadQuerier storage.ChunkQuerier + + blockQueriers := make([]storage.ChunkQuerier, 0, len(blocks)+2) // +2 to allow for possible in-order and OOO head queriers + + defer func() { + if err != nil { + // If we fail, all previously opened queriers must be closed. + for _, q := range blockQueriers { + // TODO(bwplotka): Handle error. 
+ _ = q.Close() + } + } + }() + if maxt >= db.head.MinTime() { rh := NewRangeHead(db.head, mint, maxt) - var err error - inOrderHeadQuerier, err = NewBlockChunkQuerier(rh, mint, maxt) + inOrderHeadQuerier, err := NewBlockChunkQuerier(rh, mint, maxt) if err != nil { return nil, errors.Wrapf(err, "open querier for head %s", rh) } @@ -1980,37 +2017,28 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) ([]storage.ChunkQuerie return nil, errors.Wrapf(err, "open querier for head while getting new querier %s", rh) } } + + if inOrderHeadQuerier != nil { + blockQueriers = append(blockQueriers, inOrderHeadQuerier) + } } - var outOfOrderHeadQuerier storage.ChunkQuerier if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { - rh := NewOOORangeHead(db.head, mint, maxt) - var err error - outOfOrderHeadQuerier, err = NewBlockChunkQuerier(rh, mint, maxt) + rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef) + outOfOrderHeadQuerier, err := NewBlockChunkQuerier(rh, mint, maxt) if err != nil { return nil, errors.Wrapf(err, "open block chunk querier for ooo head %s", rh) } + + blockQueriers = append(blockQueriers, outOfOrderHeadQuerier) } - blockQueriers := make([]storage.ChunkQuerier, 0, len(blocks)) for _, b := range blocks { q, err := NewBlockChunkQuerier(b, mint, maxt) - if err == nil { - blockQueriers = append(blockQueriers, q) - continue + if err != nil { + return nil, errors.Wrapf(err, "open querier for block %s", b) } - // If we fail, all previously opened queriers must be closed. - for _, q := range blockQueriers { - // TODO(bwplotka): Handle error. - _ = q.Close() - } - return nil, errors.Wrapf(err, "open querier for block %s", b) - } - if inOrderHeadQuerier != nil { - blockQueriers = append(blockQueriers, inOrderHeadQuerier) - } - if outOfOrderHeadQuerier != nil { - blockQueriers = append(blockQueriers, outOfOrderHeadQuerier) + blockQueriers = append(blockQueriers, q) } return blockQueriers, nil diff --git a/tsdb/db_test.go b/tsdb/db_test.go index c7ea068d6..5728b49bd 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -38,6 +38,7 @@ import ( "github.com/prometheus/client_golang/prometheus" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "go.uber.org/goleak" "github.com/prometheus/prometheus/config" @@ -3611,6 +3612,264 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun } } +func TestQuerierShouldNotFailIfOOOCompactionOccursAfterRetrievingQuerier(t *testing.T) { + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = 3 * DefaultBlockDuration + db := openTestDB(t, opts, nil) + defer func() { + require.NoError(t, db.Close()) + }() + + // Disable compactions so we can control it. + db.DisableCompactions() + + metric := labels.FromStrings(labels.MetricName, "test_metric") + ctx := context.Background() + interval := int64(15 * time.Second / time.Millisecond) + ts := int64(0) + samplesWritten := 0 + + // Capture the first timestamp - this will be the timestamp of the OOO sample we'll append below. + oooTS := ts + ts += interval + + // Push samples after the OOO sample we'll write below. + for ; ts < 10*interval; ts += interval { + app := db.Appender(ctx) + _, err := app.Append(0, metric, ts, float64(ts)) + require.NoError(t, err) + require.NoError(t, app.Commit()) + samplesWritten++ + } + + // Push a single OOO sample. 
+ app := db.Appender(ctx) + _, err := app.Append(0, metric, oooTS, float64(ts)) + require.NoError(t, err) + require.NoError(t, app.Commit()) + samplesWritten++ + + // Get a querier. + querierCreatedBeforeCompaction, err := db.ChunkQuerier(0, math.MaxInt64) + require.NoError(t, err) + + // Start OOO head compaction. + compactionComplete := atomic.NewBool(false) + go func() { + defer compactionComplete.Store(true) + + require.NoError(t, db.CompactOOOHead(ctx)) + require.Equal(t, float64(1), prom_testutil.ToFloat64(db.Head().metrics.chunksRemoved)) + }() + + // Give CompactOOOHead time to start work. + // If it does not wait for querierCreatedBeforeCompaction to be closed, then the query will return incorrect results or fail. + time.Sleep(time.Second) + require.False(t, compactionComplete.Load(), "compaction completed before reading chunks or closing querier created before compaction") + + // Get another querier. This one should only use the compacted blocks from disk and ignore the chunks that will be garbage collected. + querierCreatedAfterCompaction, err := db.ChunkQuerier(0, math.MaxInt64) + require.NoError(t, err) + + testQuerier := func(q storage.ChunkQuerier) { + // Query back the series. + hints := &storage.SelectHints{Start: 0, End: math.MaxInt64, Step: interval} + seriesSet := q.Select(ctx, true, hints, labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric")) + + // Collect the iterator for the series. + var iterators []chunks.Iterator + for seriesSet.Next() { + iterators = append(iterators, seriesSet.At().Iterator(nil)) + } + require.NoError(t, seriesSet.Err()) + require.Len(t, iterators, 1) + iterator := iterators[0] + + // Check that we can still successfully read all samples. + samplesRead := 0 + for iterator.Next() { + samplesRead += iterator.At().Chunk.NumSamples() + } + + require.NoError(t, iterator.Err()) + require.Equal(t, samplesWritten, samplesRead) + } + + testQuerier(querierCreatedBeforeCompaction) + + require.False(t, compactionComplete.Load(), "compaction completed before closing querier created before compaction") + require.NoError(t, querierCreatedBeforeCompaction.Close()) + require.Eventually(t, compactionComplete.Load, time.Second, 10*time.Millisecond, "compaction should complete after querier created before compaction was closed, and not wait for querier created after compaction") + + // Use the querier created after compaction and confirm it returns the expected results (ie. from the disk block created from OOO head and in-order head) without error. + testQuerier(querierCreatedAfterCompaction) + require.NoError(t, querierCreatedAfterCompaction.Close()) +} + +func TestQuerierShouldNotFailIfOOOCompactionOccursAfterSelecting(t *testing.T) { + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = 3 * DefaultBlockDuration + db := openTestDB(t, opts, nil) + defer func() { + require.NoError(t, db.Close()) + }() + + // Disable compactions so we can control it. + db.DisableCompactions() + + metric := labels.FromStrings(labels.MetricName, "test_metric") + ctx := context.Background() + interval := int64(15 * time.Second / time.Millisecond) + ts := int64(0) + samplesWritten := 0 + + // Capture the first timestamp - this will be the timestamp of the OOO sample we'll append below. + oooTS := ts + ts += interval + + // Push samples after the OOO sample we'll write below. 
+ for ; ts < 10*interval; ts += interval { + app := db.Appender(ctx) + _, err := app.Append(0, metric, ts, float64(ts)) + require.NoError(t, err) + require.NoError(t, app.Commit()) + samplesWritten++ + } + + // Push a single OOO sample. + app := db.Appender(ctx) + _, err := app.Append(0, metric, oooTS, float64(ts)) + require.NoError(t, err) + require.NoError(t, app.Commit()) + samplesWritten++ + + // Get a querier. + querier, err := db.ChunkQuerier(0, math.MaxInt64) + require.NoError(t, err) + + // Query back the series. + hints := &storage.SelectHints{Start: 0, End: math.MaxInt64, Step: interval} + seriesSet := querier.Select(ctx, true, hints, labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric")) + + // Start OOO head compaction. + compactionComplete := atomic.NewBool(false) + go func() { + defer compactionComplete.Store(true) + + require.NoError(t, db.CompactOOOHead(ctx)) + require.Equal(t, float64(1), prom_testutil.ToFloat64(db.Head().metrics.chunksRemoved)) + }() + + // Give CompactOOOHead time to start work. + // If it does not wait for the querier to be closed, then the query will return incorrect results or fail. + time.Sleep(time.Second) + require.False(t, compactionComplete.Load(), "compaction completed before reading chunks or closing querier") + + // Collect the iterator for the series. + var iterators []chunks.Iterator + for seriesSet.Next() { + iterators = append(iterators, seriesSet.At().Iterator(nil)) + } + require.NoError(t, seriesSet.Err()) + require.Len(t, iterators, 1) + iterator := iterators[0] + + // Check that we can still successfully read all samples. + samplesRead := 0 + for iterator.Next() { + samplesRead += iterator.At().Chunk.NumSamples() + } + + require.NoError(t, iterator.Err()) + require.Equal(t, samplesWritten, samplesRead) + + require.False(t, compactionComplete.Load(), "compaction completed before closing querier") + require.NoError(t, querier.Close()) + require.Eventually(t, compactionComplete.Load, time.Second, 10*time.Millisecond, "compaction should complete after querier was closed") +} + +func TestQuerierShouldNotFailIfOOOCompactionOccursAfterRetrievingIterators(t *testing.T) { + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = 3 * DefaultBlockDuration + db := openTestDB(t, opts, nil) + defer func() { + require.NoError(t, db.Close()) + }() + + // Disable compactions so we can control it. + db.DisableCompactions() + + metric := labels.FromStrings(labels.MetricName, "test_metric") + ctx := context.Background() + interval := int64(15 * time.Second / time.Millisecond) + ts := int64(0) + samplesWritten := 0 + + // Capture the first timestamp - this will be the timestamp of the OOO sample we'll append below. + oooTS := ts + ts += interval + + // Push samples after the OOO sample we'll write below. + for ; ts < 10*interval; ts += interval { + app := db.Appender(ctx) + _, err := app.Append(0, metric, ts, float64(ts)) + require.NoError(t, err) + require.NoError(t, app.Commit()) + samplesWritten++ + } + + // Push a single OOO sample. + app := db.Appender(ctx) + _, err := app.Append(0, metric, oooTS, float64(ts)) + require.NoError(t, err) + require.NoError(t, app.Commit()) + samplesWritten++ + + // Get a querier. + querier, err := db.ChunkQuerier(0, math.MaxInt64) + require.NoError(t, err) + + // Query back the series. 
+ hints := &storage.SelectHints{Start: 0, End: math.MaxInt64, Step: interval} + seriesSet := querier.Select(ctx, true, hints, labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric")) + + // Collect the iterator for the series. + var iterators []chunks.Iterator + for seriesSet.Next() { + iterators = append(iterators, seriesSet.At().Iterator(nil)) + } + require.NoError(t, seriesSet.Err()) + require.Len(t, iterators, 1) + iterator := iterators[0] + + // Start OOO head compaction. + compactionComplete := atomic.NewBool(false) + go func() { + defer compactionComplete.Store(true) + + require.NoError(t, db.CompactOOOHead(ctx)) + require.Equal(t, float64(1), prom_testutil.ToFloat64(db.Head().metrics.chunksRemoved)) + }() + + // Give CompactOOOHead time to start work. + // If it does not wait for the querier to be closed, then the query will return incorrect results or fail. + time.Sleep(time.Second) + require.False(t, compactionComplete.Load(), "compaction completed before reading chunks or closing querier") + + // Check that we can still successfully read all samples. + samplesRead := 0 + for iterator.Next() { + samplesRead += iterator.At().Chunk.NumSamples() + } + + require.NoError(t, iterator.Err()) + require.Equal(t, samplesWritten, samplesRead) + + require.False(t, compactionComplete.Load(), "compaction completed before closing querier") + require.NoError(t, querier.Close()) + require.Eventually(t, compactionComplete.Load, time.Second, 10*time.Millisecond, "compaction should complete after querier was closed") +} + func newTestDB(t *testing.T) *DB { dir := t.TempDir() diff --git a/tsdb/head.go b/tsdb/head.go index 419340506..bf181a415 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -106,6 +106,8 @@ type Head struct { iso *isolation + oooIso *oooIsolation + cardinalityMutex sync.Mutex cardinalityCache *index.PostingsStats // Posting stats cache which will expire after 30sec. lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching. @@ -300,6 +302,7 @@ func (h *Head) resetInMemoryState() error { } h.iso = newIsolation(h.opts.IsolationDisabled) + h.oooIso = newOOOIsolation() h.exemplarMetrics = em h.exemplars = es @@ -1133,6 +1136,14 @@ func (h *Head) WaitForPendingReadersInTimeRange(mint, maxt int64) { } } +// WaitForPendingReadersForOOOChunksAtOrBefore is like WaitForPendingReadersInTimeRange, except it waits for +// queries touching OOO chunks less than or equal to chunk to finish querying. +func (h *Head) WaitForPendingReadersForOOOChunksAtOrBefore(chunk chunks.ChunkDiskMapperRef) { + for h.oooIso.HasOpenReadsAtOrBefore(chunk) { + time.Sleep(500 * time.Millisecond) + } +} + // WaitForAppendersOverlapping waits for appends overlapping maxt to finish. func (h *Head) WaitForAppendersOverlapping(maxt int64) { for maxt >= h.iso.lowestAppendTime() { @@ -1271,13 +1282,19 @@ func (h *Head) truncateWAL(mint int64) error { } // truncateOOO +// - waits for any pending reads that potentially touch chunks less than or equal to newMinOOOMmapRef // - truncates the OOO WBL files whose index is strictly less than lastWBLFile. -// - garbage collects all the m-map chunks from the memory that are less than or equal to minOOOMmapRef +// - garbage collects all the m-map chunks from the memory that are less than or equal to newMinOOOMmapRef // and then deletes the series that do not have any data anymore. 
-func (h *Head) truncateOOO(lastWBLFile int, minOOOMmapRef chunks.ChunkDiskMapperRef) error { +// +// The caller is responsible for ensuring that no further queriers will be created that reference chunks less +// than or equal to newMinOOOMmapRef before calling truncateOOO. +func (h *Head) truncateOOO(lastWBLFile int, newMinOOOMmapRef chunks.ChunkDiskMapperRef) error { curMinOOOMmapRef := chunks.ChunkDiskMapperRef(h.minOOOMmapRef.Load()) - if minOOOMmapRef.GreaterThan(curMinOOOMmapRef) { - h.minOOOMmapRef.Store(uint64(minOOOMmapRef)) + if newMinOOOMmapRef.GreaterThan(curMinOOOMmapRef) { + h.WaitForPendingReadersForOOOChunksAtOrBefore(newMinOOOMmapRef) + h.minOOOMmapRef.Store(uint64(newMinOOOMmapRef)) + if err := h.truncateSeriesAndChunkDiskMapper("truncateOOO"); err != nil { return err } diff --git a/tsdb/ooo_head.go b/tsdb/ooo_head.go index 1251af4a9..7f2110fa6 100644 --- a/tsdb/ooo_head.go +++ b/tsdb/ooo_head.go @@ -20,6 +20,7 @@ import ( "github.com/oklog/ulid" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/tombstones" ) @@ -113,22 +114,27 @@ type OOORangeHead struct { // the timerange of the query and having preexisting pointers to the first // and last timestamp help with that. mint, maxt int64 + + isoState *oooIsolationState } -func NewOOORangeHead(head *Head, mint, maxt int64) *OOORangeHead { +func NewOOORangeHead(head *Head, mint, maxt int64, minRef chunks.ChunkDiskMapperRef) *OOORangeHead { + isoState := head.oooIso.TrackReadAfter(minRef) + return &OOORangeHead{ - head: head, - mint: mint, - maxt: maxt, + head: head, + mint: mint, + maxt: maxt, + isoState: isoState, } } func (oh *OOORangeHead) Index() (IndexReader, error) { - return NewOOOHeadIndexReader(oh.head, oh.mint, oh.maxt), nil + return NewOOOHeadIndexReader(oh.head, oh.mint, oh.maxt, oh.isoState.minRef), nil } func (oh *OOORangeHead) Chunks() (ChunkReader, error) { - return NewOOOHeadChunkReader(oh.head, oh.mint, oh.maxt), nil + return NewOOOHeadChunkReader(oh.head, oh.mint, oh.maxt, oh.isoState), nil } func (oh *OOORangeHead) Tombstones() (tombstones.Reader, error) { diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index b9c2dc4a5..ace232657 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -38,26 +38,29 @@ var _ IndexReader = &OOOHeadIndexReader{} // decided to do this to avoid code duplication. // The only methods that change are the ones about getting Series and Postings. type OOOHeadIndexReader struct { - *headIndexReader // A reference to the headIndexReader so we can reuse as many interface implementation as possible. + *headIndexReader // A reference to the headIndexReader so we can reuse as many interface implementation as possible. + lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef } -func NewOOOHeadIndexReader(head *Head, mint, maxt int64) *OOOHeadIndexReader { +func NewOOOHeadIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *OOOHeadIndexReader { hr := &headIndexReader{ head: head, mint: mint, maxt: maxt, } - return &OOOHeadIndexReader{hr} + return &OOOHeadIndexReader{hr, lastGarbageCollectedMmapRef} } func (oh *OOOHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { - return oh.series(ref, builder, chks, 0) + return oh.series(ref, builder, chks, oh.lastGarbageCollectedMmapRef, 0) } -// The passed lastMmapRef tells upto what max m-map chunk that we can consider. 
-// If it is 0, it means all chunks need to be considered. -// If it is non-0, then the oooHeadChunk must not be considered. -func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta, lastMmapRef chunks.ChunkDiskMapperRef) error { +// lastGarbageCollectedMmapRef gives the last mmap chunk that may be being garbage collected and so +// any chunk at or before this ref will not be considered. 0 disables this check. +// +// maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then +// the oooHeadChunk will not be considered. +func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef) error { s := oh.head.series.getByID(chunks.HeadSeriesRef(ref)) if s == nil { @@ -112,14 +115,14 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra // so we can set the correct markers. if s.ooo.oooHeadChunk != nil { c := s.ooo.oooHeadChunk - if c.OverlapsClosedInterval(oh.mint, oh.maxt) && lastMmapRef == 0 { + if c.OverlapsClosedInterval(oh.mint, oh.maxt) && maxMmapRef == 0 { ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks)))) addChunk(c.minTime, c.maxTime, ref) } } for i := len(s.ooo.oooMmappedChunks) - 1; i >= 0; i-- { c := s.ooo.oooMmappedChunks[i] - if c.OverlapsClosedInterval(oh.mint, oh.maxt) && (lastMmapRef == 0 || lastMmapRef.GreaterThanOrEqualTo(c.ref)) { + if c.OverlapsClosedInterval(oh.mint, oh.maxt) && (maxMmapRef == 0 || maxMmapRef.GreaterThanOrEqualTo(c.ref)) && (lastGarbageCollectedMmapRef == 0 || c.ref.GreaterThan(lastGarbageCollectedMmapRef)) { ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i))) addChunk(c.minTime, c.maxTime, ref) } @@ -232,13 +235,15 @@ func (oh *OOOHeadIndexReader) Postings(ctx context.Context, name string, values type OOOHeadChunkReader struct { head *Head mint, maxt int64 + isoState *oooIsolationState } -func NewOOOHeadChunkReader(head *Head, mint, maxt int64) *OOOHeadChunkReader { +func NewOOOHeadChunkReader(head *Head, mint, maxt int64, isoState *oooIsolationState) *OOOHeadChunkReader { return &OOOHeadChunkReader{ - head: head, - mint: mint, - maxt: maxt, + head: head, + mint: mint, + maxt: maxt, + isoState: isoState, } } @@ -272,6 +277,9 @@ func (cr OOOHeadChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) { } func (cr OOOHeadChunkReader) Close() error { + if cr.isoState != nil { + cr.isoState.Close() + } return nil } @@ -306,7 +314,7 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, ch.lastWBLFile = lastWBLFile } - ch.oooIR = NewOOOHeadIndexReader(head, math.MinInt64, math.MaxInt64) + ch.oooIR = NewOOOHeadIndexReader(head, math.MinInt64, math.MaxInt64, 0) n, v := index.AllPostingsKey() // TODO: verify this gets only ooo samples. @@ -365,7 +373,7 @@ func (ch *OOOCompactionHead) Index() (IndexReader, error) { } func (ch *OOOCompactionHead) Chunks() (ChunkReader, error) { - return NewOOOHeadChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt), nil + return NewOOOHeadChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt, nil), nil } func (ch *OOOCompactionHead) Tombstones() (tombstones.Reader, error) { @@ -391,7 +399,7 @@ func (ch *OOOCompactionHead) Meta() BlockMeta { // Only the method of BlockReader interface are valid for the cloned OOOCompactionHead. 
func (ch *OOOCompactionHead) CloneForTimeRange(mint, maxt int64) *OOOCompactionHead { return &OOOCompactionHead{ - oooIR: NewOOOHeadIndexReader(ch.oooIR.head, mint, maxt), + oooIR: NewOOOHeadIndexReader(ch.oooIR.head, mint, maxt, 0), lastMmapRef: ch.lastMmapRef, postings: ch.postings, chunkRange: ch.chunkRange, @@ -433,7 +441,7 @@ func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.P } func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { - return ir.ch.oooIR.series(ref, builder, chks, ir.ch.lastMmapRef) + return ir.ch.oooIR.series(ref, builder, chks, 0, ir.ch.lastMmapRef) } func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index e74a7f9de..3f4b9bae7 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -356,7 +356,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { }) } - ir := NewOOOHeadIndexReader(h, tc.queryMinT, tc.queryMaxT) + ir := NewOOOHeadIndexReader(h, tc.queryMinT, tc.queryMaxT, 0) var chks []chunks.Meta var b labels.ScratchBuilder @@ -437,7 +437,7 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { // We first want to test using a head index reader that covers the biggest query interval - oh := NewOOOHeadIndexReader(head, tc.queryMinT, tc.queryMaxT) + oh := NewOOOHeadIndexReader(head, tc.queryMinT, tc.queryMaxT, 0) matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")} values, err := oh.LabelValues(ctx, "foo", matchers...) sort.Strings(values) @@ -484,7 +484,8 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) { db := newTestDBWithOpts(t, opts) - cr := NewOOOHeadChunkReader(db.head, 0, 1000) + cr := NewOOOHeadChunkReader(db.head, 0, 1000, nil) + defer cr.Close() c, err := cr.Chunk(chunks.Meta{ Ref: 0x1000000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, }) @@ -842,14 +843,15 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { // The Series method is the one that populates the chunk meta OOO // markers like OOOLastRef. These are then used by the ChunkReader. - ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT) + ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0) var chks []chunks.Meta var b labels.ScratchBuilder err := ir.Series(s1Ref, &b, &chks) require.NoError(t, err) require.Equal(t, len(tc.expChunksSamples), len(chks)) - cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT) + cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil) + defer cr.Close() for i := 0; i < len(chks); i++ { c, err := cr.Chunk(chks[i]) require.NoError(t, err) @@ -1005,7 +1007,7 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( // The Series method is the one that populates the chunk meta OOO // markers like OOOLastRef. These are then used by the ChunkReader. 
- ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT) + ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0) var chks []chunks.Meta var b labels.ScratchBuilder err := ir.Series(s1Ref, &b, &chks) @@ -1020,7 +1022,8 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( } require.NoError(t, app.Commit()) - cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT) + cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil) + defer cr.Close() for i := 0; i < len(chks); i++ { c, err := cr.Chunk(chks[i]) require.NoError(t, err) diff --git a/tsdb/ooo_isolation.go b/tsdb/ooo_isolation.go new file mode 100644 index 000000000..3e3e165a0 --- /dev/null +++ b/tsdb/ooo_isolation.go @@ -0,0 +1,79 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "container/list" + "sync" + + "github.com/prometheus/prometheus/tsdb/chunks" +) + +type oooIsolation struct { + mtx sync.RWMutex + openReads *list.List +} + +type oooIsolationState struct { + i *oooIsolation + e *list.Element + + minRef chunks.ChunkDiskMapperRef +} + +func newOOOIsolation() *oooIsolation { + return &oooIsolation{ + openReads: list.New(), + } +} + +// HasOpenReadsAtOrBefore returns true if this oooIsolation is aware of any reads that use +// chunks with reference at or before ref. +func (i *oooIsolation) HasOpenReadsAtOrBefore(ref chunks.ChunkDiskMapperRef) bool { + i.mtx.RLock() + defer i.mtx.RUnlock() + + for e := i.openReads.Front(); e != nil; e = e.Next() { + s := e.Value.(*oooIsolationState) + + if ref.GreaterThan(s.minRef) { + return true + } + } + + return false +} + +// TrackReadAfter records a read that uses chunks with reference after minRef. +// +// The caller must ensure that the returned oooIsolationState is eventually closed when +// the read is complete. +func (i *oooIsolation) TrackReadAfter(minRef chunks.ChunkDiskMapperRef) *oooIsolationState { + s := &oooIsolationState{ + i: i, + minRef: minRef, + } + + i.mtx.Lock() + s.e = i.openReads.PushBack(s) + i.mtx.Unlock() + + return s +} + +func (s oooIsolationState) Close() { + s.i.mtx.Lock() + s.i.openReads.Remove(s.e) + s.i.mtx.Unlock() +} diff --git a/tsdb/ooo_isolation_test.go b/tsdb/ooo_isolation_test.go new file mode 100644 index 000000000..4ff0488ab --- /dev/null +++ b/tsdb/ooo_isolation_test.go @@ -0,0 +1,60 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package tsdb + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestOOOIsolation(t *testing.T) { + i := newOOOIsolation() + + // Empty state shouldn't have any open reads. + require.False(t, i.HasOpenReadsAtOrBefore(0)) + require.False(t, i.HasOpenReadsAtOrBefore(1)) + require.False(t, i.HasOpenReadsAtOrBefore(2)) + require.False(t, i.HasOpenReadsAtOrBefore(3)) + + // Add a read. + read1 := i.TrackReadAfter(1) + require.False(t, i.HasOpenReadsAtOrBefore(0)) + require.False(t, i.HasOpenReadsAtOrBefore(1)) + require.True(t, i.HasOpenReadsAtOrBefore(2)) + + // Add another overlapping read. + read2 := i.TrackReadAfter(0) + require.False(t, i.HasOpenReadsAtOrBefore(0)) + require.True(t, i.HasOpenReadsAtOrBefore(1)) + require.True(t, i.HasOpenReadsAtOrBefore(2)) + + // Close the second read, should now only report open reads for the first read's ref. + read2.Close() + require.False(t, i.HasOpenReadsAtOrBefore(0)) + require.False(t, i.HasOpenReadsAtOrBefore(1)) + require.True(t, i.HasOpenReadsAtOrBefore(2)) + + // Close the second read again: this should do nothing and ensures we can safely call Close() multiple times. + read2.Close() + require.False(t, i.HasOpenReadsAtOrBefore(0)) + require.False(t, i.HasOpenReadsAtOrBefore(1)) + require.True(t, i.HasOpenReadsAtOrBefore(2)) + + // Closing the first read should indicate no further open reads. + read1.Close() + require.False(t, i.HasOpenReadsAtOrBefore(0)) + require.False(t, i.HasOpenReadsAtOrBefore(1)) + require.False(t, i.HasOpenReadsAtOrBefore(2)) +} diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 3c27ab2f3..7260d9d8b 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -2803,7 +2803,7 @@ func BenchmarkQueries(b *testing.B) { qHead, err := NewBlockQuerier(NewRangeHead(head, 1, nSamples), 1, nSamples) require.NoError(b, err) - qOOOHead, err := NewBlockQuerier(NewOOORangeHead(head, 1, nSamples), 1, nSamples) + qOOOHead, err := NewBlockQuerier(NewOOORangeHead(head, 1, nSamples, 0), 1, nSamples) require.NoError(b, err) queryTypes = append(queryTypes, qt{
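
Editorial note (not part of the patch): the sketch below illustrates the coordination this change introduces between OOO queriers and OOO head compaction. A querier registers the lowest m-map chunk reference it may still read (oooIsolation.TrackReadAfter), compaction records the last garbage-collected reference under db.mtx so that queriers created afterwards skip those chunks, and Head.truncateOOO waits in WaitForPendingReadersForOOOChunksAtOrBefore until every older reader has closed. The program is a minimal, self-contained sketch of that handshake; chunkRef, readState and the polling loop are simplified stand-ins for the real types in tsdb/ooo_isolation.go and tsdb/head.go, not the Prometheus API.

// Standalone illustration of the read-tracking handshake added by this patch.
package main

import (
	"container/list"
	"fmt"
	"sync"
	"time"
)

// chunkRef stands in for chunks.ChunkDiskMapperRef.
type chunkRef uint64

// oooIsolation tracks open OOO reads, keyed by the lowest chunk reference they may touch.
type oooIsolation struct {
	mtx       sync.RWMutex
	openReads *list.List
}

type readState struct {
	i      *oooIsolation
	e      *list.Element
	minRef chunkRef
}

func newOOOIsolation() *oooIsolation {
	return &oooIsolation{openReads: list.New()}
}

// trackReadAfter registers a read that only touches chunks strictly after minRef.
func (i *oooIsolation) trackReadAfter(minRef chunkRef) *readState {
	s := &readState{i: i, minRef: minRef}
	i.mtx.Lock()
	s.e = i.openReads.PushBack(s)
	i.mtx.Unlock()
	return s
}

func (s *readState) close() {
	s.i.mtx.Lock()
	s.i.openReads.Remove(s.e)
	s.i.mtx.Unlock()
}

// hasOpenReadsAtOrBefore reports whether any registered read may still touch
// a chunk with reference at or before ref.
func (i *oooIsolation) hasOpenReadsAtOrBefore(ref chunkRef) bool {
	i.mtx.RLock()
	defer i.mtx.RUnlock()
	for e := i.openReads.Front(); e != nil; e = e.Next() {
		if ref > e.Value.(*readState).minRef {
			return true
		}
	}
	return false
}

func main() {
	iso := newOOOIsolation()

	// A querier created before compaction has not observed any garbage
	// collection yet, so it registers with minRef 0 and may read every OOO chunk.
	earlyRead := iso.trackReadAfter(0)

	// Compaction wants to garbage collect all m-map chunks up to ref 42.
	// Like Head.WaitForPendingReadersForOOOChunksAtOrBefore, it must wait for
	// the early reader to finish before truncating.
	done := make(chan struct{})
	go func() {
		for iso.hasOpenReadsAtOrBefore(42) {
			time.Sleep(10 * time.Millisecond)
		}
		fmt.Println("chunks <= 42 can now be garbage collected")
		close(done)
	}()

	// A querier created after compaction recorded lastGarbageCollectedMmapRef
	// only reads chunks after ref 42, so it never blocks the truncation.
	lateRead := iso.trackReadAfter(42)
	defer lateRead.close()

	earlyRead.close() // Closing the early reader unblocks the compaction.
	<-done
}

Closing the early reader is what lets hasOpenReadsAtOrBefore(42) return false, mirroring how closing querierCreatedBeforeCompaction in the new db_test.go tests unblocks CompactOOOHead.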
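
A second editorial sketch covers the error-handling restructure in DB.Querier and DB.blockChunkQuerierForRange: instead of repeating a close-everything loop at each failure point, the rewritten functions use a named error return and a single deferred cleanup that closes every querier opened so far. The sketch below shows that pattern under stated assumptions; openAll, closer and fakeQuerier are hypothetical stand-ins for the real storage.Querier types, not code from the patch.

package main

import (
	"errors"
	"fmt"
)

// closer stands in for storage.Querier / storage.ChunkQuerier.
type closer interface{ Close() error }

type fakeQuerier struct{ name string }

func (f *fakeQuerier) Close() error {
	fmt.Println("closed", f.name)
	return nil
}

// openAll mirrors the shape of the rewritten DB.Querier: every successfully
// opened querier is appended to qs, and the single deferred block closes all
// of them if any later step fails. The named return err is what makes the
// deferred cleanup see the failure.
func openAll(names []string, failOn string) (_ []closer, err error) {
	qs := make([]closer, 0, len(names))

	defer func() {
		if err != nil {
			// If we fail, all previously opened queriers must be closed.
			for _, q := range qs {
				_ = q.Close()
			}
		}
	}()

	for _, n := range names {
		if n == failOn {
			return nil, errors.New("open querier for " + n + ": boom")
		}
		qs = append(qs, &fakeQuerier{name: n})
	}
	return qs, nil
}

func main() {
	if _, err := openAll([]string{"head", "ooo-head", "block-1"}, "block-1"); err != nil {
		fmt.Println("error:", err) // head and ooo-head were closed by the deferred cleanup
	}
}

Because err is a named return value, the deferred function observes the same error the caller receives, so the cleanup runs exactly when the function fails and never on the success path.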