diff --git a/tsdb/block.go b/tsdb/block.go index 01560033cd..d1c75fc83a 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -20,6 +20,7 @@ import ( "os" "path/filepath" "sync" + "time" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -322,11 +323,11 @@ type Block struct { // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // to instantiate chunk structs. func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) { - return OpenBlockWithOptions(logger, dir, pool, nil) + return OpenBlockWithOptions(logger, dir, pool, nil, defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize, false) } // OpenBlockWithOptions is like OpenBlock but allows to pass a cache provider and sharding function. -func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cache index.ReaderCacheProvider) (pb *Block, err error) { +func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cache index.ReaderCacheProvider, postingsCacheTTL time.Duration, postingsCacheSize int, postingsCacheForce bool) (pb *Block, err error) { if logger == nil { logger = log.NewNopLogger() } @@ -351,7 +352,7 @@ func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cac if err != nil { return nil, err } - pfmc := NewPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize, false) + pfmc := NewPostingsForMatchersCache(postingsCacheTTL, postingsCacheSize, postingsCacheForce) ir := indexReaderWithPostingsForMatchers{indexReader, pfmc} closers = append(closers, ir) diff --git a/tsdb/db.go b/tsdb/db.go index a8d53a3ab6..bad66c06cf 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -72,23 +72,26 @@ var ErrNotReady = errors.New("TSDB not ready") // millisecond precision timestamps. func DefaultOptions() *Options { return &Options{ - WALSegmentSize: wlog.DefaultSegmentSize, - MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize, - RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond), - MinBlockDuration: DefaultBlockDuration, - MaxBlockDuration: DefaultBlockDuration, - NoLockfile: false, - AllowOverlappingCompaction: true, - WALCompression: false, - StripeSize: DefaultStripeSize, - HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize, - IsolationDisabled: defaultIsolationDisabled, - HeadChunksEndTimeVariance: 0, - HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize, - OutOfOrderCapMax: DefaultOutOfOrderCapMax, - HeadPostingsForMatchersCacheTTL: defaultPostingsForMatchersCacheTTL, - HeadPostingsForMatchersCacheSize: defaultPostingsForMatchersCacheSize, - HeadPostingsForMatchersCacheForce: false, + WALSegmentSize: wlog.DefaultSegmentSize, + MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize, + RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond), + MinBlockDuration: DefaultBlockDuration, + MaxBlockDuration: DefaultBlockDuration, + NoLockfile: false, + AllowOverlappingCompaction: true, + WALCompression: false, + StripeSize: DefaultStripeSize, + HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize, + IsolationDisabled: defaultIsolationDisabled, + HeadChunksEndTimeVariance: 0, + HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize, + OutOfOrderCapMax: DefaultOutOfOrderCapMax, + HeadPostingsForMatchersCacheTTL: defaultPostingsForMatchersCacheTTL, + HeadPostingsForMatchersCacheSize: defaultPostingsForMatchersCacheSize, + HeadPostingsForMatchersCacheForce: false, + BlockPostingsForMatchersCacheTTL: defaultPostingsForMatchersCacheTTL, + BlockPostingsForMatchersCacheSize: defaultPostingsForMatchersCacheSize, + BlockPostingsForMatchersCacheForce: false, } } @@ -200,11 +203,25 @@ type Options struct { // HeadPostingsForMatchersCacheTTL is the TTL of the postings for matchers cache in the Head. // If it's 0, the cache will only deduplicate in-flight requests, deleting the results once the first request has finished. HeadPostingsForMatchersCacheTTL time.Duration + // HeadPostingsForMatchersCacheSize is the maximum size of cached postings for matchers elements in the Head. // It's ignored when HeadPostingsForMatchersCacheTTL is 0. HeadPostingsForMatchersCacheSize int + // HeadPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on Head and OOOHead regardless of the `concurrent` param. HeadPostingsForMatchersCacheForce bool + + // BlockPostingsForMatchersCacheTTL is the TTL of the postings for matchers cache of each compacted block. + // If it's 0, the cache will only deduplicate in-flight requests, deleting the results once the first request has finished. + BlockPostingsForMatchersCacheTTL time.Duration + + // BlockPostingsForMatchersCacheSize is the maximum size of cached postings for matchers elements in each compacted block. + // It's ignored when BlockPostingsForMatchersCacheTTL is 0. + BlockPostingsForMatchersCacheSize int + + // BlockPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on compacted blocks + // regardless of the `concurrent` param. + BlockPostingsForMatchersCacheForce bool } type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} @@ -569,7 +586,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) { return nil, ErrClosed default: } - loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, nil) + loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, nil, defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize, false) if err != nil { return nil, err } @@ -1353,7 +1370,7 @@ func (db *DB) reloadBlocks() (err error) { db.mtx.Lock() defer db.mtx.Unlock() - loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.SeriesHashCache) + loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.SeriesHashCache, db.opts.BlockPostingsForMatchersCacheTTL, db.opts.BlockPostingsForMatchersCacheSize, db.opts.BlockPostingsForMatchersCacheForce) if err != nil { return err } @@ -1436,7 +1453,7 @@ func (db *DB) reloadBlocks() (err error) { return nil } -func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, cache *hashcache.SeriesHashCache) (blocks []*Block, corrupted map[ulid.ULID]error, err error) { +func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, cache *hashcache.SeriesHashCache, postingsCacheTTL time.Duration, postingsCacheSize int, postingsCacheForce bool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) { bDirs, err := blockDirs(dir) if err != nil { return nil, nil, errors.Wrap(err, "find blocks") @@ -1458,7 +1475,7 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po cacheProvider = cache.GetBlockCacheProvider(meta.ULID.String()) } - block, err = OpenBlockWithOptions(l, bDir, chunkPool, cacheProvider) + block, err = OpenBlockWithOptions(l, bDir, chunkPool, cacheProvider, postingsCacheTTL, postingsCacheSize, postingsCacheForce) if err != nil { corrupted[meta.ULID] = err continue diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index 0aa3fc200f..8420ef9577 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -271,7 +271,7 @@ func BenchmarkQuerierSelect(b *testing.B) { seriesHashCache := hashcache.NewSeriesHashCache(1024 * 1024 * 1024) blockdir := createBlockFromHead(b, tmpdir, h) - block, err := OpenBlockWithOptions(nil, blockdir, nil, seriesHashCache.GetBlockCacheProvider("test")) + block, err := OpenBlockWithOptions(nil, blockdir, nil, seriesHashCache.GetBlockCacheProvider("test"), defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize, false) require.NoError(b, err) defer func() { require.NoError(b, block.Close())