Allow configuring the postings for matchers cache for compacted blocks

Signed-off-by: Marco Pracucci <marco@pracucci.com>
Author: Marco Pracucci, 2023-03-22 06:40:11 +01:00
Parent: 1b8dfd8b83
Commit: 330e9b69af
3 changed files with 43 additions and 25 deletions

tsdb/block.go

@@ -20,6 +20,7 @@ import (
 	"os"
 	"path/filepath"
 	"sync"
+	"time"
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
@@ -322,11 +323,11 @@ type Block struct {
 // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
 // to instantiate chunk structs.
 func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) {
-	return OpenBlockWithOptions(logger, dir, pool, nil)
+	return OpenBlockWithOptions(logger, dir, pool, nil, defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize, false)
 }
 
 // OpenBlockWithOptions is like OpenBlock but allows to pass a cache provider and sharding function.
-func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cache index.ReaderCacheProvider) (pb *Block, err error) {
+func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cache index.ReaderCacheProvider, postingsCacheTTL time.Duration, postingsCacheSize int, postingsCacheForce bool) (pb *Block, err error) {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -351,7 +352,7 @@ func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cac
 	if err != nil {
 		return nil, err
 	}
 
-	pfmc := NewPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize, false)
+	pfmc := NewPostingsForMatchersCache(postingsCacheTTL, postingsCacheSize, postingsCacheForce)
 	ir := indexReaderWithPostingsForMatchers{indexReader, pfmc}
 	closers = append(closers, ir)
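
Taken together, the hunks above thread the cache configuration from the caller down to NewPostingsForMatchersCache. A minimal usage sketch, assuming this fork's standard tsdb import path; the block directory and the TTL/size/force values below are illustrative assumptions, not values from this commit:

	import (
		"time"

		"github.com/go-kit/log"
		"github.com/prometheus/prometheus/tsdb"
	)

	// Open one block with a custom postings-for-matchers cache. A nil chunk
	// pool and nil cache provider fall back to defaults, as in OpenBlock.
	block, err := tsdb.OpenBlockWithOptions(
		log.NewNopLogger(),
		"/path/to/block", // hypothetical block directory
		nil,              // chunkenc.Pool
		nil,              // index.ReaderCacheProvider
		10*time.Second,   // postingsCacheTTL: cached entries expire after this duration
		100,              // postingsCacheSize: max cached entries for this block
		false,            // postingsCacheForce: honour the `concurrent` hint
	)
	if err != nil {
		return err
	}
	defer block.Close()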

tsdb/db.go

@@ -72,23 +72,26 @@ var ErrNotReady = errors.New("TSDB not ready")
 // millisecond precision timestamps.
 func DefaultOptions() *Options {
 	return &Options{
 		WALSegmentSize:                     wlog.DefaultSegmentSize,
 		MaxBlockChunkSegmentSize:           chunks.DefaultChunkSegmentSize,
 		RetentionDuration:                  int64(15 * 24 * time.Hour / time.Millisecond),
 		MinBlockDuration:                   DefaultBlockDuration,
 		MaxBlockDuration:                   DefaultBlockDuration,
 		NoLockfile:                         false,
 		AllowOverlappingCompaction:         true,
 		WALCompression:                     false,
 		StripeSize:                         DefaultStripeSize,
 		HeadChunksWriteBufferSize:          chunks.DefaultWriteBufferSize,
 		IsolationDisabled:                  defaultIsolationDisabled,
 		HeadChunksEndTimeVariance:          0,
 		HeadChunksWriteQueueSize:           chunks.DefaultWriteQueueSize,
 		OutOfOrderCapMax:                   DefaultOutOfOrderCapMax,
 		HeadPostingsForMatchersCacheTTL:    defaultPostingsForMatchersCacheTTL,
 		HeadPostingsForMatchersCacheSize:   defaultPostingsForMatchersCacheSize,
 		HeadPostingsForMatchersCacheForce:  false,
+		BlockPostingsForMatchersCacheTTL:   defaultPostingsForMatchersCacheTTL,
+		BlockPostingsForMatchersCacheSize:  defaultPostingsForMatchersCacheSize,
+		BlockPostingsForMatchersCacheForce: false,
 	}
 }
@@ -200,11 +203,25 @@ type Options struct {
 	// HeadPostingsForMatchersCacheTTL is the TTL of the postings for matchers cache in the Head.
 	// If it's 0, the cache will only deduplicate in-flight requests, deleting the results once the first request has finished.
 	HeadPostingsForMatchersCacheTTL time.Duration
 	// HeadPostingsForMatchersCacheSize is the maximum size of cached postings for matchers elements in the Head.
 	// It's ignored when HeadPostingsForMatchersCacheTTL is 0.
 	HeadPostingsForMatchersCacheSize int
 	// HeadPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on Head and OOOHead regardless of the `concurrent` param.
 	HeadPostingsForMatchersCacheForce bool
+	// BlockPostingsForMatchersCacheTTL is the TTL of the postings for matchers cache of each compacted block.
+	// If it's 0, the cache will only deduplicate in-flight requests, deleting the results once the first request has finished.
+	BlockPostingsForMatchersCacheTTL time.Duration
+	// BlockPostingsForMatchersCacheSize is the maximum size of cached postings for matchers elements in each compacted block.
+	// It's ignored when BlockPostingsForMatchersCacheTTL is 0.
+	BlockPostingsForMatchersCacheSize int
+	// BlockPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on compacted blocks
+	// regardless of the `concurrent` param.
+	BlockPostingsForMatchersCacheForce bool
 }
 
 type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
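
The new Block* fields mirror their Head* counterparts one-to-one. A hedged sketch of setting them on Options, assuming the tsdb.Open signature of this Prometheus fork (dir, logger, registerer, options, stats); the TTL and size values are illustrative:

	opts := tsdb.DefaultOptions()
	// Cache postings-for-matchers results per compacted block for 10s and keep
	// at most 100 entries per block; both values are illustrative.
	opts.BlockPostingsForMatchersCacheTTL = 10 * time.Second
	opts.BlockPostingsForMatchersCacheSize = 100
	// Leave Force off so only calls flagged `concurrent` go through the cache.
	opts.BlockPostingsForMatchersCacheForce = false

	db, err := tsdb.Open(dir, logger, registerer, opts, nil)
	if err != nil {
		return err
	}
	defer db.Close()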
@@ -569,7 +586,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
 		return nil, ErrClosed
 	default:
 	}
-	loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, nil)
+	loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, nil, defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize, false)
 	if err != nil {
 		return nil, err
 	}
@@ -1353,7 +1370,7 @@ func (db *DB) reloadBlocks() (err error) {
 	db.mtx.Lock()
 	defer db.mtx.Unlock()
 
-	loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.SeriesHashCache)
+	loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.SeriesHashCache, db.opts.BlockPostingsForMatchersCacheTTL, db.opts.BlockPostingsForMatchersCacheSize, db.opts.BlockPostingsForMatchersCacheForce)
 	if err != nil {
 		return err
 	}
@@ -1436,7 +1453,7 @@ func (db *DB) reloadBlocks() (err error) {
 	return nil
 }
 
-func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, cache *hashcache.SeriesHashCache) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
+func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, cache *hashcache.SeriesHashCache, postingsCacheTTL time.Duration, postingsCacheSize int, postingsCacheForce bool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
 	bDirs, err := blockDirs(dir)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "find blocks")
@@ -1458,7 +1475,7 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po
 			cacheProvider = cache.GetBlockCacheProvider(meta.ULID.String())
 		}
 
-		block, err = OpenBlockWithOptions(l, bDir, chunkPool, cacheProvider)
+		block, err = OpenBlockWithOptions(l, bDir, chunkPool, cacheProvider, postingsCacheTTL, postingsCacheSize, postingsCacheForce)
 		if err != nil {
 			corrupted[meta.ULID] = err
 			continue

tsdb/querier_bench_test.go

@@ -271,7 +271,7 @@ func BenchmarkQuerierSelect(b *testing.B) {
 	seriesHashCache := hashcache.NewSeriesHashCache(1024 * 1024 * 1024)
 	blockdir := createBlockFromHead(b, tmpdir, h)
-	block, err := OpenBlockWithOptions(nil, blockdir, nil, seriesHashCache.GetBlockCacheProvider("test"))
+	block, err := OpenBlockWithOptions(nil, blockdir, nil, seriesHashCache.GetBlockCacheProvider("test"), defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize, false)
 	require.NoError(b, err)
 	defer func() {
 		require.NoError(b, block.Close())
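
For comparison, a hypothetical variant of the benchmark setup above that forces the cache on with non-default settings; the TTL, size, and force values are assumptions, not part of this commit:

	// Force the postings-for-matchers cache for every Select, not only the
	// calls marked concurrent, using an explicit TTL and size.
	block, err := OpenBlockWithOptions(nil, blockdir, nil,
		seriesHashCache.GetBlockCacheProvider("test"),
		time.Minute, // hypothetical TTL
		1024,        // hypothetical cache size
		true,        // force caching regardless of the `concurrent` param
	)
	require.NoError(b, err)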