From 8ece24ddb004cccfe65327db1da3532cf9bb6efe Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 25 Oct 2023 11:38:12 +0200 Subject: [PATCH] PostingsForMatchersCache: Add tracing Signed-off-by: Arve Knudsen --- tsdb/postings_for_matchers_cache.go | 84 ++++++++++++++++++++++++++--- 1 file changed, 76 insertions(+), 8 deletions(-) diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go index 87e0af0cf4..4a0550ea6d 100644 --- a/tsdb/postings_for_matchers_cache.go +++ b/tsdb/postings_for_matchers_cache.go @@ -8,6 +8,9 @@ import ( "time" "github.com/DmitriyVTitov/size" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/index" @@ -74,11 +77,27 @@ type PostingsForMatchersCache struct { } func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { - if !concurrent && !c.force { - return c.postingsForMatchers(ctx, ix, ms...) + var matcherStrs []string + for _, m := range ms { + matcherStrs = append(matcherStrs, m.String()) } - c.expire() - return c.postingsForMatchersPromise(ix, ms)(ctx) + ctx, span := otel.Tracer("").Start(ctx, "PostingsForMatchersCache.PostingsForMatchers", trace.WithAttributes( + attribute.StringSlice("matchers", matcherStrs), + attribute.Bool("concurrent", concurrent), + attribute.Bool("force", c.force), + )) + defer span.End() + + if !concurrent && !c.force { + span.AddEvent("cache not in use") + p, err := c.postingsForMatchers(ctx, ix, ms...) + span.RecordError(err) + return p, err + } + c.expire(ctx) + p, err := c.postingsForMatchersPromise(ctx, ix, ms)(ctx) + span.RecordError(err) + return p, err } type postingsForMatcherPromise struct { @@ -89,23 +108,39 @@ type postingsForMatcherPromise struct { } func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, error) { + ctx, span := otel.Tracer("").Start(ctx, "postingsForMatcherPromise.result") + defer span.End() + select { case <-ctx.Done(): + span.AddEvent("interrupting wait on postingsForMatchers promise due to context error", trace.WithAttributes( + attribute.String("err", ctx.Err().Error()), + )) return nil, ctx.Err() case <-p.done: // Checking context error is necessary for deterministic tests, // as channel selection order is random if ctx.Err() != nil { + span.AddEvent("successful postingsForMatchers promise, but context has error", trace.WithAttributes( + attribute.String("err", ctx.Err().Error()), + )) return nil, ctx.Err() } if p.err != nil { + span.AddEvent("failed postingsForMatchers promise ", trace.WithAttributes( + attribute.String("err", p.err.Error()), + )) return nil, p.err } + span.AddEvent("successful postingsForMatchers promise") return p.cloner.Clone(), nil } } -func (c *PostingsForMatchersCache) postingsForMatchersPromise(ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) { +func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) { + ctx, span := otel.Tracer("").Start(ctx, "PostingsForMatchersCache.postingsForMatchersPromise") + defer span.End() + promise := new(postingsForMatcherPromise) promise.done = make(chan struct{}) @@ -113,10 +148,17 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ix IndexPostingsRe oldPromise, loaded := c.calls.LoadOrStore(key, promise) if loaded { // promise was not stored, we return a previously stored promise, that's possibly being fulfilled in another goroutine + span.AddEvent("using cached postingsForMatchers promise", trace.WithAttributes( + attribute.String("key", key), + )) close(promise.done) return oldPromise.(*postingsForMatcherPromise).result } + span.AddEvent("cached new postingsForMatchers promise, executing query", trace.WithAttributes( + attribute.String("key", key)), + ) + // promise was stored, close its channel after fulfilment defer close(promise.done) @@ -125,14 +167,18 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ix IndexPostingsRe // FIXME: do we need to cancel the call to postingsForMatchers if all the callers waiting for the result have // cancelled their context? if postings, err := c.postingsForMatchers(context.Background(), ix, ms...); err != nil { + span.AddEvent("postingsForMatchers failed", trace.WithAttributes( + attribute.String("err", err.Error()), + )) promise.err = err } else { + span.AddEvent("postingsForMatchers succeeded") promise.cloner = index.NewPostingsCloner(postings) } sizeBytes := int64(len(key) + size.Of(promise)) - c.created(key, c.timeNow(), sizeBytes) + c.created(ctx, key, c.timeNow(), sizeBytes) return promise.result } @@ -144,13 +190,20 @@ type postingsForMatchersCachedCall struct { sizeBytes int64 } -func (c *PostingsForMatchersCache) expire() { +func (c *PostingsForMatchersCache) expire(ctx context.Context) { + _, span := otel.Tracer("").Start(ctx, "PostingsForMatchersCache.expire", trace.WithAttributes( + attribute.Stringer("ttl", c.ttl), + )) + defer span.End() + if c.ttl <= 0 { + span.AddEvent("ttl < 0 - doing nothing") return } c.cachedMtx.RLock() if !c.shouldEvictHead() { + span.AddEvent("should not evict head") c.cachedMtx.RUnlock() return } @@ -159,6 +212,7 @@ func (c *PostingsForMatchersCache) expire() { c.cachedMtx.Lock() defer c.cachedMtx.Unlock() + span.AddEvent("evicting head(s)") for c.shouldEvictHead() { c.evictHead() } @@ -191,8 +245,15 @@ func (c *PostingsForMatchersCache) evictHead() { // created has to be called when returning from the PostingsForMatchers call that creates the promise. // the ts provided should be the call time. -func (c *PostingsForMatchersCache) created(key string, ts time.Time, sizeBytes int64) { +func (c *PostingsForMatchersCache) created(ctx context.Context, key string, ts time.Time, sizeBytes int64) { + _, span := otel.Tracer("").Start(ctx, "PostingsForMatchersCache.created") + defer span.End() + if c.ttl <= 0 { + span.AddEvent("deleting cached promise since c.ttl <= 0", trace.WithAttributes( + attribute.Stringer("ttl", c.ttl), + attribute.String("key", key), + )) c.calls.Delete(key) return } @@ -206,6 +267,13 @@ func (c *PostingsForMatchersCache) created(key string, ts time.Time, sizeBytes i sizeBytes: sizeBytes, }) c.cachedBytes += sizeBytes + span.AddEvent("recorded cached promise size", trace.WithAttributes( + attribute.Stringer("ttl", c.ttl), + attribute.String("key", key), + attribute.Stringer("timestamp", ts), + attribute.Int64("size in bytes", sizeBytes), + attribute.Int64("cached bytes", c.cachedBytes), + )) } // matchersKey provides a unique string key for the given matchers slice.