PostingsForMatchersCache: Add tracing

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
Arve Knudsen 2023-10-25 11:38:12 +02:00
parent 237a77b483
commit 8ece24ddb0

View file

@ -8,6 +8,9 @@ import (
"time" "time"
"github.com/DmitriyVTitov/size" "github.com/DmitriyVTitov/size"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/index"
@ -74,11 +77,27 @@ type PostingsForMatchersCache struct {
} }
func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
if !concurrent && !c.force { var matcherStrs []string
return c.postingsForMatchers(ctx, ix, ms...) for _, m := range ms {
matcherStrs = append(matcherStrs, m.String())
} }
c.expire() ctx, span := otel.Tracer("").Start(ctx, "PostingsForMatchersCache.PostingsForMatchers", trace.WithAttributes(
return c.postingsForMatchersPromise(ix, ms)(ctx) attribute.StringSlice("matchers", matcherStrs),
attribute.Bool("concurrent", concurrent),
attribute.Bool("force", c.force),
))
defer span.End()
if !concurrent && !c.force {
span.AddEvent("cache not in use")
p, err := c.postingsForMatchers(ctx, ix, ms...)
span.RecordError(err)
return p, err
}
c.expire(ctx)
p, err := c.postingsForMatchersPromise(ctx, ix, ms)(ctx)
span.RecordError(err)
return p, err
} }
type postingsForMatcherPromise struct { type postingsForMatcherPromise struct {
@ -89,23 +108,39 @@ type postingsForMatcherPromise struct {
} }
func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, error) { func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, error) {
ctx, span := otel.Tracer("").Start(ctx, "postingsForMatcherPromise.result")
defer span.End()
select { select {
case <-ctx.Done(): case <-ctx.Done():
span.AddEvent("interrupting wait on postingsForMatchers promise due to context error", trace.WithAttributes(
attribute.String("err", ctx.Err().Error()),
))
return nil, ctx.Err() return nil, ctx.Err()
case <-p.done: case <-p.done:
// Checking context error is necessary for deterministic tests, // Checking context error is necessary for deterministic tests,
// as channel selection order is random // as channel selection order is random
if ctx.Err() != nil { if ctx.Err() != nil {
span.AddEvent("successful postingsForMatchers promise, but context has error", trace.WithAttributes(
attribute.String("err", ctx.Err().Error()),
))
return nil, ctx.Err() return nil, ctx.Err()
} }
if p.err != nil { if p.err != nil {
span.AddEvent("failed postingsForMatchers promise ", trace.WithAttributes(
attribute.String("err", p.err.Error()),
))
return nil, p.err return nil, p.err
} }
span.AddEvent("successful postingsForMatchers promise")
return p.cloner.Clone(), nil return p.cloner.Clone(), nil
} }
} }
func (c *PostingsForMatchersCache) postingsForMatchersPromise(ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) { func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) {
ctx, span := otel.Tracer("").Start(ctx, "PostingsForMatchersCache.postingsForMatchersPromise")
defer span.End()
promise := new(postingsForMatcherPromise) promise := new(postingsForMatcherPromise)
promise.done = make(chan struct{}) promise.done = make(chan struct{})
@ -113,10 +148,17 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ix IndexPostingsRe
oldPromise, loaded := c.calls.LoadOrStore(key, promise) oldPromise, loaded := c.calls.LoadOrStore(key, promise)
if loaded { if loaded {
// promise was not stored, we return a previously stored promise, that's possibly being fulfilled in another goroutine // promise was not stored, we return a previously stored promise, that's possibly being fulfilled in another goroutine
span.AddEvent("using cached postingsForMatchers promise", trace.WithAttributes(
attribute.String("key", key),
))
close(promise.done) close(promise.done)
return oldPromise.(*postingsForMatcherPromise).result return oldPromise.(*postingsForMatcherPromise).result
} }
span.AddEvent("cached new postingsForMatchers promise, executing query", trace.WithAttributes(
attribute.String("key", key)),
)
// promise was stored, close its channel after fulfilment // promise was stored, close its channel after fulfilment
defer close(promise.done) defer close(promise.done)
@ -125,14 +167,18 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ix IndexPostingsRe
// FIXME: do we need to cancel the call to postingsForMatchers if all the callers waiting for the result have // FIXME: do we need to cancel the call to postingsForMatchers if all the callers waiting for the result have
// cancelled their context? // cancelled their context?
if postings, err := c.postingsForMatchers(context.Background(), ix, ms...); err != nil { if postings, err := c.postingsForMatchers(context.Background(), ix, ms...); err != nil {
span.AddEvent("postingsForMatchers failed", trace.WithAttributes(
attribute.String("err", err.Error()),
))
promise.err = err promise.err = err
} else { } else {
span.AddEvent("postingsForMatchers succeeded")
promise.cloner = index.NewPostingsCloner(postings) promise.cloner = index.NewPostingsCloner(postings)
} }
sizeBytes := int64(len(key) + size.Of(promise)) sizeBytes := int64(len(key) + size.Of(promise))
c.created(key, c.timeNow(), sizeBytes) c.created(ctx, key, c.timeNow(), sizeBytes)
return promise.result return promise.result
} }
@ -144,13 +190,20 @@ type postingsForMatchersCachedCall struct {
sizeBytes int64 sizeBytes int64
} }
func (c *PostingsForMatchersCache) expire() { func (c *PostingsForMatchersCache) expire(ctx context.Context) {
_, span := otel.Tracer("").Start(ctx, "PostingsForMatchersCache.expire", trace.WithAttributes(
attribute.Stringer("ttl", c.ttl),
))
defer span.End()
if c.ttl <= 0 { if c.ttl <= 0 {
span.AddEvent("ttl < 0 - doing nothing")
return return
} }
c.cachedMtx.RLock() c.cachedMtx.RLock()
if !c.shouldEvictHead() { if !c.shouldEvictHead() {
span.AddEvent("should not evict head")
c.cachedMtx.RUnlock() c.cachedMtx.RUnlock()
return return
} }
@ -159,6 +212,7 @@ func (c *PostingsForMatchersCache) expire() {
c.cachedMtx.Lock() c.cachedMtx.Lock()
defer c.cachedMtx.Unlock() defer c.cachedMtx.Unlock()
span.AddEvent("evicting head(s)")
for c.shouldEvictHead() { for c.shouldEvictHead() {
c.evictHead() c.evictHead()
} }
@ -191,8 +245,15 @@ func (c *PostingsForMatchersCache) evictHead() {
// created has to be called when returning from the PostingsForMatchers call that creates the promise. // created has to be called when returning from the PostingsForMatchers call that creates the promise.
// the ts provided should be the call time. // the ts provided should be the call time.
func (c *PostingsForMatchersCache) created(key string, ts time.Time, sizeBytes int64) { func (c *PostingsForMatchersCache) created(ctx context.Context, key string, ts time.Time, sizeBytes int64) {
_, span := otel.Tracer("").Start(ctx, "PostingsForMatchersCache.created")
defer span.End()
if c.ttl <= 0 { if c.ttl <= 0 {
span.AddEvent("deleting cached promise since c.ttl <= 0", trace.WithAttributes(
attribute.Stringer("ttl", c.ttl),
attribute.String("key", key),
))
c.calls.Delete(key) c.calls.Delete(key)
return return
} }
@ -206,6 +267,13 @@ func (c *PostingsForMatchersCache) created(key string, ts time.Time, sizeBytes i
sizeBytes: sizeBytes, sizeBytes: sizeBytes,
}) })
c.cachedBytes += sizeBytes c.cachedBytes += sizeBytes
span.AddEvent("recorded cached promise size", trace.WithAttributes(
attribute.Stringer("ttl", c.ttl),
attribute.String("key", key),
attribute.Stringer("timestamp", ts),
attribute.Int64("size in bytes", sizeBytes),
attribute.Int64("cached bytes", c.cachedBytes),
))
} }
// matchersKey provides a unique string key for the given matchers slice. // matchersKey provides a unique string key for the given matchers slice.