From 5585a3c7e5e382cd6f81901cc24135b0e35c640e Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 25 Jun 2024 00:47:06 -0700 Subject: [PATCH] tsdb: expose hook to customize block querier (#14114) * expose hook for block querier Signed-off-by: Ben Ye * update comment Signed-off-by: Ben Ye * use defined type Signed-off-by: Ben Ye --------- Signed-off-by: Ben Ye --- tsdb/db.go | 54 ++++++++++++++++++++++++++--------- tsdb/db_test.go | 75 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+), 13 deletions(-) diff --git a/tsdb/db.go b/tsdb/db.go index c44737c69..61990a12f 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -192,12 +192,22 @@ type Options struct { // NewCompactorFunc is a function that returns a TSDB compactor. NewCompactorFunc NewCompactorFunc + + // BlockQuerierFunc is a function to return storage.Querier from a BlockReader. + BlockQuerierFunc BlockQuerierFunc + + // BlockChunkQuerierFunc is a function to return storage.ChunkQuerier from a BlockReader. + BlockChunkQuerierFunc BlockChunkQuerierFunc } type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error) type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} +type BlockQuerierFunc func(b BlockReader, mint, maxt int64) (storage.Querier, error) + +type BlockChunkQuerierFunc func(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) + // DB handles reads and writes of time series falling into // a hashed partition of a seriedb. type DB struct { @@ -244,6 +254,10 @@ type DB struct { writeNotified wlog.WriteNotified registerer prometheus.Registerer + + blockQuerierFunc BlockQuerierFunc + + blockChunkQuerierFunc BlockChunkQuerierFunc } type dbMetrics struct { @@ -559,10 +573,12 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue db.closers = append(db.closers, head) return &DB{ - dir: db.dir, - logger: db.logger, - blocks: blocks, - head: head, + dir: db.dir, + logger: db.logger, + blocks: blocks, + head: head, + blockQuerierFunc: NewBlockQuerier, + blockChunkQuerierFunc: NewBlockChunkQuerier, }, nil } @@ -870,6 +886,18 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs } db.compactCancel = cancel + if opts.BlockQuerierFunc == nil { + db.blockQuerierFunc = NewBlockQuerier + } else { + db.blockQuerierFunc = opts.BlockQuerierFunc + } + + if opts.BlockChunkQuerierFunc == nil { + db.blockChunkQuerierFunc = NewBlockChunkQuerier + } else { + db.blockChunkQuerierFunc = opts.BlockChunkQuerierFunc + } + var wal, wbl *wlog.WL segmentSize := wlog.DefaultSegmentSize // Wal is enabled. @@ -1964,7 +1992,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { if maxt >= db.head.MinTime() { rh := NewRangeHead(db.head, mint, maxt) var err error - inOrderHeadQuerier, err := NewBlockQuerier(rh, mint, maxt) + inOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt) if err != nil { return nil, fmt.Errorf("open block querier for head %s: %w", rh, err) } @@ -1981,7 +2009,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { } if getNew { rh := NewRangeHead(db.head, newMint, maxt) - inOrderHeadQuerier, err = NewBlockQuerier(rh, newMint, maxt) + inOrderHeadQuerier, err = db.blockQuerierFunc(rh, newMint, maxt) if err != nil { return nil, fmt.Errorf("open block querier for head while getting new querier %s: %w", rh, err) } @@ -1995,9 +2023,9 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef) var err error - outOfOrderHeadQuerier, err := NewBlockQuerier(rh, mint, maxt) + outOfOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt) if err != nil { - // If NewBlockQuerier() failed, make sure to clean up the pending read created by NewOOORangeHead. + // If BlockQuerierFunc() failed, make sure to clean up the pending read created by NewOOORangeHead. rh.isoState.Close() return nil, fmt.Errorf("open block querier for ooo head %s: %w", rh, err) @@ -2007,7 +2035,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { } for _, b := range blocks { - q, err := NewBlockQuerier(b, mint, maxt) + q, err := db.blockQuerierFunc(b, mint, maxt) if err != nil { return nil, fmt.Errorf("open querier for block %s: %w", b, err) } @@ -2045,7 +2073,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer if maxt >= db.head.MinTime() { rh := NewRangeHead(db.head, mint, maxt) - inOrderHeadQuerier, err := NewBlockChunkQuerier(rh, mint, maxt) + inOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt) if err != nil { return nil, fmt.Errorf("open querier for head %s: %w", rh, err) } @@ -2062,7 +2090,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer } if getNew { rh := NewRangeHead(db.head, newMint, maxt) - inOrderHeadQuerier, err = NewBlockChunkQuerier(rh, newMint, maxt) + inOrderHeadQuerier, err = db.blockChunkQuerierFunc(rh, newMint, maxt) if err != nil { return nil, fmt.Errorf("open querier for head while getting new querier %s: %w", rh, err) } @@ -2075,7 +2103,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef) - outOfOrderHeadQuerier, err := NewBlockChunkQuerier(rh, mint, maxt) + outOfOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt) if err != nil { return nil, fmt.Errorf("open block chunk querier for ooo head %s: %w", rh, err) } @@ -2084,7 +2112,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer } for _, b := range blocks { - q, err := NewBlockChunkQuerier(b, mint, maxt) + q, err := db.blockChunkQuerierFunc(b, mint, maxt) if err != nil { return nil, fmt.Errorf("open querier for block %s: %w", b, err) } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 3d2fb2d99..1fb6d30d6 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -7159,3 +7159,78 @@ func TestNewCompactorFunc(t *testing.T) { require.Len(t, ulids, 1) require.Equal(t, block2, ulids[0]) } + +func TestBlockQuerierAndBlockChunkQuerier(t *testing.T) { + opts := DefaultOptions() + opts.BlockQuerierFunc = func(b BlockReader, mint, maxt int64) (storage.Querier, error) { + // Only block with hints can be queried. + if len(b.Meta().Compaction.Hints) > 0 { + return NewBlockQuerier(b, mint, maxt) + } + return storage.NoopQuerier(), nil + } + opts.BlockChunkQuerierFunc = func(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) { + // Only level 4 compaction block can be queried. + if b.Meta().Compaction.Level == 4 { + return NewBlockChunkQuerier(b, mint, maxt) + } + return storage.NoopChunkedQuerier(), nil + } + + db := openTestDB(t, opts, nil) + defer func() { + require.NoError(t, db.Close()) + }() + + metas := []BlockMeta{ + {Compaction: BlockMetaCompaction{Hints: []string{"test-hint"}}}, + {Compaction: BlockMetaCompaction{Level: 4}}, + } + for i := range metas { + // Include blockID into series to identify which block got touched. + serieses := []storage.Series{storage.NewListSeries(labels.FromMap(map[string]string{"block": fmt.Sprintf("block-%d", i), labels.MetricName: "test_metric"}), []chunks.Sample{sample{t: 0, f: 1}})} + blockDir := createBlock(t, db.Dir(), serieses) + b, err := OpenBlock(db.logger, blockDir, db.chunkPool) + require.NoError(t, err) + + // Overwrite meta.json with compaction section for testing purpose. + b.meta.Compaction = metas[i].Compaction + _, err = writeMetaFile(db.logger, blockDir, &b.meta) + require.NoError(t, err) + require.NoError(t, b.Close()) + } + require.NoError(t, db.reloadBlocks()) + require.Len(t, db.Blocks(), 2) + + querier, err := db.Querier(0, 500) + require.NoError(t, err) + defer querier.Close() + matcher := labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric") + seriesSet := querier.Select(context.Background(), false, nil, matcher) + count := 0 + var lbls labels.Labels + for seriesSet.Next() { + count++ + lbls = seriesSet.At().Labels() + } + require.NoError(t, seriesSet.Err()) + require.Equal(t, 1, count) + // Make sure only block-0 is queried. + require.Equal(t, "block-0", lbls.Get("block")) + + chunkQuerier, err := db.ChunkQuerier(0, 500) + require.NoError(t, err) + defer chunkQuerier.Close() + css := chunkQuerier.Select(context.Background(), false, nil, matcher) + count = 0 + // Reset lbls variable. + lbls = labels.EmptyLabels() + for css.Next() { + count++ + lbls = css.At().Labels() + } + require.NoError(t, css.Err()) + require.Equal(t, 1, count) + // Make sure only block-1 is queried. + require.Equal(t, "block-1", lbls.Get("block")) +}