Add context argument to IndexReader.Postings (#12667)

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
commit 4451ba10b4 (parent 6ef9ed0bc3)
Author: Arve Knudsen, 2023-09-13 17:45:06 +02:00 (committed by GitHub)
18 changed files with 139 additions and 99 deletions


@@ -372,7 +372,7 @@ func main() {
         os.Exit(checkErr(benchmarkWrite(*benchWriteOutPath, *benchSamplesFile, *benchWriteNumMetrics, *benchWriteNumScrapes)))
     case tsdbAnalyzeCmd.FullCommand():
-        os.Exit(checkErr(analyzeBlock(*analyzePath, *analyzeBlockID, *analyzeLimit, *analyzeRunExtended)))
+        os.Exit(checkErr(analyzeBlock(ctx, *analyzePath, *analyzeBlockID, *analyzeLimit, *analyzeRunExtended)))
     case tsdbListCmd.FullCommand():
         os.Exit(checkErr(listBlocks(*listPath, *listHumanReadable)))


@@ -413,7 +413,7 @@ func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error)
     return db, b, nil
 }

-func analyzeBlock(path, blockID string, limit int, runExtended bool) error {
+func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExtended bool) error {
     db, block, err := openBlock(path, blockID)
     if err != nil {
         return err
@@ -460,7 +460,7 @@ func analyzeBlock(path, blockID string, limit int, runExtended bool) error {
     labelpairsUncovered := map[string]uint64{}
     labelpairsCount := map[string]uint64{}
     entries := 0
-    p, err := ir.Postings("", "") // The special all key.
+    p, err := ir.Postings(ctx, "", "") // The special all key.
     if err != nil {
         return err
     }
@@ -543,7 +543,7 @@ func analyzeBlock(path, blockID string, limit int, runExtended bool) error {
         return err
     }
     for _, n := range lv {
-        postings, err := ir.Postings("__name__", n)
+        postings, err := ir.Postings(ctx, "__name__", n)
         if err != nil {
             return err
         }
@@ -560,14 +560,15 @@ func analyzeBlock(path, blockID string, limit int, runExtended bool) error {
     printInfo(postingInfos)

     if runExtended {
-        return analyzeCompaction(block, ir)
+        return analyzeCompaction(ctx, block, ir)
     }

     return nil
 }

-func analyzeCompaction(block tsdb.BlockReader, indexr tsdb.IndexReader) (err error) {
-    postingsr, err := indexr.Postings(index.AllPostingsKey())
+func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.IndexReader) (err error) {
+    n, v := index.AllPostingsKey()
+    postingsr, err := indexr.Postings(ctx, n, v)
     if err != nil {
         return err
     }


@@ -28,6 +28,8 @@ import (
 )

 func setupRangeQueryTestData(stor *teststorage.TestStorage, _ *Engine, interval, numIntervals int) error {
+    ctx := context.Background()
+
     metrics := []labels.Labels{}
     metrics = append(metrics, labels.FromStrings("__name__", "a_one"))
     metrics = append(metrics, labels.FromStrings("__name__", "b_one"))
@@ -67,7 +69,7 @@ func setupRangeQueryTestData(stor *teststorage.TestStorage, _ *Engine, interval,
         }
     }
     stor.DB.ForceHeadMMap() // Ensure we have at most one head chunk for every series.
-    stor.DB.Compact()
+    stor.DB.Compact(ctx)
     return nil
 }


@@ -74,7 +74,7 @@ type IndexReader interface {
     // The Postings here contain the offsets to the series inside the index.
     // Found IDs are not strictly required to point to a valid Series, e.g.
     // during background garbage collections.
-    Postings(name string, values ...string) (index.Postings, error)
+    Postings(ctx context.Context, name string, values ...string) (index.Postings, error)

     // SortedPostings returns a postings list that is reordered to be sorted
     // by the label set of the underlying series.
@@ -488,8 +488,8 @@ func (r blockIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, err
     return labelNamesWithMatchers(r.ir, matchers...)
 }

-func (r blockIndexReader) Postings(name string, values ...string) (index.Postings, error) {
-    p, err := r.ir.Postings(name, values...)
+func (r blockIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
+    p, err := r.ir.Postings(ctx, name, values...)
     if err != nil {
         return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
     }

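Every IndexReader implementation now takes the caller's context as the first argument to Postings, so slow lookups can observe cancellation. A minimal sketch of a caller after this change; the function name, label pair, and timeout are illustrative, only the Postings signature comes from this commit:

    // Sketch, within the tsdb package; assumes imports "context" and "time".
    func countSeries(ctx context.Context, ir IndexReader) (int, error) {
        ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
        defer cancel()

        p, err := ir.Postings(ctx, "job", "api-server") // ctx is now the first argument
        if err != nil {
            return 0, err
        }
        n := 0
        for p.Next() {
            n++ // p.At() yields the storage.SeriesRef of each matching series
        }
        return n, p.Err()
    }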

@@ -731,7 +731,7 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
     closers = append(closers, tombsr)

     k, v := index.AllPostingsKey()
-    all, err := indexr.Postings(k, v)
+    all, err := indexr.Postings(ctx, k, v)
     if err != nil {
         return err
     }


@@ -1268,6 +1268,8 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
     for title, bootStrap := range tests {
         t.Run(title, func(t *testing.T) {
+            ctx := context.Background()
+
             db := openTestDB(t, nil, []int64{1, 100})
             defer func() {
                 require.NoError(t, db.Close())
@@ -1291,7 +1293,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
             // Do the compaction and check the metrics.
             // Compaction should succeed, but the reloadBlocks should fail and
             // the new block created from the compaction should be deleted.
-            require.Error(t, db.Compact())
+            require.Error(t, db.Compact(ctx))
             require.Equal(t, 1.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "'failed db reloadBlocks' count metrics mismatch")
             require.Equal(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran), "`compaction` count metric mismatch")
             require.Equal(t, 1.0, prom_testutil.ToFloat64(db.metrics.compactionsFailed), "`compactions failed` count metric mismatch")


@@ -908,7 +908,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
         db.oooWasEnabled.Store(true)
     }

-    go db.run()
+    go db.run(ctx)

     return db, nil
 }
@@ -949,7 +949,7 @@ func (db *DB) Dir() string {
     return db.dir
 }

-func (db *DB) run() {
+func (db *DB) run(ctx context.Context) {
     defer close(db.donec)

     backoff := time.Duration(0)
@@ -980,7 +980,7 @@ func (db *DB) run() {
             db.autoCompactMtx.Lock()
             if db.autoCompact {
-                if err := db.Compact(); err != nil {
+                if err := db.Compact(ctx); err != nil {
                     level.Error(db.logger).Log("msg", "compaction failed", "err", err)
                     backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
                 } else {
@@ -1100,7 +1100,7 @@ func (a dbAppender) Commit() error {
 // which will also delete the blocks that fall out of the retention window.
 // Old blocks are only deleted on reloadBlocks based on the new block's parent information.
 // See DB.reloadBlocks documentation for further information.
-func (db *DB) Compact() (returnErr error) {
+func (db *DB) Compact(ctx context.Context) (returnErr error) {
     db.cmtx.Lock()
     defer db.cmtx.Unlock()
     defer func() {
@@ -1173,7 +1173,7 @@ func (db *DB) Compact() (returnErr error) {
     if lastBlockMaxt != math.MinInt64 {
         // The head was compacted, so we compact OOO head as well.
-        if err := db.compactOOOHead(); err != nil {
+        if err := db.compactOOOHead(ctx); err != nil {
             return errors.Wrap(err, "compact ooo head")
         }
     }
@@ -1197,18 +1197,18 @@ func (db *DB) CompactHead(head *RangeHead) error {
 }

 // CompactOOOHead compacts the OOO Head.
-func (db *DB) CompactOOOHead() error {
+func (db *DB) CompactOOOHead(ctx context.Context) error {
     db.cmtx.Lock()
     defer db.cmtx.Unlock()

-    return db.compactOOOHead()
+    return db.compactOOOHead(ctx)
 }

-func (db *DB) compactOOOHead() error {
+func (db *DB) compactOOOHead(ctx context.Context) error {
     if !db.oooWasEnabled.Load() {
         return nil
     }
-    oooHead, err := NewOOOCompactionHead(db.head)
+    oooHead, err := NewOOOCompactionHead(ctx, db.head)
     if err != nil {
         return errors.Wrap(err, "get ooo compaction head")
     }

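Compaction becomes cancellable end to end: the context passed to DB.Compact flows through compactOOOHead into NewOOOCompactionHead, and from there into the OOO index reader's Postings call. A rough sketch of an updated call site; the surrounding setup and error handling are assumed, not part of the commit:

    // Sketch; db is a *tsdb.DB opened elsewhere.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Cancelling ctx can now abort the postings scan inside OOO-head compaction.
    if err := db.Compact(ctx); err != nil {
        return fmt.Errorf("compact: %w", err)
    }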

@@ -1207,7 +1207,7 @@ func TestTombstoneClean(t *testing.T) {
         defer db.Close()

         for _, r := range c.intervals {
-            require.NoError(t, db.Delete(context.Background(), r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
+            require.NoError(t, db.Delete(ctx, r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
         }

         // All of the setup for THIS line.
@@ -1864,7 +1864,7 @@ func TestChunkAtBlockBoundary(t *testing.T) {
     err := app.Commit()
     require.NoError(t, err)

-    err = db.Compact()
+    err = db.Compact(ctx)
     require.NoError(t, err)

     var builder labels.ScratchBuilder
@@ -1877,7 +1877,7 @@ func TestChunkAtBlockBoundary(t *testing.T) {
         meta := block.Meta()

         k, v := index.AllPostingsKey()
-        p, err := r.Postings(k, v)
+        p, err := r.Postings(ctx, k, v)
         require.NoError(t, err)

         var chks []chunks.Meta
@@ -1920,7 +1920,7 @@ func TestQuerierWithBoundaryChunks(t *testing.T) {
     err := app.Commit()
     require.NoError(t, err)

-    err = db.Compact()
+    err = db.Compact(ctx)
     require.NoError(t, err)

     require.GreaterOrEqual(t, len(db.blocks), 3, "invalid test, less than three blocks in DB")
@@ -2051,7 +2051,7 @@ func TestNoEmptyBlocks(t *testing.T) {
     defaultMatcher := labels.MustNewMatcher(labels.MatchRegexp, "", ".*")

     t.Run("Test no blocks after compact with empty head.", func(t *testing.T) {
-        require.NoError(t, db.Compact())
+        require.NoError(t, db.Compact(ctx))
         actBlocks, err := blockDirs(db.Dir())
         require.NoError(t, err)
         require.Equal(t, len(db.Blocks()), len(actBlocks))
@@ -2069,7 +2069,7 @@ func TestNoEmptyBlocks(t *testing.T) {
         require.NoError(t, err)
         require.NoError(t, app.Commit())
         require.NoError(t, db.Delete(ctx, math.MinInt64, math.MaxInt64, defaultMatcher))
-        require.NoError(t, db.Compact())
+        require.NoError(t, db.Compact(ctx))
         require.Equal(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here")

         actBlocks, err := blockDirs(db.Dir())
@@ -2091,7 +2091,7 @@ func TestNoEmptyBlocks(t *testing.T) {
         require.NoError(t, err)
         require.NoError(t, app.Commit())

-        require.NoError(t, db.Compact())
+        require.NoError(t, db.Compact(ctx))
         require.Equal(t, 2, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here")
         actBlocks, err = blockDirs(db.Dir())
         require.NoError(t, err)
@@ -2112,7 +2112,7 @@ func TestNoEmptyBlocks(t *testing.T) {
         require.NoError(t, err)
         require.NoError(t, app.Commit())
         require.NoError(t, db.head.Delete(ctx, math.MinInt64, math.MaxInt64, defaultMatcher))
-        require.NoError(t, db.Compact())
+        require.NoError(t, db.Compact(ctx))
         require.Equal(t, 3, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here")
         require.Equal(t, oldBlocks, db.Blocks())
     })
@@ -2131,7 +2131,7 @@ func TestNoEmptyBlocks(t *testing.T) {
         require.NoError(t, db.reloadBlocks()) // Reload the db to register the new blocks.
         require.Equal(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered.
         require.NoError(t, db.Delete(ctx, math.MinInt64, math.MaxInt64, defaultMatcher))
-        require.NoError(t, db.Compact())
+        require.NoError(t, db.Compact(ctx))
         require.Equal(t, 5, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here once for each block that have tombstones")

         actBlocks, err := blockDirs(db.Dir())
@@ -2198,6 +2198,7 @@ func TestDB_LabelNames(t *testing.T) {
         require.NoError(t, err)
     }
     for _, tst := range tests {
+        ctx := context.Background()
         db := openTestDB(t, nil, nil)
         defer func() {
             require.NoError(t, db.Close())
@@ -2214,7 +2215,7 @@ func TestDB_LabelNames(t *testing.T) {
         require.NoError(t, headIndexr.Close())

         // Testing disk.
-        err = db.Compact()
+        err = db.Compact(ctx)
         require.NoError(t, err)
         // All blocks have same label names, hence check them individually.
         // No need to aggregate and check.
@@ -2264,7 +2265,7 @@ func TestCorrectNumTombstones(t *testing.T) {
     }
     require.NoError(t, app.Commit())

-    err := db.Compact()
+    err := db.Compact(ctx)
     require.NoError(t, err)
     require.Equal(t, 1, len(db.blocks))
@@ -3033,12 +3034,14 @@ func TestCompactHeadWithDeletion(t *testing.T) {
     db, err := Open(t.TempDir(), log.NewNopLogger(), prometheus.NewRegistry(), nil, nil)
     require.NoError(t, err)

-    app := db.Appender(context.Background())
+    ctx := context.Background()
+    app := db.Appender(ctx)
     _, err = app.Append(0, labels.FromStrings("a", "b"), 10, rand.Float64())
     require.NoError(t, err)
     require.NoError(t, app.Commit())

-    err = db.Delete(context.Background(), 0, 100, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
+    err = db.Delete(ctx, 0, 100, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
     require.NoError(t, err)

     // This recreates the bug.
@@ -3197,6 +3200,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) {
     }

     tmpDir := t.TempDir()
+    ctx := context.Background()

     db, err := Open(tmpDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg, nil)
     require.NoError(t, err)
@@ -3228,7 +3232,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) {
     require.Equal(t, 60, last)

     require.Equal(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.checkpointCreationTotal))
-    require.NoError(t, db.Compact())
+    require.NoError(t, db.Compact(ctx))
     require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.checkpointCreationTotal))

     // As the data spans for 59 blocks, 58 go to disk and 1 remains in Head.
@@ -3286,7 +3290,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) {
     require.Equal(t, 62, last)

     require.Equal(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.checkpointCreationTotal))
-    require.NoError(t, db.Compact())
+    require.NoError(t, db.Compact(ctx))
     require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.checkpointCreationTotal))

     // No new blocks should be created as there was not data in between the new samples and the blocks.
@@ -3385,7 +3389,7 @@ func testQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t
     }

     // Compact the TSDB head for the first time. We expect the head chunks file has been cut.
-    require.NoError(t, db.Compact())
+    require.NoError(t, db.Compact(ctx))
     require.Equal(t, float64(1), prom_testutil.ToFloat64(db.Head().metrics.headTruncateTotal))

     // Push more samples for another 1x block duration period.
@@ -3430,7 +3434,7 @@ func testQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t
     require.Equal(t, actualSeries, numSeries)

     // Compact the TSDB head again.
-    require.NoError(t, db.Compact())
+    require.NoError(t, db.Compact(ctx))
     require.Equal(t, float64(2), prom_testutil.ToFloat64(db.Head().metrics.headTruncateTotal))

     // At this point we expect 1 head chunk has been deleted.
@@ -3521,7 +3525,7 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun
     }

     // Compact the TSDB head for the first time. We expect the head chunks file has been cut.
-    require.NoError(t, db.Compact())
+    require.NoError(t, db.Compact(ctx))
     require.Equal(t, float64(1), prom_testutil.ToFloat64(db.Head().metrics.headTruncateTotal))

     // Push more samples for another 1x block duration period.
@@ -3564,7 +3568,7 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun
     require.Equal(t, actualSeries, numSeries)

     // Compact the TSDB head again.
-    require.NoError(t, db.Compact())
+    require.NoError(t, db.Compact(ctx))
     require.Equal(t, float64(2), prom_testutil.ToFloat64(db.Head().metrics.headTruncateTotal))

     // At this point we expect 1 head chunk has been deleted.
@@ -3796,6 +3800,7 @@ func TestOOOWALWrite(t *testing.T) {
 // Tests https://github.com/prometheus/prometheus/issues/10291#issuecomment-1044373110.
 func TestDBPanicOnMmappingHeadChunk(t *testing.T) {
     dir := t.TempDir()
+    ctx := context.Background()

     db, err := Open(dir, nil, nil, DefaultOptions(), nil)
     require.NoError(t, err)
@@ -3826,7 +3831,7 @@ func TestDBPanicOnMmappingHeadChunk(t *testing.T) {
     addSamples(numSamples)

     require.Len(t, db.Blocks(), 0)
-    require.NoError(t, db.Compact())
+    require.NoError(t, db.Compact(ctx))
     require.Len(t, db.Blocks(), 0)

     // Restarting.
@@ -3841,7 +3846,7 @@ func TestDBPanicOnMmappingHeadChunk(t *testing.T) {
     addSamples(numSamples)

     require.Len(t, db.Blocks(), 0)
-    require.NoError(t, db.Compact())
+    require.NoError(t, db.Compact(ctx))
     require.Len(t, db.Blocks(), 1)

     // More samples to m-map and panic.
@@ -4107,6 +4112,7 @@ func TestMetadataAssertInMemoryData(t *testing.T) {
 // are not included in this compaction.
 func TestOOOCompaction(t *testing.T) {
     dir := t.TempDir()
+    ctx := context.Background()

     opts := DefaultOptions()
     opts.OutOfOrderCapMax = 30
@@ -4204,7 +4210,7 @@ func TestOOOCompaction(t *testing.T) {
     require.Greater(t, f.Size(), int64(100))

     // OOO compaction happens here.
-    require.NoError(t, db.CompactOOOHead())
+    require.NoError(t, db.CompactOOOHead(ctx))

     // 3 blocks exist now. [0, 120), [120, 240), [240, 360)
     require.Equal(t, len(db.Blocks()), 3)
@@ -4272,7 +4278,7 @@ func TestOOOCompaction(t *testing.T) {
     require.Equal(t, "000001", files[0].Name())

     // This will merge overlapping block.
-    require.NoError(t, db.Compact())
+    require.NoError(t, db.Compact(ctx))

     require.Equal(t, len(db.Blocks()), 3) // [0, 120), [120, 240), [240, 360)
     verifySamples(db.Blocks()[0], 90, 119)
@@ -4286,6 +4292,7 @@ func TestOOOCompaction(t *testing.T) {
 // when the normal head's compaction is done.
 func TestOOOCompactionWithNormalCompaction(t *testing.T) {
     dir := t.TempDir()
+    ctx := context.Background()

     opts := DefaultOptions()
     opts.OutOfOrderCapMax = 30
@@ -4328,7 +4335,7 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
     }

     // If the normal Head is not compacted, the OOO head compaction does not take place.
-    require.NoError(t, db.Compact())
+    require.NoError(t, db.Compact(ctx))
     require.Equal(t, len(db.Blocks()), 0)

     // Add more in-order samples in future that would trigger the compaction.
@@ -4338,7 +4345,7 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
     require.Equal(t, len(db.Blocks()), 0)

     // Compacts normal and OOO head.
-    require.NoError(t, db.Compact())
+    require.NoError(t, db.Compact(ctx))

     // 2 blocks exist now. [0, 120), [250, 360)
     require.Equal(t, len(db.Blocks()), 2)
@@ -4385,6 +4392,7 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
 // and out-of-order head
 func TestOOOCompactionWithDisabledWriteLog(t *testing.T) {
     dir := t.TempDir()
+    ctx := context.Background()

     opts := DefaultOptions()
     opts.OutOfOrderCapMax = 30
@@ -4428,7 +4436,7 @@ func TestOOOCompactionWithDisabledWriteLog(t *testing.T) {
     }

     // If the normal Head is not compacted, the OOO head compaction does not take place.
-    require.NoError(t, db.Compact())
+    require.NoError(t, db.Compact(ctx))
     require.Equal(t, len(db.Blocks()), 0)

     // Add more in-order samples in future that would trigger the compaction.
@@ -4438,7 +4446,7 @@ func TestOOOCompactionWithDisabledWriteLog(t *testing.T) {
     require.Equal(t, len(db.Blocks()), 0)

     // Compacts normal and OOO head.
-    require.NoError(t, db.Compact())
+    require.NoError(t, db.Compact(ctx))

     // 2 blocks exist now. [0, 120), [250, 360)
     require.Equal(t, len(db.Blocks()), 2)
@@ -4485,6 +4493,7 @@ func TestOOOCompactionWithDisabledWriteLog(t *testing.T) {
 // data from the mmap chunks.
 func TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T) {
     dir := t.TempDir()
+    ctx := context.Background()

     opts := DefaultOptions()
     opts.OutOfOrderCapMax = 10
@@ -4573,7 +4582,7 @@ func TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T) {
     // Compaction should also work fine.
     require.Equal(t, len(db.Blocks()), 0)
-    require.NoError(t, db.CompactOOOHead())
+    require.NoError(t, db.CompactOOOHead(ctx))
     require.Equal(t, len(db.Blocks()), 1) // One block from OOO data.
     require.Equal(t, int64(0), db.Blocks()[0].MinTime())
     require.Equal(t, 120*time.Minute.Milliseconds(), db.Blocks()[0].MaxTime())
@@ -5144,6 +5153,7 @@ func TestWBLAndMmapReplay(t *testing.T) {
 func TestOOOCompactionFailure(t *testing.T) {
     dir := t.TempDir()
+    ctx := context.Background()

     opts := DefaultOptions()
     opts.OutOfOrderCapMax = 30
@@ -5206,7 +5216,7 @@ func TestOOOCompactionFailure(t *testing.T) {
     originalCompactor := db.compactor
     db.compactor = &mockCompactorFailing{t: t}
     for i := 0; i < 5; i++ {
-        require.Error(t, db.CompactOOOHead())
+        require.Error(t, db.CompactOOOHead(ctx))
     }
     require.Equal(t, len(db.Blocks()), 0)
@@ -5217,7 +5227,7 @@ func TestOOOCompactionFailure(t *testing.T) {
     verifyFirstWBLFileIs0(6)

     db.compactor = originalCompactor
-    require.NoError(t, db.CompactOOOHead())
+    require.NoError(t, db.CompactOOOHead(ctx))
     oldBlocks := db.Blocks()
     require.Equal(t, len(db.Blocks()), 3)
@@ -5229,7 +5239,7 @@ func TestOOOCompactionFailure(t *testing.T) {
     // The failed compaction should not have left the ooo Head corrupted.
     // Hence, expect no new blocks with another OOO compaction call.
-    require.NoError(t, db.CompactOOOHead())
+    require.NoError(t, db.CompactOOOHead(ctx))
     require.Equal(t, len(db.Blocks()), 3)
     require.Equal(t, oldBlocks, db.Blocks())
@@ -5550,6 +5560,8 @@ func TestOOOMmapCorruption(t *testing.T) {
 }

 func TestOutOfOrderRuntimeConfig(t *testing.T) {
+    ctx := context.Background()
+
     getDB := func(oooTimeWindow int64) *DB {
         dir := t.TempDir()
@@ -5616,7 +5628,7 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) {
         require.Greater(t, size, int64(0))

         require.Len(t, db.Blocks(), 0)
-        require.NoError(t, db.compactOOOHead())
+        require.NoError(t, db.compactOOOHead(ctx))
         require.Greater(t, len(db.Blocks()), 0)

         // WBL is empty.
@@ -5836,6 +5848,7 @@ func TestNoGapAfterRestartWithOOO(t *testing.T) {
     for i, c := range cases {
         t.Run(fmt.Sprintf("case=%d", i), func(t *testing.T) {
             dir := t.TempDir()
+            ctx := context.Background()

             opts := DefaultOptions()
             opts.OutOfOrderTimeWindow = 30 * time.Minute.Milliseconds()
@@ -5856,7 +5869,7 @@ func TestNoGapAfterRestartWithOOO(t *testing.T) {
             verifySamples(t, db, c.inOrderMint, c.inOrderMaxt)

             // We get 2 blocks. 1 from OOO, 1 from in-order.
-            require.NoError(t, db.Compact())
+            require.NoError(t, db.Compact(ctx))
             verifyBlockRanges := func() {
                 blocks := db.Blocks()
                 require.Equal(t, len(c.blockRanges), len(blocks))
@@ -5993,6 +6006,7 @@ func TestPanicOnApplyConfig(t *testing.T) {
 func TestDiskFillingUpAfterDisablingOOO(t *testing.T) {
     dir := t.TempDir()
+    ctx := context.Background()

     opts := DefaultOptions()
     opts.OutOfOrderTimeWindow = 60 * time.Minute.Milliseconds()
@@ -6057,14 +6071,14 @@ func TestDiskFillingUpAfterDisablingOOO(t *testing.T) {
     db.head.mmapHeadChunks()
     checkMmapFileContents([]string{"000001", "000002"}, nil)
-    require.NoError(t, db.Compact())
+    require.NoError(t, db.Compact(ctx))
     checkMmapFileContents([]string{"000002"}, []string{"000001"})
     require.Nil(t, ms.ooo, "OOO mmap chunk was not compacted")

     addSamples(501, 650)
     db.head.mmapHeadChunks()
     checkMmapFileContents([]string{"000002", "000003"}, []string{"000001"})
-    require.NoError(t, db.Compact())
+    require.NoError(t, db.Compact(ctx))
     checkMmapFileContents(nil, []string{"000001", "000002", "000003"})

     // Verify that WBL is empty.


@@ -103,7 +103,7 @@ func (h *headIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, err
 }

 // Postings returns the postings list iterator for the label pairs.
-func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) {
+func (h *headIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
     switch len(values) {
     case 0:
         return index.EmptyPostings(), nil
@@ -116,7 +116,7 @@ func (h *headIndexReader) Postings(name string, values ...string) (index.Posting
                 res = append(res, p)
             }
         }
-        return index.Merge(res...), nil
+        return index.Merge(ctx, res...), nil
     }
 }


@@ -2546,7 +2546,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
     require.NoError(t, app.Commit())
     require.Equal(t, int64(math.MinInt64), db.head.minValidTime.Load())

-    require.NoError(t, db.Compact())
+    require.NoError(t, db.Compact(ctx))
     require.Greater(t, db.head.minValidTime.Load(), int64(0))

     app = db.Appender(ctx)
@@ -2997,6 +2997,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
 func TestChunkNotFoundHeadGCRace(t *testing.T) {
     db := newTestDB(t)
     db.DisableCompactions()
+    ctx := context.Background()

     var (
         app = db.Appender(context.Background())
@@ -3029,7 +3030,7 @@ func TestChunkNotFoundHeadGCRace(t *testing.T) {
     go func() {
         defer wg.Done()
         // Compacting head while the querier spans the compaction time.
-        require.NoError(t, db.Compact())
+        require.NoError(t, db.Compact(ctx))
         require.Greater(t, len(db.Blocks()), 0)
     }()
@@ -3062,6 +3063,7 @@ func TestChunkNotFoundHeadGCRace(t *testing.T) {
 func TestDataMissingOnQueryDuringCompaction(t *testing.T) {
     db := newTestDB(t)
     db.DisableCompactions()
+    ctx := context.Background()

     var (
         app = db.Appender(context.Background())
@@ -3091,7 +3093,7 @@ func TestDataMissingOnQueryDuringCompaction(t *testing.T) {
     go func() {
         defer wg.Done()
         // Compacting head while the querier spans the compaction time.
-        require.NoError(t, db.Compact())
+        require.NoError(t, db.Compact(ctx))
         require.Greater(t, len(db.Blocks()), 0)
     }()
@@ -5332,6 +5334,7 @@ func BenchmarkCuttingHeadHistogramChunks(b *testing.B) {
 }

 func TestCuttingNewHeadChunks(t *testing.T) {
+    ctx := context.Background()
     testCases := map[string]struct {
         numTotalSamples int
         timestampJitter bool
@@ -5465,7 +5468,7 @@ func TestCuttingNewHeadChunks(t *testing.T) {
         chkReader, err := h.Chunks()
         require.NoError(t, err)

-        p, err := idxReader.Postings("foo", "bar")
+        p, err := idxReader.Postings(ctx, "foo", "bar")
         require.NoError(t, err)

         var lblBuilder labels.ScratchBuilder


@@ -1605,7 +1605,7 @@ func (r *Reader) Series(id storage.SeriesRef, builder *labels.ScratchBuilder, ch
     return errors.Wrap(r.dec.Series(d.Get(), builder, chks), "read series")
 }

-func (r *Reader) Postings(name string, values ...string) (Postings, error) {
+func (r *Reader) Postings(ctx context.Context, name string, values ...string) (Postings, error) {
     if r.version == FormatV1 {
         e, ok := r.postingsV1[name]
         if !ok {
@@ -1625,7 +1625,7 @@ func (r *Reader) Postings(name string, values ...string) (Postings, error) {
             }
             res = append(res, p)
         }
-        return Merge(res...), nil
+        return Merge(ctx, res...), nil
     }

     e, ok := r.postings[name]
@@ -1664,7 +1664,7 @@ func (r *Reader) Postings(name string, values ...string) (Postings, error) {
     // Iterate on the offset table.
     var postingsOff uint64 // The offset into the postings table.
-    for d.Err() == nil {
+    for d.Err() == nil && ctx.Err() == nil {
         if skip == 0 {
             // These are always the same number of bytes,
             // and it's faster to skip than parse.
@@ -1701,9 +1701,12 @@ func (r *Reader) Postings(name string, values ...string) (Postings, error) {
         if d.Err() != nil {
             return nil, errors.Wrap(d.Err(), "get postings offset entry")
         }
+        if ctx.Err() != nil {
+            return nil, errors.Wrap(ctx.Err(), "get postings offset entry")
+        }
     }

-    return Merge(res...), nil
+    return Merge(ctx, res...), nil
 }

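For the V2 index format the lookup walks the postings offset table, which can be large, so the loop condition now also checks ctx.Err() and the cancellation error is surfaced after the loop, mirroring the existing d.Err() handling. From the caller's side this behaves roughly as in the sketch below; it assumes r is an index reader opened via NewFileReader and that the label pair exists, so the table walk is actually entered:

    // Sketch: an already-cancelled context makes Reader.Postings fail fast.
    ctx, cancel := context.WithCancel(context.Background())
    cancel()

    _, err := r.Postings(ctx, "job", "api-server")
    // err should now wrap context.Canceled instead of the reader
    // scanning the whole offset table.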

@@ -103,13 +103,13 @@ func (m mockIndex) LabelValues(name string) ([]string, error) {
     return values, nil
 }

-func (m mockIndex) Postings(name string, values ...string) (Postings, error) {
+func (m mockIndex) Postings(ctx context.Context, name string, values ...string) (Postings, error) {
     p := []Postings{}
     for _, value := range values {
         l := labels.Label{Name: name, Value: value}
         p = append(p, m.SortedPostings(NewListPostings(m.postings[l])))
     }
-    return Merge(p...), nil
+    return Merge(ctx, p...), nil
 }

 func (m mockIndex) SortedPostings(p Postings) Postings {
@@ -162,6 +162,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
 func TestIndexRW_Postings(t *testing.T) {
     dir := t.TempDir()
+    ctx := context.Background()

     fn := filepath.Join(dir, indexFilename)
@@ -194,7 +195,7 @@ func TestIndexRW_Postings(t *testing.T) {
     ir, err := NewFileReader(fn)
     require.NoError(t, err)

-    p, err := ir.Postings("a", "1")
+    p, err := ir.Postings(ctx, "a", "1")
     require.NoError(t, err)

     var c []chunks.Meta
@@ -245,6 +246,7 @@ func TestIndexRW_Postings(t *testing.T) {
 func TestPostingsMany(t *testing.T) {
     dir := t.TempDir()
+    ctx := context.Background()

     fn := filepath.Join(dir, indexFilename)
@@ -313,7 +315,7 @@ func TestPostingsMany(t *testing.T) {
     var builder labels.ScratchBuilder
     for _, c := range cases {
-        it, err := ir.Postings("i", c.in...)
+        it, err := ir.Postings(ctx, "i", c.in...)
         require.NoError(t, err)

         got := []string{}
@@ -335,6 +337,7 @@ func TestPostingsMany(t *testing.T) {
 func TestPersistence_index_e2e(t *testing.T) {
     dir := t.TempDir()
+    ctx := context.Background()

     lbls, err := labels.ReadLabels(filepath.Join("..", "testdata", "20kseries.json"), 20000)
     require.NoError(t, err)
@@ -413,10 +416,10 @@ func TestPersistence_index_e2e(t *testing.T) {
     require.NoError(t, err)

     for p := range mi.postings {
-        gotp, err := ir.Postings(p.Name, p.Value)
+        gotp, err := ir.Postings(ctx, p.Name, p.Value)
         require.NoError(t, err)

-        expp, err := mi.Postings(p.Name, p.Value)
+        expp, err := mi.Postings(ctx, p.Name, p.Value)
         require.NoError(t, err)

         var chks, expchks []chunks.Meta


@@ -15,6 +15,7 @@ package index

 import (
     "container/heap"
+    "context"
     "encoding/binary"
     "runtime"
     "sort"
@@ -519,7 +520,7 @@ func (it *intersectPostings) Err() error {
 }

 // Merge returns a new iterator over the union of the input iterators.
-func Merge(its ...Postings) Postings {
+func Merge(ctx context.Context, its ...Postings) Postings {
     if len(its) == 0 {
         return EmptyPostings()
     }
@@ -527,7 +528,7 @@ func Merge(its ...Postings) Postings {
         return its[0]
     }

-    p, ok := newMergedPostings(its)
+    p, ok := newMergedPostings(ctx, its)
     if !ok {
         return EmptyPostings()
     }
@@ -559,12 +560,14 @@ type mergedPostings struct {
     err error
 }

-func newMergedPostings(p []Postings) (m *mergedPostings, nonEmpty bool) {
+func newMergedPostings(ctx context.Context, p []Postings) (m *mergedPostings, nonEmpty bool) {
     ph := make(postingsHeap, 0, len(p))

     for _, it := range p {
         // NOTE: mergedPostings struct requires the user to issue an initial Next.
         switch {
+        case ctx.Err() != nil:
+            return &mergedPostings{err: ctx.Err()}, true
         case it.Next():
             ph = append(ph, it)
         case it.Err() != nil:

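Merge threads the context into newMergedPostings, which gives up with the context's error if cancellation occurs while the input iterators are being primed for the heap. Usage is otherwise unchanged; a small sketch with list postings from outside the package (the values are arbitrary):

    // Sketch, assuming imports "context" plus the tsdb/index and storage packages.
    ctx := context.Background()
    a := index.NewListPostings([]storage.SeriesRef{1, 2, 3})
    b := index.NewListPostings([]storage.SeriesRef{2, 4})

    refs, err := index.ExpandPostings(index.Merge(ctx, a, b))
    // refs == []storage.SeriesRef{1, 2, 3, 4} when err == nil.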

@@ -385,7 +385,7 @@ func TestMultiMerge(t *testing.T) {
     i2 := newListPostings(2, 4, 5, 6, 7, 8, 999, 1001)
     i3 := newListPostings(1, 2, 5, 6, 7, 8, 1001, 1200)

-    res, err := ExpandPostings(Merge(i1, i2, i3))
+    res, err := ExpandPostings(Merge(context.Background(), i1, i2, i3))
     require.NoError(t, err)
     require.Equal(t, []storage.SeriesRef{1, 2, 3, 4, 5, 6, 7, 8, 999, 1000, 1001, 1200}, res)
 }
@@ -473,10 +473,12 @@ func TestMergedPostings(t *testing.T) {
             t.Fatal("merge result expectancy cannot be nil")
         }

+        ctx := context.Background()
+
         expected, err := ExpandPostings(c.res)
         require.NoError(t, err)

-        m := Merge(c.in...)
+        m := Merge(ctx, c.in...)

         if c.res == EmptyPostings() {
             require.Equal(t, EmptyPostings(), m)
@@ -537,10 +539,12 @@ func TestMergedPostingsSeek(t *testing.T) {
     }

     for _, c := range cases {
+        ctx := context.Background()
+
         a := newListPostings(c.a...)
         b := newListPostings(c.b...)

-        p := Merge(a, b)
+        p := Merge(ctx, a, b)

         require.Equal(t, c.success, p.Seek(c.seek))
@@ -796,6 +800,7 @@ func TestIntersectWithMerge(t *testing.T) {
     a := newListPostings(21, 22, 23, 24, 25, 30)
     b := Merge(
+        context.Background(),
         newListPostings(10, 20, 30),
         newListPostings(15, 26, 30),
     )


@@ -15,6 +15,7 @@
 package tsdb

 import (
+    "context"
     "errors"
     "math"
@@ -190,7 +191,7 @@ func lessByMinTimeAndMinRef(a, b chunks.Meta) bool {
     return a.MinTime < b.MinTime
 }

-func (oh *OOOHeadIndexReader) Postings(name string, values ...string) (index.Postings, error) {
+func (oh *OOOHeadIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
     switch len(values) {
     case 0:
         return index.EmptyPostings(), nil
@@ -202,7 +203,7 @@ func (oh *OOOHeadIndexReader) Postings(name string, values ...string) (index.Pos
         for _, value := range values {
             res = append(res, oh.head.postings.Get(name, value)) // TODO(ganesh) Also call GetOOOPostings
         }
-        return index.Merge(res...), nil
+        return index.Merge(ctx, res...), nil
     }
 }
@@ -268,7 +269,7 @@ type OOOCompactionHead struct {
 // 4. Cuts a new WBL file for the OOO WBL.
 // All the above together have a bit of CPU and memory overhead, and can have a bit of impact
 // on the sample append latency. So call NewOOOCompactionHead only right before compaction.
-func NewOOOCompactionHead(head *Head) (*OOOCompactionHead, error) {
+func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, error) {
     ch := &OOOCompactionHead{
         chunkRange: head.chunkRange.Load(),
         mint:       math.MaxInt64,
@@ -287,7 +288,7 @@ func NewOOOCompactionHead(head *Head) (*OOOCompactionHead, error) {
     n, v := index.AllPostingsKey()

     // TODO: verify this gets only ooo samples.
-    p, err := ch.oooIR.Postings(n, v)
+    p, err := ch.oooIR.Postings(ctx, n, v)
     if err != nil {
         return nil, err
     }
@@ -396,7 +397,7 @@ func (ir *OOOCompactionHeadIndexReader) Symbols() index.StringIter {
     return ir.ch.oooIR.Symbols()
 }

-func (ir *OOOCompactionHeadIndexReader) Postings(name string, values ...string) (index.Postings, error) {
+func (ir *OOOCompactionHeadIndexReader) Postings(_ context.Context, name string, values ...string) (index.Postings, error) {
     n, v := index.AllPostingsKey()
     if name != n || len(values) != 1 || values[0] != v {
         return nil, errors.New("only AllPostingsKey is supported")


@@ -266,7 +266,7 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
         // We prefer to get AllPostings so that the base of subtraction (i.e. allPostings)
         // doesn't include series that may be added to the index reader during this function call.
         k, v := index.AllPostingsKey()
-        allPostings, err := ix.Postings(k, v)
+        allPostings, err := ix.Postings(context.TODO(), k, v)
         if err != nil {
             return nil, err
         }
@@ -286,7 +286,7 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
         switch {
         case m.Name == "" && m.Value == "": // Special-case for AllPostings, used in tests at least.
             k, v := index.AllPostingsKey()
-            allPostings, err := ix.Postings(k, v)
+            allPostings, err := ix.Postings(context.TODO(), k, v)
             if err != nil {
                 return nil, err
             }
@@ -363,14 +363,14 @@ func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, erro
     // Fast-path for equal matching.
     if m.Type == labels.MatchEqual {
-        return ix.Postings(m.Name, m.Value)
+        return ix.Postings(context.TODO(), m.Name, m.Value)
     }

     // Fast-path for set matching.
     if m.Type == labels.MatchRegexp {
         setMatches := findSetMatches(m.GetRegexString())
         if len(setMatches) > 0 {
-            return ix.Postings(m.Name, setMatches...)
+            return ix.Postings(context.TODO(), m.Name, setMatches...)
         }
     }
@@ -390,7 +390,7 @@ func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, erro
         return index.EmptyPostings(), nil
     }

-    return ix.Postings(m.Name, res...)
+    return ix.Postings(context.TODO(), m.Name, res...)
 }

 // inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher.
@@ -401,14 +401,14 @@ func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Posting
     if m.Type == labels.MatchNotRegexp {
         setMatches := findSetMatches(m.GetRegexString())
         if len(setMatches) > 0 {
-            return ix.Postings(m.Name, setMatches...)
+            return ix.Postings(context.TODO(), m.Name, setMatches...)
         }
     }

     // Fast-path for MatchNotEqual matching.
     // Inverse of a MatchNotEqual is MatchEqual (double negation).
     if m.Type == labels.MatchNotEqual {
-        return ix.Postings(m.Name, m.Value)
+        return ix.Postings(context.TODO(), m.Name, m.Value)
     }

     vals, err := ix.LabelValues(m.Name)
@@ -428,7 +428,7 @@ func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Posting
         }
     }

-    return ix.Postings(m.Name, res...)
+    return ix.Postings(context.TODO(), m.Name, res...)
 }

 func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) {
@@ -463,7 +463,7 @@ func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Mat
     valuesPostings := make([]index.Postings, len(allValues))
     for i, value := range allValues {
-        valuesPostings[i], err = r.Postings(name, value)
+        valuesPostings[i], err = r.Postings(context.TODO(), name, value)
         if err != nil {
             return nil, errors.Wrapf(err, "fetching postings for %s=%q", name, value)
         }

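PostingsForMatchers and its helpers keep their signatures in this commit, so every internal Postings call passes context.TODO() as an explicit placeholder; threading a real caller context through the querier path is left for a follow-up. Callers are therefore unaffected for now, as in this sketch (the matcher is illustrative):

    // Sketch: the exported signature is unchanged, and cancellation does not
    // yet reach these lookups because of the context.TODO() placeholders.
    p, err := PostingsForMatchers(ir, labels.MustNewMatcher(labels.MatchEqual, "job", "api-server"))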

@@ -504,6 +504,7 @@ func TestBlockQuerier_AgainstHeadWithOpenChunks(t *testing.T) {
 }

 func TestBlockQuerier_TrimmingDoesNotModifyOriginalTombstoneIntervals(t *testing.T) {
+    ctx := context.Background()
     c := blockQuerierTestCase{
         mint: 2,
         maxt: 6,
@@ -527,7 +528,7 @@ func TestBlockQuerier_TrimmingDoesNotModifyOriginalTombstoneIntervals(t *testing
     }
     ir, cr, _, _ := createIdxChkReaders(t, testData)
     stones := tombstones.NewMemTombstones()
-    p, err := ir.Postings("a", "a")
+    p, err := ir.Postings(ctx, "a", "a")
     require.NoError(t, err)
     refs, err := index.ExpandPostings(p)
     require.NoError(t, err)
@@ -1500,13 +1501,13 @@ func (m mockIndex) LabelNamesFor(ids ...storage.SeriesRef) ([]string, error) {
     return names, nil
 }

-func (m mockIndex) Postings(name string, values ...string) (index.Postings, error) {
+func (m mockIndex) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
     res := make([]index.Postings, 0, len(values))
     for _, value := range values {
         l := labels.Label{Name: name, Value: value}
         res = append(res, index.NewListPostings(m.postings[l]))
     }
-    return index.Merge(res...), nil
+    return index.Merge(ctx, res...), nil
 }

 func (m mockIndex) SortedPostings(p index.Postings) index.Postings {
@@ -2452,7 +2453,7 @@ func (m mockMatcherIndex) LabelNamesFor(ids ...storage.SeriesRef) ([]string, err
     return nil, errors.New("label names for for called")
 }

-func (m mockMatcherIndex) Postings(name string, values ...string) (index.Postings, error) {
+func (m mockMatcherIndex) Postings(context.Context, string, ...string) (index.Postings, error) {
     return index.EmptyPostings(), nil
 }


@@ -14,6 +14,7 @@
 package tsdb

 import (
+    "context"
     "os"
     "path/filepath"
     "testing"
@@ -28,6 +29,7 @@ import (
 func TestRepairBadIndexVersion(t *testing.T) {
     tmpDir := t.TempDir()
+    ctx := context.Background()

     // The broken index used in this test was written by the following script
     // at a broken revision.
@@ -78,7 +80,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
     // Read current index to check integrity.
     r, err := index.NewFileReader(filepath.Join(tmpDbDir, indexFilename))
     require.NoError(t, err)
-    p, err := r.Postings("b", "1")
+    p, err := r.Postings(ctx, "b", "1")
     require.NoError(t, err)
     var builder labels.ScratchBuilder
     for p.Next() {
@@ -97,7 +99,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
     r, err = index.NewFileReader(filepath.Join(tmpDbDir, indexFilename))
     require.NoError(t, err)
     defer r.Close()
-    p, err = r.Postings("b", "1")
+    p, err = r.Postings(ctx, "b", "1")
     require.NoError(t, err)
     res := []labels.Labels{}