package tsdb

import (
	"container/list"
	"strings"
	"sync"
	"time"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb/index"
)

const (
	defaultPostingsForMatchersCacheTTL  = 10 * time.Second
	defaultPostingsForMatchersCacheSize = 100
)

// IndexPostingsReader is a subset of IndexReader methods, the minimum required to evaluate PostingsForMatchers.
type IndexPostingsReader interface {
	// LabelValues returns possible label values which may not be sorted.
	LabelValues(name string, matchers ...*labels.Matcher) ([]string, error)

	// Postings returns the postings list iterator for the label pairs.
	// The Postings here contain the offsets to the series inside the index.
	// Found IDs are not strictly required to point to a valid Series, e.g.
	// during background garbage collections. Input values must be sorted.
	Postings(name string, values ...string) (index.Postings, error)
}

// NewPostingsForMatchersCache creates a new PostingsForMatchersCache.
// A ttl <= 0 disables result caching: expire becomes a no-op and created
// removes each promise as soon as the call that produced it returns.
func NewPostingsForMatchersCache(ttl time.Duration, cacheSize int) *PostingsForMatchersCache {
	b := &PostingsForMatchersCache{
		calls:  &sync.Map{},
		cached: list.New(),

		ttl:       ttl,
		cacheSize: cacheSize,

		timeNow:             time.Now,
		postingsForMatchers: PostingsForMatchers,
	}

	return b
}

// PostingsForMatchersCache caches PostingsForMatchers call results when the concurrent hint is passed in.
type PostingsForMatchersCache struct {
	// calls maps a matchers key (see matchersKey) to the promise
	// (a func() (index.Postings, error)) of the call evaluating those matchers.
	calls *sync.Map

	// cachedMtx guards cached.
	cachedMtx sync.RWMutex

	// cached holds *postingsForMatchersCachedCall entries; new entries are
	// pushed to the back, so the front is the first candidate for eviction.
	cached *list.List

	ttl       time.Duration
	cacheSize int

	// timeNow is the time.Now that can be replaced for testing purposes
	timeNow func() time.Time

	// postingsForMatchers can be replaced for testing purposes
	postingsForMatchers func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error)
}

// PostingsForMatchers returns the postings for the given matchers.
// When concurrent is false the call goes straight to the underlying
// postingsForMatchers function and nothing is cached. When it is true, expired
// entries are evicted first and the result is then served through a promise
// shared by every caller that passes the same matchers key.
func (c *PostingsForMatchersCache) PostingsForMatchers(ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
	if !concurrent {
		return c.postingsForMatchers(ix, ms...)
	}
	c.expire()
	return c.postingsForMatchersPromise(ix, ms)()
}

// postingsForMatchersPromise returns a function that yields the postings for ms.
// The first caller for a given key stores its promise in c.calls, evaluates the
// matchers synchronously and registers the call through created. Any other
// caller that finds the key already stored gets the stored promise, which
// blocks until the first evaluation has finished. Every invocation of the
// promise returns a fresh clone of the postings, or the error of the
// evaluation; the error is kept for as long as the promise stays in c.calls.
func (c *PostingsForMatchersCache) postingsForMatchersPromise(ix IndexPostingsReader, ms []*labels.Matcher) func() (index.Postings, error) {
	var (
		wg       sync.WaitGroup
		cloner   *index.PostingsCloner
		outerErr error
	)
	wg.Add(1)

	promise := func() (index.Postings, error) {
		// cloner and outerErr are only written before wg.Done(), so they are safe to read here.
		wg.Wait()
		if outerErr != nil {
			return nil, outerErr
		}
		return cloner.Clone(), nil
	}

	key := matchersKey(ms)
	oldPromise, loaded := c.calls.LoadOrStore(key, promise)
	if loaded {
		// Somebody else is already evaluating (or has evaluated) these matchers.
		return oldPromise.(func() (index.Postings, error))
	}
	// Release the waiters when this function returns, i.e. after the result has
	// been stored and the call has been registered with created.
	defer wg.Done()

	if postings, err := c.postingsForMatchers(ix, ms...); err != nil {
		outerErr = err
	} else {
		cloner = index.NewPostingsCloner(postings)
	}

	c.created(key, c.timeNow())
	return promise
}

