mirror of
https://github.com/prometheus/prometheus.git
synced 2025-02-02 08:31:11 -08:00
Refactor PostingsForMatcherCache promise
Extract promise payload as a struct, to make size calculation easier. Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
This commit is contained in:
parent
241a342b33
commit
5fdf784243
|
@ -81,41 +81,43 @@ func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix I
|
|||
return c.postingsForMatchersPromise(ctx, ix, ms)()
|
||||
}
|
||||
|
||||
func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func() (index.Postings, error) {
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
cloner *index.PostingsCloner
|
||||
outerErr error
|
||||
)
|
||||
wg.Add(1)
|
||||
type postingsForMatcherPromise struct {
|
||||
sync.WaitGroup
|
||||
|
||||
promise := func() (index.Postings, error) {
|
||||
wg.Wait()
|
||||
if outerErr != nil {
|
||||
return nil, outerErr
|
||||
}
|
||||
return cloner.Clone(), nil
|
||||
cloner *index.PostingsCloner
|
||||
err error
|
||||
}
|
||||
|
||||
func (p *postingsForMatcherPromise) result() (index.Postings, error) {
|
||||
p.Wait()
|
||||
if p.err != nil {
|
||||
return nil, p.err
|
||||
}
|
||||
return p.cloner.Clone(), nil
|
||||
}
|
||||
|
||||
func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func() (index.Postings, error) {
|
||||
promise := new(postingsForMatcherPromise)
|
||||
promise.Add(1)
|
||||
|
||||
key := matchersKey(ms)
|
||||
oldPromise, loaded := c.calls.LoadOrStore(key, promise)
|
||||
if loaded {
|
||||
return oldPromise.(func() (index.Postings, error))
|
||||
promise = oldPromise.(*postingsForMatcherPromise)
|
||||
return promise.result
|
||||
}
|
||||
defer wg.Done()
|
||||
defer promise.Done()
|
||||
|
||||
if postings, err := c.postingsForMatchers(ctx, ix, ms...); err != nil {
|
||||
outerErr = err
|
||||
promise.err = err
|
||||
} else {
|
||||
cloner = index.NewPostingsCloner(postings)
|
||||
promise.cloner = index.NewPostingsCloner(postings)
|
||||
}
|
||||
|
||||
// Estimate the size of the cache entry, in bytes. We use max() because
|
||||
// size.Of() returns -1 if the value is nil.
|
||||
sizeBytes := int64(len(key)) + max(0, int64(size.Of(outerErr))) + max(0, int64(size.Of(cloner)))
|
||||
sizeBytes := int64(len(key) + size.Of(promise))
|
||||
|
||||
c.created(key, c.timeNow(), sizeBytes)
|
||||
return promise
|
||||
return promise.result
|
||||
}
|
||||
|
||||
type postingsForMatchersCachedCall struct {
|
||||
|
|
|
@ -267,7 +267,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
|
|||
t.Run("cached value is evicted because cache exceeds max bytes", func(t *testing.T) {
|
||||
const (
|
||||
maxItems = 100 // Never hit it.
|
||||
maxBytes = 1000
|
||||
maxBytes = 1100
|
||||
numMatchers = 5
|
||||
postingsListSize = 30 // 8 bytes per posting ref, so 30 x 8 = 240 bytes.
|
||||
)
|
||||
|
@ -313,7 +313,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
|
|||
|
||||
// At this point we expect that the postings have been computed only once for the 3 matchers.
|
||||
for i := 0; i < 3; i++ {
|
||||
assert.Equal(t, 1, callsPerMatchers[matchersKey(matchersLists[i])])
|
||||
assert.Equalf(t, 1, callsPerMatchers[matchersKey(matchersLists[i])], "matcher %d", i)
|
||||
}
|
||||
|
||||
// Call PostingsForMatchers() for a 4th matcher. We expect this will evict the oldest cached entry.
|
||||
|
|
Loading…
Reference in a new issue