diff --git a/tsdb/block.go b/tsdb/block.go
index cddb1b1b70..d2761ec84d 100644
--- a/tsdb/block.go
+++ b/tsdb/block.go
@@ -324,11 +324,11 @@ type Block struct {
 // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
 // to instantiate chunk structs.
 func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) {
-	return OpenBlockWithOptions(logger, dir, pool, nil, defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize, false)
+	return OpenBlockWithOptions(logger, dir, pool, nil, DefaultPostingsForMatchersCacheTTL, DefaultPostingsForMatchersCacheMaxItems, DefaultPostingsForMatchersCacheMaxBytes, DefaultPostingsForMatchersCacheForce)
 }
 
 // OpenBlockWithOptions is like OpenBlock but allows passing a cache provider and sharding function.
-func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cache index.ReaderCacheProvider, postingsCacheTTL time.Duration, postingsCacheSize int, postingsCacheForce bool) (pb *Block, err error) {
+func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cache index.ReaderCacheProvider, postingsCacheTTL time.Duration, postingsCacheMaxItems int, postingsCacheMaxBytes int64, postingsCacheForce bool) (pb *Block, err error) {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -353,7 +353,7 @@ func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cac
 	if err != nil {
 		return nil, err
 	}
-	pfmc := NewPostingsForMatchersCache(postingsCacheTTL, postingsCacheSize, postingsCacheForce)
+	pfmc := NewPostingsForMatchersCache(postingsCacheTTL, postingsCacheMaxItems, postingsCacheMaxBytes, postingsCacheForce)
 	ir := indexReaderWithPostingsForMatchers{indexReader, pfmc}
 	closers = append(closers, ir)
diff --git a/tsdb/db.go b/tsdb/db.go
index 34f6bbc89f..ee66a38eb5 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -72,27 +72,29 @@ var ErrNotReady = errors.New("TSDB not ready")
 // millisecond precision timestamps.
 func DefaultOptions() *Options {
 	return &Options{
-		WALSegmentSize:                     wlog.DefaultSegmentSize,
-		MaxBlockChunkSegmentSize:           chunks.DefaultChunkSegmentSize,
-		RetentionDuration:                  int64(15 * 24 * time.Hour / time.Millisecond),
-		MinBlockDuration:                   DefaultBlockDuration,
-		MaxBlockDuration:                   DefaultBlockDuration,
-		NoLockfile:                         false,
-		AllowOverlappingCompaction:         true,
-		SamplesPerChunk:                    DefaultSamplesPerChunk,
-		WALCompression:                     wlog.CompressionNone,
-		StripeSize:                         DefaultStripeSize,
-		HeadChunksWriteBufferSize:          chunks.DefaultWriteBufferSize,
-		IsolationDisabled:                  defaultIsolationDisabled,
-		HeadChunksEndTimeVariance:          0,
-		HeadChunksWriteQueueSize:           chunks.DefaultWriteQueueSize,
-		OutOfOrderCapMax:                   DefaultOutOfOrderCapMax,
-		HeadPostingsForMatchersCacheTTL:    defaultPostingsForMatchersCacheTTL,
-		HeadPostingsForMatchersCacheSize:   defaultPostingsForMatchersCacheSize,
-		HeadPostingsForMatchersCacheForce:  false,
-		BlockPostingsForMatchersCacheTTL:   defaultPostingsForMatchersCacheTTL,
-		BlockPostingsForMatchersCacheSize:  defaultPostingsForMatchersCacheSize,
-		BlockPostingsForMatchersCacheForce: false,
+		WALSegmentSize:                         wlog.DefaultSegmentSize,
+		MaxBlockChunkSegmentSize:               chunks.DefaultChunkSegmentSize,
+		RetentionDuration:                      int64(15 * 24 * time.Hour / time.Millisecond),
+		MinBlockDuration:                       DefaultBlockDuration,
+		MaxBlockDuration:                       DefaultBlockDuration,
+		NoLockfile:                             false,
+		AllowOverlappingCompaction:             true,
+		SamplesPerChunk:                        DefaultSamplesPerChunk,
+		WALCompression:                         wlog.CompressionNone,
+		StripeSize:                             DefaultStripeSize,
+		HeadChunksWriteBufferSize:              chunks.DefaultWriteBufferSize,
+		IsolationDisabled:                      defaultIsolationDisabled,
+		HeadChunksEndTimeVariance:              0,
+		HeadChunksWriteQueueSize:               chunks.DefaultWriteQueueSize,
+		OutOfOrderCapMax:                       DefaultOutOfOrderCapMax,
+		HeadPostingsForMatchersCacheTTL:        DefaultPostingsForMatchersCacheTTL,
+		HeadPostingsForMatchersCacheMaxItems:   DefaultPostingsForMatchersCacheMaxItems,
+		HeadPostingsForMatchersCacheMaxBytes:   DefaultPostingsForMatchersCacheMaxBytes,
+		HeadPostingsForMatchersCacheForce:      DefaultPostingsForMatchersCacheForce,
+		BlockPostingsForMatchersCacheTTL:       DefaultPostingsForMatchersCacheTTL,
+		BlockPostingsForMatchersCacheMaxItems:  DefaultPostingsForMatchersCacheMaxItems,
+		BlockPostingsForMatchersCacheMaxBytes:  DefaultPostingsForMatchersCacheMaxBytes,
+		BlockPostingsForMatchersCacheForce:     DefaultPostingsForMatchersCacheForce,
 	}
 }
 
@@ -208,9 +210,13 @@ type Options struct {
 	// If it's 0, the cache will only deduplicate in-flight requests, deleting the results once the first request has finished.
 	HeadPostingsForMatchersCacheTTL time.Duration
 
-	// HeadPostingsForMatchersCacheSize is the maximum size of cached postings for matchers elements in the Head.
+	// HeadPostingsForMatchersCacheMaxItems is the maximum size (in number of items) of cached postings for matchers elements in the Head.
 	// It's ignored when HeadPostingsForMatchersCacheTTL is 0.
-	HeadPostingsForMatchersCacheSize int
+	HeadPostingsForMatchersCacheMaxItems int
+
+	// HeadPostingsForMatchersCacheMaxBytes is the maximum size (in bytes) of cached postings for matchers elements in the Head.
+	// It's ignored when HeadPostingsForMatchersCacheTTL is 0.
+	HeadPostingsForMatchersCacheMaxBytes int64
 
 	// HeadPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on Head and OOOHead regardless of the `concurrent` param.
 	HeadPostingsForMatchersCacheForce bool
 
@@ -219,9 +225,13 @@ type Options struct {
 	// If it's 0, the cache will only deduplicate in-flight requests, deleting the results once the first request has finished.
 	BlockPostingsForMatchersCacheTTL time.Duration
 
-	// BlockPostingsForMatchersCacheSize is the maximum size of cached postings for matchers elements in each compacted block.
+	// BlockPostingsForMatchersCacheMaxItems is the maximum size (in number of items) of cached postings for matchers elements in each compacted block.
 	// It's ignored when BlockPostingsForMatchersCacheTTL is 0.
-	BlockPostingsForMatchersCacheSize int
+	BlockPostingsForMatchersCacheMaxItems int
+
+	// BlockPostingsForMatchersCacheMaxBytes is the maximum size (in bytes) of cached postings for matchers elements in each compacted block.
+	// It's ignored when BlockPostingsForMatchersCacheTTL is 0.
+	BlockPostingsForMatchersCacheMaxBytes int64
 
 	// BlockPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on compacted blocks
 	// regardless of the `concurrent` param.
@@ -592,7 +602,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
 		return nil, ErrClosed
 	default:
 	}
-	loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, nil, defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize, false)
+	loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, nil, DefaultPostingsForMatchersCacheTTL, DefaultPostingsForMatchersCacheMaxItems, DefaultPostingsForMatchersCacheMaxBytes, DefaultPostingsForMatchersCacheForce)
 	if err != nil {
 		return nil, err
 	}
@@ -896,7 +906,8 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
 	headOpts.OutOfOrderTimeWindow.Store(opts.OutOfOrderTimeWindow)
 	headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax)
 	headOpts.PostingsForMatchersCacheTTL = opts.HeadPostingsForMatchersCacheTTL
-	headOpts.PostingsForMatchersCacheSize = opts.HeadPostingsForMatchersCacheSize
+	headOpts.PostingsForMatchersCacheMaxItems = opts.HeadPostingsForMatchersCacheMaxItems
+	headOpts.PostingsForMatchersCacheMaxBytes = opts.HeadPostingsForMatchersCacheMaxBytes
 	headOpts.PostingsForMatchersCacheForce = opts.HeadPostingsForMatchersCacheForce
 	if opts.WALReplayConcurrency > 0 {
 		headOpts.WALReplayConcurrency = opts.WALReplayConcurrency
@@ -1440,7 +1451,7 @@ func (db *DB) reloadBlocks() (err error) {
 	db.mtx.Lock()
 	defer db.mtx.Unlock()
 
-	loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.SeriesHashCache, db.opts.BlockPostingsForMatchersCacheTTL, db.opts.BlockPostingsForMatchersCacheSize, db.opts.BlockPostingsForMatchersCacheForce)
+	loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.SeriesHashCache, db.opts.BlockPostingsForMatchersCacheTTL, db.opts.BlockPostingsForMatchersCacheMaxItems, db.opts.BlockPostingsForMatchersCacheMaxBytes, db.opts.BlockPostingsForMatchersCacheForce)
 	if err != nil {
 		return err
 	}
@@ -1523,7 +1534,7 @@ func (db *DB) reloadBlocks() (err error) {
 	return nil
 }
 
-func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, cache *hashcache.SeriesHashCache, postingsCacheTTL time.Duration, postingsCacheSize int, postingsCacheForce bool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
+func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, cache *hashcache.SeriesHashCache, postingsCacheTTL time.Duration, postingsCacheMaxItems int, postingsCacheMaxBytes int64, postingsCacheForce bool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
 	bDirs, err := blockDirs(dir)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "find blocks")
 	}
@@ -1545,7 +1556,7 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po
 			cacheProvider = cache.GetBlockCacheProvider(meta.ULID.String())
 		}
 
-		block, err = OpenBlockWithOptions(l, bDir, chunkPool, cacheProvider, postingsCacheTTL, postingsCacheSize, postingsCacheForce)
+		block, err = OpenBlockWithOptions(l, bDir, chunkPool, cacheProvider, postingsCacheTTL, postingsCacheMaxItems, postingsCacheMaxBytes, postingsCacheForce)
 		if err != nil {
 			corrupted[meta.ULID] = err
 			continue
diff --git a/tsdb/head.go b/tsdb/head.go
index 66c44ec990..b09c9546c2 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -180,9 +180,10 @@ type HeadOptions struct {
 	IsolationDisabled bool
 
-	PostingsForMatchersCacheTTL   time.Duration
-	PostingsForMatchersCacheSize  int
-	PostingsForMatchersCacheForce bool
+	PostingsForMatchersCacheTTL      time.Duration
+	PostingsForMatchersCacheMaxItems int
+	PostingsForMatchersCacheMaxBytes int64
+	PostingsForMatchersCacheForce    bool
 
 	// Maximum number of CPUs that can simultaneously process WAL replay.
 	// The default value is GOMAXPROCS.
@@ -199,20 +200,21 @@ const (
 func DefaultHeadOptions() *HeadOptions {
 	ho := &HeadOptions{
-		ChunkRange:                    DefaultBlockDuration,
-		ChunkDirRoot:                  "",
-		ChunkPool:                     chunkenc.NewPool(),
-		ChunkWriteBufferSize:          chunks.DefaultWriteBufferSize,
-		ChunkEndTimeVariance:          0,
-		ChunkWriteQueueSize:           chunks.DefaultWriteQueueSize,
-		SamplesPerChunk:               DefaultSamplesPerChunk,
-		StripeSize:                    DefaultStripeSize,
-		SeriesCallback:                &noopSeriesLifecycleCallback{},
-		IsolationDisabled:             defaultIsolationDisabled,
-		PostingsForMatchersCacheTTL:   defaultPostingsForMatchersCacheTTL,
-		PostingsForMatchersCacheSize:  defaultPostingsForMatchersCacheSize,
-		PostingsForMatchersCacheForce: false,
-		WALReplayConcurrency:          defaultWALReplayConcurrency,
+		ChunkRange:                       DefaultBlockDuration,
+		ChunkDirRoot:                     "",
+		ChunkPool:                        chunkenc.NewPool(),
+		ChunkWriteBufferSize:             chunks.DefaultWriteBufferSize,
+		ChunkEndTimeVariance:             0,
+		ChunkWriteQueueSize:              chunks.DefaultWriteQueueSize,
+		SamplesPerChunk:                  DefaultSamplesPerChunk,
+		StripeSize:                       DefaultStripeSize,
+		SeriesCallback:                   &noopSeriesLifecycleCallback{},
+		IsolationDisabled:                defaultIsolationDisabled,
+		PostingsForMatchersCacheTTL:      DefaultPostingsForMatchersCacheTTL,
+		PostingsForMatchersCacheMaxItems: DefaultPostingsForMatchersCacheMaxItems,
+		PostingsForMatchersCacheMaxBytes: DefaultPostingsForMatchersCacheMaxBytes,
+		PostingsForMatchersCacheForce:    DefaultPostingsForMatchersCacheForce,
+		WALReplayConcurrency:             defaultWALReplayConcurrency,
 	}
 	ho.OutOfOrderCapMax.Store(DefaultOutOfOrderCapMax)
 	return ho
@@ -279,7 +281,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wlog.WL, opts *Hea
 		stats: stats,
 		reg:   r,
 
-		pfmc: NewPostingsForMatchersCache(opts.PostingsForMatchersCacheTTL, opts.PostingsForMatchersCacheSize, opts.PostingsForMatchersCacheForce),
+		pfmc: NewPostingsForMatchersCache(opts.PostingsForMatchersCacheTTL, opts.PostingsForMatchersCacheMaxItems, opts.PostingsForMatchersCacheMaxBytes, opts.PostingsForMatchersCacheForce),
 	}
 	if err := h.resetInMemoryState(); err != nil {
 		return nil, err
diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go
index c0a80f733f..eb572704c3 100644
--- a/tsdb/index/postings.go
+++ b/tsdb/index/postings.go
@@ -853,6 +853,16 @@ type PostingsCloner struct {
 // and it shouldn't be used once provided to the PostingsCloner.
 func NewPostingsCloner(p Postings) *PostingsCloner {
 	ids, err := ExpandPostings(p)
+
+	// ExpandPostings() doesn't know the total number of postings beforehand,
+	// so the returned slice capacity may be well above the actual number of items.
+	// In such a case, we shrink it.
+	if float64(len(ids)) < float64(cap(ids))*0.70 {
+		shrunk := make([]storage.SeriesRef, len(ids))
+		copy(shrunk, ids)
+		ids = shrunk
+	}
+
 	return &PostingsCloner{ids: ids, err: err}
 }
diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go
index cf479498ef..61a1ca3de1 100644
--- a/tsdb/index/postings_test.go
+++ b/tsdb/index/postings_test.go
@@ -24,6 +24,7 @@ import (
 	"testing"
 
 	"github.com/pkg/errors"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
 	"github.com/prometheus/prometheus/model/labels"
@@ -1103,6 +1104,20 @@ func TestPostingsCloner(t *testing.T) {
 	})
 }
 
+func TestNewPostingsCloner_ShrinkExpandedPostingsSlice(t *testing.T) {
+	t.Run("should not shrink expanded postings if length is >= 70% capacity", func(t *testing.T) {
+		cloner := NewPostingsCloner(NewListPostings(make([]storage.SeriesRef, 60)))
+		assert.Equal(t, 60, len(cloner.ids))
+		assert.Equal(t, 64, cap(cloner.ids)) // Not shrunk.
+	})
+
+	t.Run("should shrink expanded postings if length is < 70% capacity", func(t *testing.T) {
+		cloner := NewPostingsCloner(NewListPostings(make([]storage.SeriesRef, 33)))
+		assert.Equal(t, 33, len(cloner.ids))
+		assert.Equal(t, 33, cap(cloner.ids)) // Shrunk.
+	})
+}
+
 func TestFindIntersectingPostings(t *testing.T) {
 	t.Run("multiple intersections", func(t *testing.T) {
 		p := NewListPostings([]storage.SeriesRef{10, 15, 20, 25, 30, 35, 40, 45, 50})
diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go
index 289edde5b8..6ac0fc20fb 100644
--- a/tsdb/postings_for_matchers_cache.go
+++ b/tsdb/postings_for_matchers_cache.go
@@ -7,13 +7,19 @@ import (
 	"sync"
 	"time"
 
+	"github.com/DmitriyVTitov/size"
+
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/tsdb/index"
 )
 
 const (
-	defaultPostingsForMatchersCacheTTL  = 10 * time.Second
-	defaultPostingsForMatchersCacheSize = 100
+	// NOTE: keep these exported so they can be referenced in Mimir.
+
+	DefaultPostingsForMatchersCacheTTL      = 10 * time.Second
+	DefaultPostingsForMatchersCacheMaxItems = 100
+	DefaultPostingsForMatchersCacheMaxBytes = 10 * 1024 * 1024 // Based on the default max items, 10MB / 100 = 100KB per cached entry on average.
+	DefaultPostingsForMatchersCacheForce    = false
 )
 
 // IndexPostingsReader is a subset of IndexReader methods, the minimum required to evaluate PostingsForMatchers
@@ -31,14 +37,15 @@ type IndexPostingsReader interface {
 // NewPostingsForMatchersCache creates a new PostingsForMatchersCache.
 // If `ttl` is 0, then it only deduplicates in-flight requests.
 // If `force` is true, then all requests go through cache, regardless of the `concurrent` param provided.
-func NewPostingsForMatchersCache(ttl time.Duration, cacheSize int, force bool) *PostingsForMatchersCache {
+func NewPostingsForMatchersCache(ttl time.Duration, maxItems int, maxBytes int64, force bool) *PostingsForMatchersCache {
 	b := &PostingsForMatchersCache{
 		calls:  &sync.Map{},
 		cached: list.New(),
 
-		ttl:       ttl,
-		cacheSize: cacheSize,
-		force:     force,
+		ttl:      ttl,
+		maxItems: maxItems,
+		maxBytes: maxBytes,
+		force:    force,
 
 		timeNow:             time.Now,
 		postingsForMatchers: PostingsForMatchers,
@@ -51,12 +58,14 @@ func NewPostingsForMatchersCache(ttl time.Duration, cacheSize int, force bool) *
 type PostingsForMatchersCache struct {
 	calls *sync.Map
 
-	cachedMtx sync.RWMutex
-	cached    *list.List
+	cachedMtx   sync.RWMutex
+	cached      *list.List
+	cachedBytes int64
 
-	ttl       time.Duration
-	cacheSize int
-	force     bool
+	ttl      time.Duration
+	maxItems int
+	maxBytes int64
+	force    bool
 
 	// timeNow is the time.Now that can be replaced for testing purposes
 	timeNow func() time.Time
@@ -101,13 +110,20 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex
 		cloner = index.NewPostingsCloner(postings)
 	}
 
-	c.created(key, c.timeNow())
+	// Estimate the size of the cache entry, in bytes. We use max() because
+	// size.Of() returns -1 if the value is nil.
+	estimatedSizeBytes := int64(len(key)) + max(0, int64(size.Of(wg))) + max(0, int64(size.Of(outerErr))) + max(0, int64(size.Of(cloner)))
+
+	c.created(key, c.timeNow(), estimatedSizeBytes)
 	return promise
 }
 
 type postingsForMatchersCachedCall struct {
 	key string
 	ts  time.Time
+
+	// Size of the cached entry, in bytes.
+	sizeBytes int64
 }
 
 func (c *PostingsForMatchersCache) expire() {
@@ -134,9 +150,11 @@ // or because the cache has too many elements
 // should be called while read lock is held on cachedMtx
 func (c *PostingsForMatchersCache) shouldEvictHead() bool {
-	if c.cached.Len() > c.cacheSize {
+	// The head must be evicted for sure if either the max number of items or the max bytes limit is exceeded.
+	if c.cached.Len() > c.maxItems || c.cachedBytes > c.maxBytes {
 		return true
 	}
+
 	h := c.cached.Front()
 	if h == nil {
 		return false
 	}
@@ -150,11 +168,12 @@ func (c *PostingsForMatchersCache) evictHead() {
 	oldest := front.Value.(*postingsForMatchersCachedCall)
 	c.calls.Delete(oldest.key)
 	c.cached.Remove(front)
+	c.cachedBytes -= oldest.sizeBytes
 }
 
 // created has to be called when returning from the PostingsForMatchers call that creates the promise.
 // the ts provided should be the call time.
-func (c *PostingsForMatchersCache) created(key string, ts time.Time) {
+func (c *PostingsForMatchersCache) created(key string, ts time.Time, sizeBytes int64) {
 	if c.ttl <= 0 {
 		c.calls.Delete(key)
 		return
@@ -164,9 +183,11 @@
 	defer c.cachedMtx.Unlock()
 
 	c.cached.PushBack(&postingsForMatchersCachedCall{
-		key: key,
-		ts:  ts,
+		key:       key,
+		ts:        ts,
+		sizeBytes: sizeBytes,
 	})
+	c.cachedBytes += sizeBytes
 }
 
 // matchersKey provides a unique string key for the given matchers slice
diff --git a/tsdb/postings_for_matchers_cache_test.go b/tsdb/postings_for_matchers_cache_test.go
index f03b0232c8..1c05ef135f 100644
--- a/tsdb/postings_for_matchers_cache_test.go
+++ b/tsdb/postings_for_matchers_cache_test.go
@@ -8,17 +8,18 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/index"
 )
 
 func TestPostingsForMatchersCache(t *testing.T) {
-	const testCacheSize = 5
 
 	// newPostingsForMatchersCache tests the NewPostingsForMatchersCache constructor, but overrides the postingsForMatchers func
-	newPostingsForMatchersCache := func(ttl time.Duration, pfm func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error), timeMock *timeNowMock, force bool) *PostingsForMatchersCache {
-		c := NewPostingsForMatchersCache(ttl, testCacheSize, force)
+	newPostingsForMatchersCache := func(ttl time.Duration, maxItems int, maxBytes int64, pfm func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error), timeMock *timeNowMock, force bool) *PostingsForMatchersCache {
+		c := NewPostingsForMatchersCache(ttl, maxItems, maxBytes, force)
 		if c.postingsForMatchers == nil {
 			t.Fatalf("NewPostingsForMatchersCache() didn't assign postingsForMatchers func")
 		}
@@ -35,7 +36,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
 		expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
 		expectedPostingsErr := fmt.Errorf("failed successfully")
 
-		c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
+		c := newPostingsForMatchersCache(DefaultPostingsForMatchersCacheTTL, 5, 1000, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
 			require.IsType(t, indexForPostingsMock{}, ix, "Incorrect IndexPostingsReader was provided to PostingsForMatchers, expected the mock, was given %v (%T)", ix, ix)
 			require.Equal(t, expectedMatchers, ms, "Wrong label matchers provided, expected %v, got %v", expectedMatchers, ms)
 			return index.ErrPostings(expectedPostingsErr), nil
@@ -53,7 +54,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
 		expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
 		expectedErr := fmt.Errorf("failed successfully")
 
-		c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
+		c := newPostingsForMatchersCache(DefaultPostingsForMatchersCacheTTL, 5, 1000, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
 			return nil, expectedErr
 		}, &timeNowMock{}, false)
@@ -99,9 +100,9 @@ func TestPostingsForMatchersCache(t *testing.T) {
 		release := make(chan struct{})
 		var ttl time.Duration
 		if cacheEnabled {
-			ttl = defaultPostingsForMatchersCacheTTL
+			ttl = DefaultPostingsForMatchersCacheTTL
 		}
-		c := newPostingsForMatchersCache(ttl, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
+		c := newPostingsForMatchersCache(ttl, 5, 1000, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
 			select {
 			case called <- struct{}{}:
 			default:
@@ -148,7 +149,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
 		expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
 
 		var call int
-		c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
+		c := newPostingsForMatchersCache(DefaultPostingsForMatchersCacheTTL, 5, 1000, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
 			call++
 			return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
 		}, &timeNowMock{}, false)
@@ -168,7 +169,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
 		expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
 
 		var call int
-		c := newPostingsForMatchersCache(0, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
+		c := newPostingsForMatchersCache(0, 1000, 1000, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
 			call++
 			return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
 		}, &timeNowMock{}, false)
@@ -191,7 +192,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
 		}
 
 		var call int
-		c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
+		c := newPostingsForMatchersCache(DefaultPostingsForMatchersCacheTTL, 5, 1000, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
 			call++
 			return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
 		}, timeNow, false)
@@ -201,14 +202,14 @@ func TestPostingsForMatchersCache(t *testing.T) {
 		require.NoError(t, err)
 		require.EqualError(t, p.Err(), "result from call 1")
 
-		timeNow.advance(defaultPostingsForMatchersCacheTTL / 2)
+		timeNow.advance(DefaultPostingsForMatchersCacheTTL / 2)
 
 		// second call within the ttl, should use the cache
 		p, err = c.PostingsForMatchers(ctx, indexForPostingsMock{}, true, expectedMatchers...)
 		require.NoError(t, err)
 		require.EqualError(t, p.Err(), "result from call 1")
 
-		timeNow.advance(defaultPostingsForMatchersCacheTTL / 2)
+		timeNow.advance(DefaultPostingsForMatchersCacheTTL / 2)
 
 		// third call is after ttl (exactly), should call again
 		p, err = c.PostingsForMatchers(ctx, indexForPostingsMock{}, true, expectedMatchers...)
@@ -216,15 +217,17 @@ func TestPostingsForMatchersCache(t *testing.T) {
 		require.EqualError(t, p.Err(), "result from call 2")
 	})
 
-	t.Run("cached value is evicted because cache exceeds max size", func(t *testing.T) {
+	t.Run("cached value is evicted because cache exceeds max items", func(t *testing.T) {
+		const maxItems = 5
+
 		timeNow := &timeNowMock{}
-		calls := make([][]*labels.Matcher, testCacheSize)
+		calls := make([][]*labels.Matcher, maxItems)
 		for i := range calls {
 			calls[i] = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "matchers", fmt.Sprintf("%d", i))}
 		}
 
 		callsPerMatchers := map[string]int{}
-		c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
+		c := newPostingsForMatchersCache(DefaultPostingsForMatchersCacheTTL, maxItems, 1000, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
 			k := matchersKey(ms)
 			callsPerMatchers[k]++
 			return index.ErrPostings(fmt.Errorf("result from call %d", callsPerMatchers[k])), nil
@@ -260,6 +263,80 @@ func TestPostingsForMatchersCache(t *testing.T) {
 		require.NoError(t, err)
 		require.EqualError(t, p.Err(), "result from call 2")
 	})
+
+	t.Run("cached value is evicted because cache exceeds max bytes", func(t *testing.T) {
+		const (
+			maxItems         = 100 // Never hit it.
+			maxBytes         = 1000
+			numMatchers      = 5
+			postingsListSize = 30 // 8 bytes per posting ref, so 30 x 8 = 240 bytes.
+		)
+
+		// Generate some matchers.
+		matchersLists := make([][]*labels.Matcher, numMatchers)
+		for i := range matchersLists {
+			matchersLists[i] = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "matchers", fmt.Sprintf("%d", i))}
+		}
+
+		// Generate some postings lists.
+		refsLists := make(map[string][]storage.SeriesRef, numMatchers)
+		for i := 0; i < numMatchers; i++ {
+			refs := make([]storage.SeriesRef, postingsListSize)
+			for r := range refs {
+				refs[r] = storage.SeriesRef(i * r)
+			}
+
+			refsLists[matchersKey(matchersLists[i])] = refs
+		}
+
+		callsPerMatchers := map[string]int{}
+		c := newPostingsForMatchersCache(DefaultPostingsForMatchersCacheTTL, maxItems, maxBytes, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
+			k := matchersKey(ms)
+			callsPerMatchers[k]++
+			return index.NewListPostings(refsLists[k]), nil
+		}, &timeNowMock{}, false)
+
+		// We expect to cache 3 items. So we're going to call PostingsForMatchers for 3 matchers
+		// and then double-check they're all cached. To do it, we iterate twice.
+		for run := 0; run < 2; run++ {
+			for i := 0; i < 3; i++ {
+				matchers := matchersLists[i]
+
+				p, err := c.PostingsForMatchers(ctx, indexForPostingsMock{}, true, matchers...)
+				require.NoError(t, err)
+
+				actual, err := index.ExpandPostings(p)
+				require.NoError(t, err)
+				assert.Equal(t, refsLists[matchersKey(matchers)], actual)
+			}
+		}
+
+		// At this point we expect that the postings have been computed only once for the 3 matchers.
+		for i := 0; i < 3; i++ {
+			assert.Equal(t, 1, callsPerMatchers[matchersKey(matchersLists[i])])
+		}
+
+		// Call PostingsForMatchers() for a 4th matcher. We expect this will evict the oldest cached entry.
+		for run := 0; run < 2; run++ {
+			matchers := matchersLists[3]
+
+			p, err := c.PostingsForMatchers(ctx, indexForPostingsMock{}, true, matchers...)
+			require.NoError(t, err)
+
+			actual, err := index.ExpandPostings(p)
+			require.NoError(t, err)
+			assert.Equal(t, refsLists[matchersKey(matchers)], actual)
+		}
+
+		// To ensure the 1st (oldest) entry was removed, we call PostingsForMatchers() again with those matchers.
+		_, err := c.PostingsForMatchers(ctx, indexForPostingsMock{}, true, matchersLists[0]...)
+		require.NoError(t, err)
+
+		assert.Equal(t, 2, callsPerMatchers[matchersKey(matchersLists[0])])
+		assert.Equal(t, 1, callsPerMatchers[matchersKey(matchersLists[1])])
+		assert.Equal(t, 1, callsPerMatchers[matchersKey(matchersLists[2])])
+		assert.Equal(t, 1, callsPerMatchers[matchersKey(matchersLists[3])])
+	})
 }
 
 type indexForPostingsMock struct{}
diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go
index fee9ad9dcd..3765b60426 100644
--- a/tsdb/querier_bench_test.go
+++ b/tsdb/querier_bench_test.go
@@ -313,7 +313,7 @@ func BenchmarkQuerierSelect(b *testing.B) {
 	seriesHashCache := hashcache.NewSeriesHashCache(1024 * 1024 * 1024)
 	blockdir := createBlockFromHead(b, tmpdir, h)
 
-	block, err := OpenBlockWithOptions(nil, blockdir, nil, seriesHashCache.GetBlockCacheProvider("test"), defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize, false)
+	block, err := OpenBlockWithOptions(nil, blockdir, nil, seriesHashCache.GetBlockCacheProvider("test"), DefaultPostingsForMatchersCacheTTL, DefaultPostingsForMatchersCacheMaxItems, DefaultPostingsForMatchersCacheMaxBytes, DefaultPostingsForMatchersCacheForce)
 	require.NoError(b, err)
 	defer func() {
 		require.NoError(b, block.Close())
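
The eviction logic in this patch is a FIFO with two bounds: created() appends an entry (with its estimated size) to a list, and shouldEvictHead()/evictHead() drop the oldest entries while the item count or the accounted byte total exceeds its limit, or while the head has outlived the TTL. Below is a minimal, self-contained sketch of that byte-accounted FIFO. It is not the patch's implementation: boundedFIFO and fifoEntry are made-up names, the TTL handling and the promise map are omitted, and eviction runs inline at insertion time rather than from expire().

```go
package main

import (
	"container/list"
	"fmt"
)

// fifoEntry mirrors postingsForMatchersCachedCall: a key plus the
// estimated size of the cached value it refers to.
type fifoEntry struct {
	key       string
	sizeBytes int64
}

// boundedFIFO keeps insertion order and evicts from the head until both
// the max-items and max-bytes bounds hold, like shouldEvictHead/evictHead.
type boundedFIFO struct {
	entries  *list.List
	curBytes int64
	maxItems int
	maxBytes int64
}

func (c *boundedFIFO) add(key string, sizeBytes int64) {
	c.entries.PushBack(&fifoEntry{key: key, sizeBytes: sizeBytes})
	c.curBytes += sizeBytes

	// Drop the oldest entries while either bound is exceeded.
	for c.entries.Len() > c.maxItems || c.curBytes > c.maxBytes {
		front := c.entries.Front()
		oldest := front.Value.(*fifoEntry)
		c.entries.Remove(front)
		c.curBytes -= oldest.sizeBytes
		fmt.Printf("evicted %s (%d bytes)\n", oldest.key, oldest.sizeBytes)
	}
}

func main() {
	c := &boundedFIFO{entries: list.New(), maxItems: 100, maxBytes: 1000}
	for i := 0; i < 5; i++ {
		c.add(fmt.Sprintf("matchers-%d", i), 300) // the 4th insert crosses 1000 bytes
	}
}
```

Bounding by bytes rather than only by items matters here because a single cached postings list can range from a handful of refs to millions; with an item bound alone, 100 expanded postings entries could pin an arbitrary amount of heap.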
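The PostingsCloner change attacks a related waste: ExpandPostings() builds its result with append, so the backing array can end up with up to roughly twice the needed capacity, and that whole capacity stays live for as long as the entry is cached. Copying into a right-sized slice when less than 70% of the capacity is used costs one allocation but frees the slack, and it keeps the size.Of() estimate fed into the cache accounting closer to the truth. A standalone illustration of the heuristic (the 0.70 threshold is the patch's; SeriesRef is redeclared locally so the snippet has no Prometheus dependency):

```go
package main

import "fmt"

type SeriesRef uint64 // storage.SeriesRef is a uint64 under the hood.

// shrinkIfWasteful copies ids into a right-sized slice when less than 70%
// of the capacity is actually used, mirroring NewPostingsCloner above.
func shrinkIfWasteful(ids []SeriesRef) []SeriesRef {
	if float64(len(ids)) < float64(cap(ids))*0.70 {
		shrunk := make([]SeriesRef, len(ids))
		copy(shrunk, ids)
		return shrunk
	}
	return ids
}

func main() {
	// append-style growth: 33 items typically land in a 64-cap backing array.
	var ids []SeriesRef
	for i := 0; i < 33; i++ {
		ids = append(ids, SeriesRef(i))
	}
	fmt.Println(len(ids), cap(ids)) // e.g. 33 64 (growth factor is runtime-dependent)

	ids = shrinkIfWasteful(ids)
	fmt.Println(len(ids), cap(ids)) // 33 33
}
```

That append's growth factor is a runtime detail is exactly why the check is expressed against cap(ids) rather than a fixed size, and why the unit test above asserts the observed 64-element capacity only for the no-shrink case.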