// postingsForMatchersCachedCall is the entry stored in PostingsForMatchersCache.cached:
// the key of a promise held in calls and the timestamp it was registered with.
type postingsForMatchersCachedCall struct {
	key string
	ts  time.Time
}

// expire evicts entries from the front of the cache for as long as
// shouldEvictHead reports that the head must go. It does nothing when ttl <= 0.
func (c *PostingsForMatchersCache) expire() {
	if c.ttl <= 0 {
		return
	}

	// Cheap check under the read lock first, so that the common
	// "nothing to evict" case does not take the write lock.
	c.cachedMtx.RLock()
	if !c.shouldEvictHead() {
		c.cachedMtx.RUnlock()
		return
	}
	c.cachedMtx.RUnlock()

	c.cachedMtx.Lock()
	defer c.cachedMtx.Unlock()

	// The condition is re-evaluated under the write lock because the list may
	// have changed between releasing the read lock and acquiring the write lock.
	for c.shouldEvictHead() {
		c.evictHead()
	}
}

// shouldEvictHead returns true if cache head should be evicted, either because it's too old,
// or because the cache has too many elements.
// It returns false when the cache is empty.
// Should be called while read lock is held on cachedMtx.
func (c *PostingsForMatchersCache) shouldEvictHead() bool {
	h := c.cached.Front()
	if h == nil {
		// Nothing to evict. This is checked before the size comparison because
		// with a negative cacheSize "Len() > cacheSize" also holds for an empty
		// list, and evictHead would then dereference a nil head.
		return false
	}
	if c.cached.Len() > c.cacheSize {
		return true
	}
	ts := h.Value.(*postingsForMatchersCachedCall).ts
	return c.timeNow().Sub(ts) >= c.ttl
}

// evictHead removes the front entry from the cached list and its promise from calls.
// The list must not be empty; callers check shouldEvictHead first.
// Should be called while write lock is held on cachedMtx.
func (c *PostingsForMatchersCache) evictHead() {
	front := c.cached.Front()
	oldest := front.Value.(*postingsForMatchersCachedCall)
	c.calls.Delete(oldest.key)
	c.cached.Remove(front)
}

// created has to be called when returning from the PostingsForMatchers call that creates the promise.
// The ts provided should be the call time.
func (c *PostingsForMatchersCache) created(key string, ts time.Time) { if c.ttl <= 0 { c.calls.Delete(key) return } c.cachedMtx.Lock() defer c.cachedMtx.Unlock() c.cached.PushBack(&postingsForMatchersCachedCall{ key: key, ts: ts, }) } // matchersKey provides a unique string key for the given matchers slice // NOTE: different orders of matchers will produce different keys, // but it's unlikely that we'll receive same matchers in different orders at the same time func matchersKey(ms []*labels.Matcher) string { const ( typeLen = 2 sepLen = 1 ) var size int for _, m := range ms { size += len(m.Name) + len(m.Value) + typeLen + sepLen } sb := strings.Builder{} sb.Grow(size) for _, m := range ms { sb.WriteString(m.Name) sb.WriteString(m.Type.String()) sb.WriteString(m.Value) sb.WriteByte(0) } key := sb.String() return key } // indexReaderWithPostingsForMatchers adapts an index.Reader to be an IndexReader by adding the PostingsForMatchers method type indexReaderWithPostingsForMatchers struct { *index.Reader pfmc *PostingsForMatchersCache } func (ir indexReaderWithPostingsForMatchers) PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { return ir.pfmc.PostingsForMatchers(ir, concurrent, ms...) } var _ IndexReader = indexReaderWithPostingsForMatchers{}