Add PostingsForMatchers cache size by bytes support

Signed-off-by: Marco Pracucci <marco@pracucci.com>
Marco Pracucci 2023-09-27 15:25:39 +02:00
parent 320f0c9c4a
commit 3c68ce252e
No known key found for this signature in database
GPG key ID: 74C1BD403D2DF9B5
8 changed files with 219 additions and 83 deletions

View file

@ -324,11 +324,11 @@ type Block struct {
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
// to instantiate chunk structs.
func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) {
return OpenBlockWithOptions(logger, dir, pool, nil, defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize, false)
return OpenBlockWithOptions(logger, dir, pool, nil, DefaultPostingsForMatchersCacheTTL, DefaultPostingsForMatchersCacheMaxItems, DefaultPostingsForMatchersCacheMaxBytes, DefaultPostingsForMatchersCacheForce)
}
// OpenBlockWithOptions is like OpenBlock but allows passing a cache provider and sharding function.
func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cache index.ReaderCacheProvider, postingsCacheTTL time.Duration, postingsCacheSize int, postingsCacheForce bool) (pb *Block, err error) {
func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cache index.ReaderCacheProvider, postingsCacheTTL time.Duration, postingsCacheMaxItems int, postingsCacheMaxBytes int64, postingsCacheForce bool) (pb *Block, err error) {
if logger == nil {
logger = log.NewNopLogger()
}
@ -353,7 +353,7 @@ func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, cac
if err != nil {
return nil, err
}
pfmc := NewPostingsForMatchersCache(postingsCacheTTL, postingsCacheSize, postingsCacheForce)
pfmc := NewPostingsForMatchersCache(postingsCacheTTL, postingsCacheMaxItems, postingsCacheMaxBytes, postingsCacheForce)
ir := indexReaderWithPostingsForMatchers{indexReader, pfmc}
closers = append(closers, ir)
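For illustration, a caller that previously passed a single cache size now passes separate item and byte limits. A minimal sketch of opening a block with the new signature, using the exported defaults; the block directory, logger and chunk pool below are placeholder choices for this sketch, not part of this commit:

package main

import (
	"log"

	gokitlog "github.com/go-kit/log"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func main() {
	// Hypothetical block directory, used only for this sketch.
	const blockDir = "/data/tsdb/block-dir"

	// Open the block with the new max-items and max-bytes cache limits.
	block, err := tsdb.OpenBlockWithOptions(
		gokitlog.NewNopLogger(),
		blockDir,
		chunkenc.NewPool(),
		nil, // no series hash cache provider
		tsdb.DefaultPostingsForMatchersCacheTTL,
		tsdb.DefaultPostingsForMatchersCacheMaxItems,
		tsdb.DefaultPostingsForMatchersCacheMaxBytes,
		tsdb.DefaultPostingsForMatchersCacheForce,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer block.Close()
}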

View file

@ -72,27 +72,29 @@ var ErrNotReady = errors.New("TSDB not ready")
// millisecond precision timestamps.
func DefaultOptions() *Options {
return &Options{
WALSegmentSize: wlog.DefaultSegmentSize,
MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize,
RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond),
MinBlockDuration: DefaultBlockDuration,
MaxBlockDuration: DefaultBlockDuration,
NoLockfile: false,
AllowOverlappingCompaction: true,
SamplesPerChunk: DefaultSamplesPerChunk,
WALCompression: wlog.CompressionNone,
StripeSize: DefaultStripeSize,
HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
IsolationDisabled: defaultIsolationDisabled,
HeadChunksEndTimeVariance: 0,
HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize,
OutOfOrderCapMax: DefaultOutOfOrderCapMax,
HeadPostingsForMatchersCacheTTL: defaultPostingsForMatchersCacheTTL,
HeadPostingsForMatchersCacheSize: defaultPostingsForMatchersCacheSize,
HeadPostingsForMatchersCacheForce: false,
BlockPostingsForMatchersCacheTTL: defaultPostingsForMatchersCacheTTL,
BlockPostingsForMatchersCacheSize: defaultPostingsForMatchersCacheSize,
BlockPostingsForMatchersCacheForce: false,
WALSegmentSize: wlog.DefaultSegmentSize,
MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize,
RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond),
MinBlockDuration: DefaultBlockDuration,
MaxBlockDuration: DefaultBlockDuration,
NoLockfile: false,
AllowOverlappingCompaction: true,
SamplesPerChunk: DefaultSamplesPerChunk,
WALCompression: wlog.CompressionNone,
StripeSize: DefaultStripeSize,
HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
IsolationDisabled: defaultIsolationDisabled,
HeadChunksEndTimeVariance: 0,
HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize,
OutOfOrderCapMax: DefaultOutOfOrderCapMax,
HeadPostingsForMatchersCacheTTL: DefaultPostingsForMatchersCacheTTL,
HeadPostingsForMatchersCacheMaxItems: DefaultPostingsForMatchersCacheMaxItems,
HeadPostingsForMatchersCacheMaxBytes: DefaultPostingsForMatchersCacheMaxBytes,
HeadPostingsForMatchersCacheForce: DefaultPostingsForMatchersCacheForce,
BlockPostingsForMatchersCacheTTL: DefaultPostingsForMatchersCacheTTL,
BlockPostingsForMatchersCacheMaxItems: DefaultPostingsForMatchersCacheMaxItems,
BlockPostingsForMatchersCacheMaxBytes: DefaultPostingsForMatchersCacheMaxBytes,
BlockPostingsForMatchersCacheForce: DefaultPostingsForMatchersCacheForce,
}
}
@ -208,9 +210,13 @@ type Options struct {
// If it's 0, the cache will only deduplicate in-flight requests, deleting the results once the first request has finished.
HeadPostingsForMatchersCacheTTL time.Duration
// HeadPostingsForMatchersCacheSize is the maximum size of cached postings for matchers elements in the Head.
// HeadPostingsForMatchersCacheMaxItems is the maximum size (in number of items) of cached postings for matchers elements in the Head.
// It's ignored when HeadPostingsForMatchersCacheTTL is 0.
HeadPostingsForMatchersCacheSize int
HeadPostingsForMatchersCacheMaxItems int
// HeadPostingsForMatchersCacheMaxBytes is the maximum size (in bytes) of cached postings for matchers elements in the Head.
// It's ignored when HeadPostingsForMatchersCacheTTL is 0.
HeadPostingsForMatchersCacheMaxBytes int64
// HeadPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on Head and OOOHead regardless of the `concurrent` param.
HeadPostingsForMatchersCacheForce bool
@ -219,9 +225,13 @@ type Options struct {
// If it's 0, the cache will only deduplicate in-flight requests, deleting the results once the first request has finished.
BlockPostingsForMatchersCacheTTL time.Duration
// BlockPostingsForMatchersCacheSize is the maximum size of cached postings for matchers elements in each compacted block.
// BlockPostingsForMatchersCacheMaxItems is the maximum size (in number of items) of cached postings for matchers elements in each compacted block.
// It's ignored when BlockPostingsForMatchersCacheTTL is 0.
BlockPostingsForMatchersCacheSize int
BlockPostingsForMatchersCacheMaxItems int
// BlockPostingsForMatchersCacheMaxBytes is the maximum size (in bytes) of cached postings for matchers elements in each compacted block.
// It's ignored when BlockPostingsForMatchersCacheTTL is 0.
BlockPostingsForMatchersCacheMaxBytes int64
// BlockPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on compacted blocks
// regardless of the `concurrent` param.
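The new byte limits sit alongside the existing item limits and TTLs. A minimal sketch of wiring them up through Options; the numeric overrides are illustrative values, not defaults from this commit:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	opts := tsdb.DefaultOptions()

	// Illustrative overrides of the new limits (arbitrary values).
	opts.HeadPostingsForMatchersCacheMaxItems = 200
	opts.HeadPostingsForMatchersCacheMaxBytes = 64 * 1024 * 1024
	opts.BlockPostingsForMatchersCacheMaxItems = 200
	opts.BlockPostingsForMatchersCacheMaxBytes = 32 * 1024 * 1024

	// Both limits are ignored when the corresponding TTL is 0; with a TTL of 0
	// the cache only deduplicates in-flight requests.
	opts.HeadPostingsForMatchersCacheTTL = tsdb.DefaultPostingsForMatchersCacheTTL
	opts.BlockPostingsForMatchersCacheTTL = tsdb.DefaultPostingsForMatchersCacheTTL

	fmt.Println(opts.HeadPostingsForMatchersCacheMaxBytes, opts.BlockPostingsForMatchersCacheMaxBytes)
}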
@ -592,7 +602,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
return nil, ErrClosed
default:
}
loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, nil, defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize, false)
loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, nil, DefaultPostingsForMatchersCacheTTL, DefaultPostingsForMatchersCacheMaxItems, DefaultPostingsForMatchersCacheMaxBytes, DefaultPostingsForMatchersCacheForce)
if err != nil {
return nil, err
}
@ -896,7 +906,8 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
headOpts.OutOfOrderTimeWindow.Store(opts.OutOfOrderTimeWindow)
headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax)
headOpts.PostingsForMatchersCacheTTL = opts.HeadPostingsForMatchersCacheTTL
headOpts.PostingsForMatchersCacheSize = opts.HeadPostingsForMatchersCacheSize
headOpts.PostingsForMatchersCacheMaxItems = opts.HeadPostingsForMatchersCacheMaxItems
headOpts.PostingsForMatchersCacheMaxBytes = opts.HeadPostingsForMatchersCacheMaxBytes
headOpts.PostingsForMatchersCacheForce = opts.HeadPostingsForMatchersCacheForce
if opts.WALReplayConcurrency > 0 {
headOpts.WALReplayConcurrency = opts.WALReplayConcurrency
@ -1440,7 +1451,7 @@ func (db *DB) reloadBlocks() (err error) {
db.mtx.Lock()
defer db.mtx.Unlock()
loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.SeriesHashCache, db.opts.BlockPostingsForMatchersCacheTTL, db.opts.BlockPostingsForMatchersCacheSize, db.opts.BlockPostingsForMatchersCacheForce)
loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.SeriesHashCache, db.opts.BlockPostingsForMatchersCacheTTL, db.opts.BlockPostingsForMatchersCacheMaxItems, db.opts.BlockPostingsForMatchersCacheMaxBytes, db.opts.BlockPostingsForMatchersCacheForce)
if err != nil {
return err
}
@ -1523,7 +1534,7 @@ func (db *DB) reloadBlocks() (err error) {
return nil
}
func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, cache *hashcache.SeriesHashCache, postingsCacheTTL time.Duration, postingsCacheSize int, postingsCacheForce bool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, cache *hashcache.SeriesHashCache, postingsCacheTTL time.Duration, postingsCacheMaxItems int, postingsCacheMaxBytes int64, postingsCacheForce bool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
bDirs, err := blockDirs(dir)
if err != nil {
return nil, nil, errors.Wrap(err, "find blocks")
@ -1545,7 +1556,7 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po
cacheProvider = cache.GetBlockCacheProvider(meta.ULID.String())
}
block, err = OpenBlockWithOptions(l, bDir, chunkPool, cacheProvider, postingsCacheTTL, postingsCacheSize, postingsCacheForce)
block, err = OpenBlockWithOptions(l, bDir, chunkPool, cacheProvider, postingsCacheTTL, postingsCacheMaxItems, postingsCacheMaxBytes, postingsCacheForce)
if err != nil {
corrupted[meta.ULID] = err
continue

View file

@ -180,9 +180,10 @@ type HeadOptions struct {
IsolationDisabled bool
PostingsForMatchersCacheTTL time.Duration
PostingsForMatchersCacheSize int
PostingsForMatchersCacheForce bool
PostingsForMatchersCacheTTL time.Duration
PostingsForMatchersCacheMaxItems int
PostingsForMatchersCacheMaxBytes int64
PostingsForMatchersCacheForce bool
// Maximum number of CPUs that can simultaneously processes WAL replay.
// The default value is GOMAXPROCS.
@ -199,20 +200,21 @@ const (
func DefaultHeadOptions() *HeadOptions {
ho := &HeadOptions{
ChunkRange: DefaultBlockDuration,
ChunkDirRoot: "",
ChunkPool: chunkenc.NewPool(),
ChunkWriteBufferSize: chunks.DefaultWriteBufferSize,
ChunkEndTimeVariance: 0,
ChunkWriteQueueSize: chunks.DefaultWriteQueueSize,
SamplesPerChunk: DefaultSamplesPerChunk,
StripeSize: DefaultStripeSize,
SeriesCallback: &noopSeriesLifecycleCallback{},
IsolationDisabled: defaultIsolationDisabled,
PostingsForMatchersCacheTTL: defaultPostingsForMatchersCacheTTL,
PostingsForMatchersCacheSize: defaultPostingsForMatchersCacheSize,
PostingsForMatchersCacheForce: false,
WALReplayConcurrency: defaultWALReplayConcurrency,
ChunkRange: DefaultBlockDuration,
ChunkDirRoot: "",
ChunkPool: chunkenc.NewPool(),
ChunkWriteBufferSize: chunks.DefaultWriteBufferSize,
ChunkEndTimeVariance: 0,
ChunkWriteQueueSize: chunks.DefaultWriteQueueSize,
SamplesPerChunk: DefaultSamplesPerChunk,
StripeSize: DefaultStripeSize,
SeriesCallback: &noopSeriesLifecycleCallback{},
IsolationDisabled: defaultIsolationDisabled,
PostingsForMatchersCacheTTL: DefaultPostingsForMatchersCacheTTL,
PostingsForMatchersCacheMaxItems: DefaultPostingsForMatchersCacheMaxItems,
PostingsForMatchersCacheMaxBytes: DefaultPostingsForMatchersCacheMaxBytes,
PostingsForMatchersCacheForce: DefaultPostingsForMatchersCacheForce,
WALReplayConcurrency: defaultWALReplayConcurrency,
}
ho.OutOfOrderCapMax.Store(DefaultOutOfOrderCapMax)
return ho
@ -279,7 +281,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wlog.WL, opts *Hea
stats: stats,
reg: r,
pfmc: NewPostingsForMatchersCache(opts.PostingsForMatchersCacheTTL, opts.PostingsForMatchersCacheSize, opts.PostingsForMatchersCacheForce),
pfmc: NewPostingsForMatchersCache(opts.PostingsForMatchersCacheTTL, opts.PostingsForMatchersCacheMaxItems, opts.PostingsForMatchersCacheMaxBytes, opts.PostingsForMatchersCacheForce),
}
if err := h.resetInMemoryState(); err != nil {
return nil, err

View file

@ -853,6 +853,16 @@ type PostingsCloner struct {
// and it shouldn't be used once provided to the PostingsCloner.
func NewPostingsCloner(p Postings) *PostingsCloner {
ids, err := ExpandPostings(p)
// ExpandPostings() doesn't know the total number of postings beforehand,
// so the returned slice capacity may be well above the actual number of items.
// In such a case, we shrink it.
if float64(len(ids)) < float64(cap(ids))*0.70 {
shrunk := make([]storage.SeriesRef, len(ids))
copy(shrunk, ids)
ids = shrunk
}
return &PostingsCloner{ids: ids, err: err}
}
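The 0.70 threshold works against Go's append growth: ExpandPostings appends refs one at a time, so a 33-element result typically ends up in a 64-slot backing array, and 33 < 0.70 * 64 (= 44.8) triggers the copy. A standalone sketch of the same heuristic, independent of the TSDB types (the doubling capacities are a runtime detail, not guaranteed by the language):

package main

import "fmt"

func main() {
	// Append one element at a time, as ExpandPostings effectively does; for
	// small slices the runtime grows capacity as 1, 2, 4, 8, 16, 32, 64, ...
	var ids []uint64
	for i := 0; i < 33; i++ {
		ids = append(ids, uint64(i))
	}
	fmt.Println(len(ids), cap(ids)) // typically: 33 64

	// 33 < 0.70*64, so copy into a right-sized backing array to release the
	// unused slots, mirroring the shrink branch in NewPostingsCloner.
	if float64(len(ids)) < float64(cap(ids))*0.70 {
		shrunk := make([]uint64, len(ids))
		copy(shrunk, ids)
		ids = shrunk
	}
	fmt.Println(len(ids), cap(ids)) // 33 33
}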

View file

@ -24,6 +24,7 @@ import (
"testing"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/labels"
@ -1103,6 +1104,20 @@ func TestPostingsCloner(t *testing.T) {
})
}
func TestNewPostingsCloner_ShrinkExpandedPostingsSlice(t *testing.T) {
t.Run("should not shrink expanded postings if length is >= 70% capacity", func(t *testing.T) {
cloner := NewPostingsCloner(NewListPostings(make([]storage.SeriesRef, 60)))
assert.Equal(t, 60, len(cloner.ids))
assert.Equal(t, 64, cap(cloner.ids)) // Not shrunk.
})
t.Run("should shrink expanded postings if length is < 70% capacity", func(t *testing.T) {
cloner := NewPostingsCloner(NewListPostings(make([]storage.SeriesRef, 33)))
assert.Equal(t, 33, len(cloner.ids))
assert.Equal(t, 33, cap(cloner.ids)) // Shrunk.
})
}
func TestFindIntersectingPostings(t *testing.T) {
t.Run("multiple intersections", func(t *testing.T) {
p := NewListPostings([]storage.SeriesRef{10, 15, 20, 25, 30, 35, 40, 45, 50})

View file

@ -7,13 +7,19 @@ import (
"sync"
"time"
"github.com/DmitriyVTitov/size"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/index"
)
const (
defaultPostingsForMatchersCacheTTL = 10 * time.Second
defaultPostingsForMatchersCacheSize = 100
// NOTE: keep them exported to reference them in Mimir.
DefaultPostingsForMatchersCacheTTL = 10 * time.Second
DefaultPostingsForMatchersCacheMaxItems = 100
DefaultPostingsForMatchersCacheMaxBytes = 10 * 1024 * 1024 // Based on the default max items, 10MB / 100 = 100KB per cached entry on average.
DefaultPostingsForMatchersCacheForce = false
)
// IndexPostingsReader is a subset of IndexReader methods, the minimum required to evaluate PostingsForMatchers
@ -31,14 +37,15 @@ type IndexPostingsReader interface {
// NewPostingsForMatchersCache creates a new PostingsForMatchersCache.
// If `ttl` is 0, then it only deduplicates in-flight requests.
// If `force` is true, then all requests go through cache, regardless of the `concurrent` param provided.
func NewPostingsForMatchersCache(ttl time.Duration, cacheSize int, force bool) *PostingsForMatchersCache {
func NewPostingsForMatchersCache(ttl time.Duration, maxItems int, maxBytes int64, force bool) *PostingsForMatchersCache {
b := &PostingsForMatchersCache{
calls: &sync.Map{},
cached: list.New(),
ttl: ttl,
cacheSize: cacheSize,
force: force,
ttl: ttl,
maxItems: maxItems,
maxBytes: maxBytes,
force: force,
timeNow: time.Now,
postingsForMatchers: PostingsForMatchers,
@ -51,12 +58,14 @@ func NewPostingsForMatchersCache(ttl time.Duration, cacheSize int, force bool) *
type PostingsForMatchersCache struct {
calls *sync.Map
cachedMtx sync.RWMutex
cached *list.List
cachedMtx sync.RWMutex
cached *list.List
cachedBytes int64
ttl time.Duration
cacheSize int
force bool
ttl time.Duration
maxItems int
maxBytes int64
force bool
// timeNow is the time.Now that can be replaced for testing purposes
timeNow func() time.Time
@ -101,13 +110,20 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex
cloner = index.NewPostingsCloner(postings)
}
c.created(key, c.timeNow())
// Estimate the size of the cache entry, in bytes. We use max() because
// size.Of() returns -1 if the value is nil.
estimatedSizeBytes := int64(len(key)) + max(0, int64(size.Of(wg))) + max(0, int64(size.Of(outerErr))) + max(0, int64(size.Of(cloner)))
c.created(key, c.timeNow(), estimatedSizeBytes)
return promise
}
type postingsForMatchersCachedCall struct {
key string
ts time.Time
// Size of the cached entry, in bytes.
sizeBytes int64
}
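The per-entry estimate leans on the size package imported above. A small standalone sketch of what it reports for a postings list of 30 refs; the exact byte count depends on the platform, and the -1 behaviour is taken from the comment above:

package main

import (
	"fmt"

	"github.com/DmitriyVTitov/size"
)

func main() {
	// size.Of uses reflection to compute the deep size of a value in bytes.
	// For 30 refs this should be on the order of 30*8 = 240 bytes of data
	// plus the slice header, in line with the per-entry estimate above.
	refs := make([]uint64, 30)
	fmt.Println(size.Of(refs))

	// A nil value cannot be sized, and size.Of reports -1 for it (per the
	// comment above), which is why the estimate is clamped with max(0, ...).
	var err error
	fmt.Println(size.Of(err))
}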
func (c *PostingsForMatchersCache) expire() {
@ -134,9 +150,11 @@ func (c *PostingsForMatchersCache) expire() {
// or because the cache has too many elements
// should be called while read lock is held on cachedMtx
func (c *PostingsForMatchersCache) shouldEvictHead() bool {
if c.cached.Len() > c.cacheSize {
// Evict the head for sure if either the maximum number of items or the maximum number of bytes is exceeded.
if c.cached.Len() > c.maxItems || c.cachedBytes > c.maxBytes {
return true
}
h := c.cached.Front()
if h == nil {
return false
@ -150,11 +168,12 @@ func (c *PostingsForMatchersCache) evictHead() {
oldest := front.Value.(*postingsForMatchersCachedCall)
c.calls.Delete(oldest.key)
c.cached.Remove(front)
c.cachedBytes -= oldest.sizeBytes
}
// created has to be called when returning from the PostingsForMatchers call that creates the promise.
// the ts provided should be the call time.
func (c *PostingsForMatchersCache) created(key string, ts time.Time) {
func (c *PostingsForMatchersCache) created(key string, ts time.Time, sizeBytes int64) {
if c.ttl <= 0 {
c.calls.Delete(key)
return
@ -164,9 +183,11 @@ func (c *PostingsForMatchersCache) created(key string, ts time.Time) {
defer c.cachedMtx.Unlock()
c.cached.PushBack(&postingsForMatchersCachedCall{
key: key,
ts: ts,
key: key,
ts: ts,
sizeBytes: sizeBytes,
})
c.cachedBytes += sizeBytes
}
// matchersKey provides a unique string key for the given matchers slice
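To make the byte-based accounting concrete: with the default 10MB budget and entries averaging ~100KB, roughly 100 entries fit before eviction starts. A simplified standalone simulation of the created()/evictHead() bookkeeping under an assumed 1000-byte budget; the real cache also expires entries by TTL and runs eviction in expire(), not at insertion time:

package main

import (
	"container/list"
	"fmt"
)

type entry struct {
	key       string
	sizeBytes int64
}

func main() {
	const maxItems = 100
	const maxBytes = int64(1000) // illustrative budget, matching the test below

	cached := list.New()
	var cachedBytes int64

	add := func(key string, size int64) {
		cached.PushBack(&entry{key: key, sizeBytes: size})
		cachedBytes += size
		// Evict from the front (oldest entry) while either limit is exceeded,
		// mirroring shouldEvictHead()/evictHead() above.
		for cached.Len() > maxItems || cachedBytes > maxBytes {
			oldest := cached.Front().Value.(*entry)
			cachedBytes -= oldest.sizeBytes
			cached.Remove(cached.Front())
			fmt.Println("evicted", oldest.key)
		}
	}

	// Three ~300-byte entries fit (900 <= 1000); the fourth pushes the total
	// to 1200 bytes and evicts the oldest entry.
	for _, k := range []string{"a", "b", "c", "d"} {
		add(k, 300)
	}
	fmt.Println("entries:", cached.Len(), "bytes:", cachedBytes) // entries: 3 bytes: 900
}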

View file

@ -8,17 +8,18 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/index"
)
func TestPostingsForMatchersCache(t *testing.T) {
const testCacheSize = 5
// newPostingsForMatchersCache tests the NewPostingsForMatchersCache constructor, but overrides the postingsForMatchers func
newPostingsForMatchersCache := func(ttl time.Duration, pfm func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error), timeMock *timeNowMock, force bool) *PostingsForMatchersCache {
c := NewPostingsForMatchersCache(ttl, testCacheSize, force)
newPostingsForMatchersCache := func(ttl time.Duration, maxItems int, maxBytes int64, pfm func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error), timeMock *timeNowMock, force bool) *PostingsForMatchersCache {
c := NewPostingsForMatchersCache(ttl, maxItems, maxBytes, force)
if c.postingsForMatchers == nil {
t.Fatalf("NewPostingsForMatchersCache() didn't assign postingsForMatchers func")
}
@ -35,7 +36,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
expectedPostingsErr := fmt.Errorf("failed successfully")
c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
c := newPostingsForMatchersCache(DefaultPostingsForMatchersCacheTTL, 5, 1000, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
require.IsType(t, indexForPostingsMock{}, ix, "Incorrect IndexPostingsReader was provided to PostingsForMatchers, expected the mock, was given %v (%T)", ix, ix)
require.Equal(t, expectedMatchers, ms, "Wrong label matchers provided, expected %v, got %v", expectedMatchers, ms)
return index.ErrPostings(expectedPostingsErr), nil
@ -53,7 +54,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
expectedErr := fmt.Errorf("failed successfully")
c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
c := newPostingsForMatchersCache(DefaultPostingsForMatchersCacheTTL, 5, 1000, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
return nil, expectedErr
}, &timeNowMock{}, false)
@ -99,9 +100,9 @@ func TestPostingsForMatchersCache(t *testing.T) {
release := make(chan struct{})
var ttl time.Duration
if cacheEnabled {
ttl = defaultPostingsForMatchersCacheTTL
ttl = DefaultPostingsForMatchersCacheTTL
}
c := newPostingsForMatchersCache(ttl, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
c := newPostingsForMatchersCache(ttl, 5, 1000, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
select {
case called <- struct{}{}:
default:
@ -148,7 +149,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
var call int
c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
c := newPostingsForMatchersCache(DefaultPostingsForMatchersCacheTTL, 5, 1000, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
call++
return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
}, &timeNowMock{}, false)
@ -168,7 +169,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
var call int
c := newPostingsForMatchersCache(0, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
c := newPostingsForMatchersCache(0, 1000, 1000, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
call++
return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
}, &timeNowMock{}, false)
@ -191,7 +192,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
}
var call int
c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
c := newPostingsForMatchersCache(DefaultPostingsForMatchersCacheTTL, 5, 1000, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
call++
return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
}, timeNow, false)
@ -201,14 +202,14 @@ func TestPostingsForMatchersCache(t *testing.T) {
require.NoError(t, err)
require.EqualError(t, p.Err(), "result from call 1")
timeNow.advance(defaultPostingsForMatchersCacheTTL / 2)
timeNow.advance(DefaultPostingsForMatchersCacheTTL / 2)
// second call within the ttl, should use the cache
p, err = c.PostingsForMatchers(ctx, indexForPostingsMock{}, true, expectedMatchers...)
require.NoError(t, err)
require.EqualError(t, p.Err(), "result from call 1")
timeNow.advance(defaultPostingsForMatchersCacheTTL / 2)
timeNow.advance(DefaultPostingsForMatchersCacheTTL / 2)
// third call is after ttl (exactly), should call again
p, err = c.PostingsForMatchers(ctx, indexForPostingsMock{}, true, expectedMatchers...)
@ -216,15 +217,17 @@ func TestPostingsForMatchersCache(t *testing.T) {
require.EqualError(t, p.Err(), "result from call 2")
})
t.Run("cached value is evicted because cache exceeds max size", func(t *testing.T) {
t.Run("cached value is evicted because cache exceeds max items", func(t *testing.T) {
const maxItems = 5
timeNow := &timeNowMock{}
calls := make([][]*labels.Matcher, testCacheSize)
calls := make([][]*labels.Matcher, maxItems)
for i := range calls {
calls[i] = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "matchers", fmt.Sprintf("%d", i))}
}
callsPerMatchers := map[string]int{}
c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
c := newPostingsForMatchersCache(DefaultPostingsForMatchersCacheTTL, maxItems, 1000, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
k := matchersKey(ms)
callsPerMatchers[k]++
return index.ErrPostings(fmt.Errorf("result from call %d", callsPerMatchers[k])), nil
@ -260,6 +263,80 @@ func TestPostingsForMatchersCache(t *testing.T) {
require.NoError(t, err)
require.EqualError(t, p.Err(), "result from call 2")
})
t.Run("cached value is evicted because cache exceeds max bytes", func(t *testing.T) {
const (
maxItems = 100 // Never hit it.
maxBytes = 1000
numMatchers = 5
postingsListSize = 30 // 8 bytes per posting ref, so 30 x 8 = 240 bytes.
)
// Generate some matchers.
matchersLists := make([][]*labels.Matcher, numMatchers)
for i := range matchersLists {
matchersLists[i] = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "matchers", fmt.Sprintf("%d", i))}
}
// Generate some postings lists.
refsLists := make(map[string][]storage.SeriesRef, numMatchers)
for i := 0; i < numMatchers; i++ {
refs := make([]storage.SeriesRef, postingsListSize)
for r := range refs {
refs[r] = storage.SeriesRef(i * r)
}
refsLists[matchersKey(matchersLists[i])] = refs
}
callsPerMatchers := map[string]int{}
c := newPostingsForMatchersCache(DefaultPostingsForMatchersCacheTTL, maxItems, maxBytes, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
k := matchersKey(ms)
callsPerMatchers[k]++
return index.NewListPostings(refsLists[k]), nil
}, &timeNowMock{}, false)
// We expect the cache to hold 3 items, so we call PostingsForMatchers() for 3 different matchers
// and then double-check they're all cached. To do that, we iterate twice.
for run := 0; run < 2; run++ {
for i := 0; i < 3; i++ {
matchers := matchersLists[i]
p, err := c.PostingsForMatchers(ctx, indexForPostingsMock{}, true, matchers...)
require.NoError(t, err)
actual, err := index.ExpandPostings(p)
require.NoError(t, err)
assert.Equal(t, refsLists[matchersKey(matchers)], actual)
}
}
// At this point we expect that the postings have been computed only once for the 3 matchers.
for i := 0; i < 3; i++ {
assert.Equal(t, 1, callsPerMatchers[matchersKey(matchersLists[i])])
}
// Call PostingsForMatchers() for a 4th matcher. We expect this will evict the oldest cached entry.
for run := 0; run < 2; run++ {
matchers := matchersLists[3]
p, err := c.PostingsForMatchers(ctx, indexForPostingsMock{}, true, matchers...)
require.NoError(t, err)
actual, err := index.ExpandPostings(p)
require.NoError(t, err)
assert.Equal(t, refsLists[matchersKey(matchers)], actual)
}
// To ensure the 1st (oldest) entry was evicted, we call PostingsForMatchers() again with its matchers.
_, err := c.PostingsForMatchers(ctx, indexForPostingsMock{}, true, matchersLists[0]...)
require.NoError(t, err)
assert.Equal(t, 2, callsPerMatchers[matchersKey(matchersLists[0])])
assert.Equal(t, 1, callsPerMatchers[matchersKey(matchersLists[1])])
assert.Equal(t, 1, callsPerMatchers[matchersKey(matchersLists[2])])
assert.Equal(t, 1, callsPerMatchers[matchersKey(matchersLists[3])])
})
}
type indexForPostingsMock struct{}

View file

@ -313,7 +313,7 @@ func BenchmarkQuerierSelect(b *testing.B) {
seriesHashCache := hashcache.NewSeriesHashCache(1024 * 1024 * 1024)
blockdir := createBlockFromHead(b, tmpdir, h)
block, err := OpenBlockWithOptions(nil, blockdir, nil, seriesHashCache.GetBlockCacheProvider("test"), defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize, false)
block, err := OpenBlockWithOptions(nil, blockdir, nil, seriesHashCache.GetBlockCacheProvider("test"), DefaultPostingsForMatchersCacheTTL, DefaultPostingsForMatchersCacheMaxItems, DefaultPostingsForMatchersCacheMaxBytes, DefaultPostingsForMatchersCacheForce)
require.NoError(b, err)
defer func() {
require.NoError(b, block.Close())