From 8ece24ddb004cccfe65327db1da3532cf9bb6efe Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 25 Oct 2023 11:38:12 +0200 Subject: [PATCH 01/15] PostingsForMatchersCache: Add tracing Signed-off-by: Arve Knudsen --- tsdb/postings_for_matchers_cache.go | 84 ++++++++++++++++++++++++++--- 1 file changed, 76 insertions(+), 8 deletions(-) diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go index 87e0af0cf4..4a0550ea6d 100644 --- a/tsdb/postings_for_matchers_cache.go +++ b/tsdb/postings_for_matchers_cache.go @@ -8,6 +8,9 @@ import ( "time" "github.com/DmitriyVTitov/size" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/index" @@ -74,11 +77,27 @@ type PostingsForMatchersCache struct { } func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { - if !concurrent && !c.force { - return c.postingsForMatchers(ctx, ix, ms...) + var matcherStrs []string + for _, m := range ms { + matcherStrs = append(matcherStrs, m.String()) } - c.expire() - return c.postingsForMatchersPromise(ix, ms)(ctx) + ctx, span := otel.Tracer("").Start(ctx, "PostingsForMatchersCache.PostingsForMatchers", trace.WithAttributes( + attribute.StringSlice("matchers", matcherStrs), + attribute.Bool("concurrent", concurrent), + attribute.Bool("force", c.force), + )) + defer span.End() + + if !concurrent && !c.force { + span.AddEvent("cache not in use") + p, err := c.postingsForMatchers(ctx, ix, ms...) + span.RecordError(err) + return p, err + } + c.expire(ctx) + p, err := c.postingsForMatchersPromise(ctx, ix, ms)(ctx) + span.RecordError(err) + return p, err } type postingsForMatcherPromise struct { @@ -89,23 +108,39 @@ type postingsForMatcherPromise struct { } func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, error) { + ctx, span := otel.Tracer("").Start(ctx, "postingsForMatcherPromise.result") + defer span.End() + select { case <-ctx.Done(): + span.AddEvent("interrupting wait on postingsForMatchers promise due to context error", trace.WithAttributes( + attribute.String("err", ctx.Err().Error()), + )) return nil, ctx.Err() case <-p.done: // Checking context error is necessary for deterministic tests, // as channel selection order is random if ctx.Err() != nil { + span.AddEvent("successful postingsForMatchers promise, but context has error", trace.WithAttributes( + attribute.String("err", ctx.Err().Error()), + )) return nil, ctx.Err() } if p.err != nil { + span.AddEvent("failed postingsForMatchers promise ", trace.WithAttributes( + attribute.String("err", p.err.Error()), + )) return nil, p.err } + span.AddEvent("successful postingsForMatchers promise") return p.cloner.Clone(), nil } } -func (c *PostingsForMatchersCache) postingsForMatchersPromise(ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) { +func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) { + ctx, span := otel.Tracer("").Start(ctx, "PostingsForMatchersCache.postingsForMatchersPromise") + defer span.End() + promise := new(postingsForMatcherPromise) promise.done = make(chan struct{}) @@ -113,10 +148,17 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ix IndexPostingsRe oldPromise, loaded := c.calls.LoadOrStore(key, promise) if loaded { // promise was not stored, we return a previously stored promise, that's possibly being fulfilled in another goroutine + span.AddEvent("using cached postingsForMatchers promise", trace.WithAttributes( + attribute.String("key", key), + )) close(promise.done) return oldPromise.(*postingsForMatcherPromise).result } + span.AddEvent("cached new postingsForMatchers promise, executing query", trace.WithAttributes( + attribute.String("key", key)), + ) + // promise was stored, close its channel after fulfilment defer close(promise.done) @@ -125,14 +167,18 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ix IndexPostingsRe // FIXME: do we need to cancel the call to postingsForMatchers if all the callers waiting for the result have // cancelled their context? if postings, err := c.postingsForMatchers(context.Background(), ix, ms...); err != nil { + span.AddEvent("postingsForMatchers failed", trace.WithAttributes( + attribute.String("err", err.Error()), + )) promise.err = err } else { + span.AddEvent("postingsForMatchers succeeded") promise.cloner = index.NewPostingsCloner(postings) } sizeBytes := int64(len(key) + size.Of(promise)) - c.created(key, c.timeNow(), sizeBytes) + c.created(ctx, key, c.timeNow(), sizeBytes) return promise.result } @@ -144,13 +190,20 @@ type postingsForMatchersCachedCall struct { sizeBytes int64 } -func (c *PostingsForMatchersCache) expire() { +func (c *PostingsForMatchersCache) expire(ctx context.Context) { + _, span := otel.Tracer("").Start(ctx, "PostingsForMatchersCache.expire", trace.WithAttributes( + attribute.Stringer("ttl", c.ttl), + )) + defer span.End() + if c.ttl <= 0 { + span.AddEvent("ttl < 0 - doing nothing") return } c.cachedMtx.RLock() if !c.shouldEvictHead() { + span.AddEvent("should not evict head") c.cachedMtx.RUnlock() return } @@ -159,6 +212,7 @@ func (c *PostingsForMatchersCache) expire() { c.cachedMtx.Lock() defer c.cachedMtx.Unlock() + span.AddEvent("evicting head(s)") for c.shouldEvictHead() { c.evictHead() } @@ -191,8 +245,15 @@ func (c *PostingsForMatchersCache) evictHead() { // created has to be called when returning from the PostingsForMatchers call that creates the promise. // the ts provided should be the call time. -func (c *PostingsForMatchersCache) created(key string, ts time.Time, sizeBytes int64) { +func (c *PostingsForMatchersCache) created(ctx context.Context, key string, ts time.Time, sizeBytes int64) { + _, span := otel.Tracer("").Start(ctx, "PostingsForMatchersCache.created") + defer span.End() + if c.ttl <= 0 { + span.AddEvent("deleting cached promise since c.ttl <= 0", trace.WithAttributes( + attribute.Stringer("ttl", c.ttl), + attribute.String("key", key), + )) c.calls.Delete(key) return } @@ -206,6 +267,13 @@ func (c *PostingsForMatchersCache) created(key string, ts time.Time, sizeBytes i sizeBytes: sizeBytes, }) c.cachedBytes += sizeBytes + span.AddEvent("recorded cached promise size", trace.WithAttributes( + attribute.Stringer("ttl", c.ttl), + attribute.String("key", key), + attribute.Stringer("timestamp", ts), + attribute.Int64("size in bytes", sizeBytes), + attribute.Int64("cached bytes", c.cachedBytes), + )) } // matchersKey provides a unique string key for the given matchers slice. From ca4cdeb1afb093b31319296b73dd46985c1c0824 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Thu, 26 Oct 2023 08:37:07 +0200 Subject: [PATCH 02/15] Apply suggestions from code review Co-authored-by: Charles Korn --- tsdb/postings_for_matchers_cache.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go index 4a0550ea6d..b8c46e31d0 100644 --- a/tsdb/postings_for_matchers_cache.go +++ b/tsdb/postings_for_matchers_cache.go @@ -89,7 +89,7 @@ func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix I defer span.End() if !concurrent && !c.force { - span.AddEvent("cache not in use") + span.AddEvent("cache not used") p, err := c.postingsForMatchers(ctx, ix, ms...) span.RecordError(err) return p, err @@ -127,7 +127,7 @@ func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, return nil, ctx.Err() } if p.err != nil { - span.AddEvent("failed postingsForMatchers promise ", trace.WithAttributes( + span.AddEvent("failed postingsForMatchers promise", trace.WithAttributes( attribute.String("err", p.err.Error()), )) return nil, p.err @@ -155,7 +155,7 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex return oldPromise.(*postingsForMatcherPromise).result } - span.AddEvent("cached new postingsForMatchers promise, executing query", trace.WithAttributes( + span.AddEvent("no postingsForMatchers promise in cache, executing query", trace.WithAttributes( attribute.String("key", key)), ) From 6c0910a28947744c67564ebc587ef84d4e76b652 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Mon, 30 Oct 2023 13:57:42 +0100 Subject: [PATCH 03/15] Update tsdb/postings_for_matchers_cache.go Co-authored-by: Charles Korn --- tsdb/postings_for_matchers_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go index b8c46e31d0..b73f512e6c 100644 --- a/tsdb/postings_for_matchers_cache.go +++ b/tsdb/postings_for_matchers_cache.go @@ -267,7 +267,7 @@ func (c *PostingsForMatchersCache) created(ctx context.Context, key string, ts t sizeBytes: sizeBytes, }) c.cachedBytes += sizeBytes - span.AddEvent("recorded cached promise size", trace.WithAttributes( + span.AddEvent("added cached value to expiry queue", trace.WithAttributes( attribute.Stringer("ttl", c.ttl), attribute.String("key", key), attribute.Stringer("timestamp", ts), From 25e2c05a4754e4ebf87d19ff5b735c30b9d701d0 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Mon, 30 Oct 2023 13:59:43 +0100 Subject: [PATCH 04/15] Implement review feedback Signed-off-by: Arve Knudsen --- tsdb/postings_for_matchers_cache.go | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go index b73f512e6c..b5bdb75dcc 100644 --- a/tsdb/postings_for_matchers_cache.go +++ b/tsdb/postings_for_matchers_cache.go @@ -94,7 +94,7 @@ func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix I span.RecordError(err) return p, err } - c.expire(ctx) + c.expire() p, err := c.postingsForMatchersPromise(ctx, ix, ms)(ctx) span.RecordError(err) return p, err @@ -190,20 +190,13 @@ type postingsForMatchersCachedCall struct { sizeBytes int64 } -func (c *PostingsForMatchersCache) expire(ctx context.Context) { - _, span := otel.Tracer("").Start(ctx, "PostingsForMatchersCache.expire", trace.WithAttributes( - attribute.Stringer("ttl", c.ttl), - )) - defer span.End() - +func (c *PostingsForMatchersCache) expire() { if c.ttl <= 0 { - span.AddEvent("ttl < 0 - doing nothing") return } c.cachedMtx.RLock() if !c.shouldEvictHead() { - span.AddEvent("should not evict head") c.cachedMtx.RUnlock() return } @@ -212,7 +205,6 @@ func (c *PostingsForMatchersCache) expire(ctx context.Context) { c.cachedMtx.Lock() defer c.cachedMtx.Unlock() - span.AddEvent("evicting head(s)") for c.shouldEvictHead() { c.evictHead() } @@ -246,8 +238,7 @@ func (c *PostingsForMatchersCache) evictHead() { // created has to be called when returning from the PostingsForMatchers call that creates the promise. // the ts provided should be the call time. func (c *PostingsForMatchersCache) created(ctx context.Context, key string, ts time.Time, sizeBytes int64) { - _, span := otel.Tracer("").Start(ctx, "PostingsForMatchersCache.created") - defer span.End() + span := trace.SpanFromContext(ctx) if c.ttl <= 0 { span.AddEvent("deleting cached promise since c.ttl <= 0", trace.WithAttributes( From 2aabf7cc2c1ef63948fa127e975647f3699b37c3 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Mon, 30 Oct 2023 14:30:12 +0100 Subject: [PATCH 05/15] Improve tracing Signed-off-by: Arve Knudsen --- tsdb/postings_for_matchers_cache.go | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go index b5bdb75dcc..725a550a85 100644 --- a/tsdb/postings_for_matchers_cache.go +++ b/tsdb/postings_for_matchers_cache.go @@ -77,12 +77,7 @@ type PostingsForMatchersCache struct { } func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { - var matcherStrs []string - for _, m := range ms { - matcherStrs = append(matcherStrs, m.String()) - } ctx, span := otel.Tracer("").Start(ctx, "PostingsForMatchersCache.PostingsForMatchers", trace.WithAttributes( - attribute.StringSlice("matchers", matcherStrs), attribute.Bool("concurrent", concurrent), attribute.Bool("force", c.force), )) @@ -94,6 +89,8 @@ func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix I span.RecordError(err) return p, err } + + span.AddEvent("using cache") c.expire() p, err := c.postingsForMatchersPromise(ctx, ix, ms)(ctx) span.RecordError(err) @@ -138,26 +135,24 @@ func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, } func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) { - ctx, span := otel.Tracer("").Start(ctx, "PostingsForMatchersCache.postingsForMatchersPromise") + key := matchersKey(ms) + ctx, span := otel.Tracer("").Start(ctx, "PostingsForMatchersCache.postingsForMatchersPromise", trace.WithAttributes( + attribute.String("cache_key", key), + )) defer span.End() promise := new(postingsForMatcherPromise) promise.done = make(chan struct{}) - key := matchersKey(ms) oldPromise, loaded := c.calls.LoadOrStore(key, promise) if loaded { // promise was not stored, we return a previously stored promise, that's possibly being fulfilled in another goroutine - span.AddEvent("using cached postingsForMatchers promise", trace.WithAttributes( - attribute.String("key", key), - )) + span.AddEvent("using cached postingsForMatchers promise") close(promise.done) return oldPromise.(*postingsForMatcherPromise).result } - span.AddEvent("no postingsForMatchers promise in cache, executing query", trace.WithAttributes( - attribute.String("key", key)), - ) + span.AddEvent("no postingsForMatchers promise in cache, executing query") // promise was stored, close its channel after fulfilment defer close(promise.done) From 7c313be3b513c197d83141fb8b2f2ffccbc2c7bf Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Mon, 30 Oct 2023 14:53:32 +0100 Subject: [PATCH 06/15] Remove unnecessary event attributes Signed-off-by: Arve Knudsen --- tsdb/postings_for_matchers_cache.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go index 725a550a85..cc920d0084 100644 --- a/tsdb/postings_for_matchers_cache.go +++ b/tsdb/postings_for_matchers_cache.go @@ -238,7 +238,6 @@ func (c *PostingsForMatchersCache) created(ctx context.Context, key string, ts t if c.ttl <= 0 { span.AddEvent("deleting cached promise since c.ttl <= 0", trace.WithAttributes( attribute.Stringer("ttl", c.ttl), - attribute.String("key", key), )) c.calls.Delete(key) return @@ -255,7 +254,6 @@ func (c *PostingsForMatchersCache) created(ctx context.Context, key string, ts t c.cachedBytes += sizeBytes span.AddEvent("added cached value to expiry queue", trace.WithAttributes( attribute.Stringer("ttl", c.ttl), - attribute.String("key", key), attribute.Stringer("timestamp", ts), attribute.Int64("size in bytes", sizeBytes), attribute.Int64("cached bytes", c.cachedBytes), From 403abaee79a3e921aee288687b333a3524596f4f Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 31 Oct 2023 08:36:10 +0100 Subject: [PATCH 07/15] Set span status on error Signed-off-by: Arve Knudsen --- tsdb/postings_for_matchers_cache.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go index cc920d0084..c9046eee46 100644 --- a/tsdb/postings_for_matchers_cache.go +++ b/tsdb/postings_for_matchers_cache.go @@ -10,6 +10,7 @@ import ( "github.com/DmitriyVTitov/size" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" "github.com/prometheus/prometheus/model/labels" @@ -86,14 +87,20 @@ func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix I if !concurrent && !c.force { span.AddEvent("cache not used") p, err := c.postingsForMatchers(ctx, ix, ms...) - span.RecordError(err) + if err != nil { + span.SetStatus(codes.Error, "getting postings for matchers without cache failed") + span.RecordError(err) + } return p, err } span.AddEvent("using cache") c.expire() p, err := c.postingsForMatchersPromise(ctx, ix, ms)(ctx) - span.RecordError(err) + if err != nil { + span.SetStatus(codes.Error, "getting postings for matchers with cache failed") + span.RecordError(err) + } return p, err } From 46bf26e841d5460329a0f6900be32a25143fe312 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 31 Oct 2023 14:48:36 +0100 Subject: [PATCH 08/15] Create Tracer once per PostingsForMatchersCache Signed-off-by: Arve Knudsen --- tsdb/postings_for_matchers_cache.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go index c9046eee46..90bd941121 100644 --- a/tsdb/postings_for_matchers_cache.go +++ b/tsdb/postings_for_matchers_cache.go @@ -53,6 +53,8 @@ func NewPostingsForMatchersCache(ttl time.Duration, maxItems int, maxBytes int64 timeNow: time.Now, postingsForMatchers: PostingsForMatchers, + + tracer: otel.Tracer(""), } return b @@ -75,10 +77,12 @@ type PostingsForMatchersCache struct { timeNow func() time.Time // postingsForMatchers can be replaced for testing purposes postingsForMatchers func(ctx context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) + + tracer trace.Tracer } func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { - ctx, span := otel.Tracer("").Start(ctx, "PostingsForMatchersCache.PostingsForMatchers", trace.WithAttributes( + ctx, span := c.tracer.Start(ctx, "PostingsForMatchersCache.PostingsForMatchers", trace.WithAttributes( attribute.Bool("concurrent", concurrent), attribute.Bool("force", c.force), )) @@ -109,10 +113,11 @@ type postingsForMatcherPromise struct { cloner *index.PostingsCloner err error + tracer trace.Tracer } func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, error) { - ctx, span := otel.Tracer("").Start(ctx, "postingsForMatcherPromise.result") + ctx, span := p.tracer.Start(ctx, "postingsForMatcherPromise.result") defer span.End() select { @@ -143,13 +148,15 @@ func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) { key := matchersKey(ms) - ctx, span := otel.Tracer("").Start(ctx, "PostingsForMatchersCache.postingsForMatchersPromise", trace.WithAttributes( + ctx, span := c.tracer.Start(ctx, "PostingsForMatchersCache.postingsForMatchersPromise", trace.WithAttributes( attribute.String("cache_key", key), )) defer span.End() - promise := new(postingsForMatcherPromise) - promise.done = make(chan struct{}) + promise := &postingsForMatcherPromise{ + done: make(chan struct{}), + tracer: c.tracer, + } oldPromise, loaded := c.calls.LoadOrStore(key, promise) if loaded { From ef70a6e6cf1492b690051a64c9948112eb2bfefe Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 1 Nov 2023 11:53:10 +0100 Subject: [PATCH 09/15] Allocate tracing attributes up front Signed-off-by: Arve Knudsen --- tsdb/postings_for_matchers_cache.go | 38 ++++++++++++++++------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go index 90bd941121..52b67f9233 100644 --- a/tsdb/postings_for_matchers_cache.go +++ b/tsdb/postings_for_matchers_cache.go @@ -46,10 +46,11 @@ func NewPostingsForMatchersCache(ttl time.Duration, maxItems int, maxBytes int64 calls: &sync.Map{}, cached: list.New(), - ttl: ttl, - maxItems: maxItems, - maxBytes: maxBytes, - force: force, + ttl: ttl, + ttlAttrib: attribute.Stringer("ttl", ttl), + maxItems: maxItems, + maxBytes: maxBytes, + force: force, timeNow: time.Now, postingsForMatchers: PostingsForMatchers, @@ -68,10 +69,11 @@ type PostingsForMatchersCache struct { cached *list.List cachedBytes int64 - ttl time.Duration - maxItems int - maxBytes int64 - force bool + ttl time.Duration + ttlAttrib attribute.KeyValue + maxItems int + maxBytes int64 + force bool // timeNow is the time.Now that can be replaced for testing purposes timeNow func() time.Time @@ -117,8 +119,7 @@ type postingsForMatcherPromise struct { } func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, error) { - ctx, span := p.tracer.Start(ctx, "postingsForMatcherPromise.result") - defer span.End() + span := trace.SpanFromContext(ctx) select { case <-ctx.Done(): @@ -148,9 +149,7 @@ func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) { key := matchersKey(ms) - ctx, span := c.tracer.Start(ctx, "PostingsForMatchersCache.postingsForMatchersPromise", trace.WithAttributes( - attribute.String("cache_key", key), - )) + span := trace.SpanFromContext(ctx) defer span.End() promise := &postingsForMatcherPromise{ @@ -161,7 +160,9 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex oldPromise, loaded := c.calls.LoadOrStore(key, promise) if loaded { // promise was not stored, we return a previously stored promise, that's possibly being fulfilled in another goroutine - span.AddEvent("using cached postingsForMatchers promise") + span.AddEvent("using cached postingsForMatchers promise", trace.WithAttributes( + attribute.String("cache_key", key), + )) close(promise.done) return oldPromise.(*postingsForMatcherPromise).result } @@ -177,11 +178,14 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex // cancelled their context? if postings, err := c.postingsForMatchers(context.Background(), ix, ms...); err != nil { span.AddEvent("postingsForMatchers failed", trace.WithAttributes( + attribute.String("cache_key", key), attribute.String("err", err.Error()), )) promise.err = err } else { - span.AddEvent("postingsForMatchers succeeded") + span.AddEvent("postingsForMatchers succeeded", trace.WithAttributes( + attribute.String("cache_key", key), + )) promise.cloner = index.NewPostingsCloner(postings) } @@ -251,7 +255,7 @@ func (c *PostingsForMatchersCache) created(ctx context.Context, key string, ts t if c.ttl <= 0 { span.AddEvent("deleting cached promise since c.ttl <= 0", trace.WithAttributes( - attribute.Stringer("ttl", c.ttl), + c.ttlAttrib, )) c.calls.Delete(key) return @@ -267,7 +271,7 @@ func (c *PostingsForMatchersCache) created(ctx context.Context, key string, ts t }) c.cachedBytes += sizeBytes span.AddEvent("added cached value to expiry queue", trace.WithAttributes( - attribute.Stringer("ttl", c.ttl), + c.ttlAttrib, attribute.Stringer("timestamp", ts), attribute.Int64("size in bytes", sizeBytes), attribute.Int64("cached bytes", c.cachedBytes), From e3b8667fb15585d08807af8b02ceacd06235d04e Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Thu, 9 Nov 2023 15:42:10 +0100 Subject: [PATCH 10/15] Increase number of postings in benchmark Signed-off-by: Arve Knudsen --- tsdb/postings_for_matchers_cache_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsdb/postings_for_matchers_cache_test.go b/tsdb/postings_for_matchers_cache_test.go index c8e5355fd1..b715ff6923 100644 --- a/tsdb/postings_for_matchers_cache_test.go +++ b/tsdb/postings_for_matchers_cache_test.go @@ -364,7 +364,7 @@ func TestPostingsForMatchersCache(t *testing.T) { func BenchmarkPostingsForMatchersCache(b *testing.B) { const ( numMatchers = 100 - numPostings = 100 + numPostings = 100000 ) var ( From 83b0ed7bdbf09f57ba017e00d9b46d3bc2c5d680 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Thu, 9 Nov 2023 16:26:35 +0100 Subject: [PATCH 11/15] Clean up Signed-off-by: Arve Knudsen --- tsdb/postings_for_matchers_cache.go | 37 ++++++++++++++--------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go index 52b67f9233..ae877d97d6 100644 --- a/tsdb/postings_for_matchers_cache.go +++ b/tsdb/postings_for_matchers_cache.go @@ -46,16 +46,17 @@ func NewPostingsForMatchersCache(ttl time.Duration, maxItems int, maxBytes int64 calls: &sync.Map{}, cached: list.New(), - ttl: ttl, - ttlAttrib: attribute.Stringer("ttl", ttl), - maxItems: maxItems, - maxBytes: maxBytes, - force: force, + ttl: ttl, + maxItems: maxItems, + maxBytes: maxBytes, + force: force, timeNow: time.Now, postingsForMatchers: PostingsForMatchers, - tracer: otel.Tracer(""), + tracer: otel.Tracer(""), + ttlAttrib: attribute.Stringer("ttl", ttl), + forceAttrib: attribute.Bool("force", force), } return b @@ -69,11 +70,10 @@ type PostingsForMatchersCache struct { cached *list.List cachedBytes int64 - ttl time.Duration - ttlAttrib attribute.KeyValue - maxItems int - maxBytes int64 - force bool + ttl time.Duration + maxItems int + maxBytes int64 + force bool // timeNow is the time.Now that can be replaced for testing purposes timeNow func() time.Time @@ -81,12 +81,16 @@ type PostingsForMatchersCache struct { postingsForMatchers func(ctx context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) tracer trace.Tracer + // Preallocated for performance + ttlAttrib attribute.KeyValue + forceAttrib attribute.KeyValue } func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { ctx, span := c.tracer.Start(ctx, "PostingsForMatchersCache.PostingsForMatchers", trace.WithAttributes( attribute.Bool("concurrent", concurrent), - attribute.Bool("force", c.force), + c.ttlAttrib, + c.forceAttrib, )) defer span.End() @@ -115,7 +119,6 @@ type postingsForMatcherPromise struct { cloner *index.PostingsCloner err error - tracer trace.Tracer } func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, error) { @@ -153,8 +156,7 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex defer span.End() promise := &postingsForMatcherPromise{ - done: make(chan struct{}), - tracer: c.tracer, + done: make(chan struct{}), } oldPromise, loaded := c.calls.LoadOrStore(key, promise) @@ -254,9 +256,7 @@ func (c *PostingsForMatchersCache) created(ctx context.Context, key string, ts t span := trace.SpanFromContext(ctx) if c.ttl <= 0 { - span.AddEvent("deleting cached promise since c.ttl <= 0", trace.WithAttributes( - c.ttlAttrib, - )) + span.AddEvent("deleting cached promise since c.ttl <= 0") c.calls.Delete(key) return } @@ -271,7 +271,6 @@ func (c *PostingsForMatchersCache) created(ctx context.Context, key string, ts t }) c.cachedBytes += sizeBytes span.AddEvent("added cached value to expiry queue", trace.WithAttributes( - c.ttlAttrib, attribute.Stringer("timestamp", ts), attribute.Int64("size in bytes", sizeBytes), attribute.Int64("cached bytes", c.cachedBytes), From 476ebcb0a6b668840c3890a12c46a5135f3452fa Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Thu, 9 Nov 2023 16:54:03 +0100 Subject: [PATCH 12/15] Adjust benchmark Signed-off-by: Arve Knudsen --- tsdb/postings_for_matchers_cache_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsdb/postings_for_matchers_cache_test.go b/tsdb/postings_for_matchers_cache_test.go index b715ff6923..96e9a7db4f 100644 --- a/tsdb/postings_for_matchers_cache_test.go +++ b/tsdb/postings_for_matchers_cache_test.go @@ -364,7 +364,7 @@ func TestPostingsForMatchersCache(t *testing.T) { func BenchmarkPostingsForMatchersCache(b *testing.B) { const ( numMatchers = 100 - numPostings = 100000 + numPostings = 1000 ) var ( From 92606f3b522c0e13648e84070ccd4d252db234fb Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Thu, 9 Nov 2023 17:08:59 +0100 Subject: [PATCH 13/15] Fix trace event Signed-off-by: Arve Knudsen --- tsdb/postings_for_matchers_cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go index ae877d97d6..f1546d59c5 100644 --- a/tsdb/postings_for_matchers_cache.go +++ b/tsdb/postings_for_matchers_cache.go @@ -134,7 +134,7 @@ func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, // Checking context error is necessary for deterministic tests, // as channel selection order is random if ctx.Err() != nil { - span.AddEvent("successful postingsForMatchers promise, but context has error", trace.WithAttributes( + span.AddEvent("completed postingsForMatchers promise, but context has error", trace.WithAttributes( attribute.String("err", ctx.Err().Error()), )) return nil, ctx.Err() @@ -151,7 +151,6 @@ func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, } func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) { - key := matchersKey(ms) span := trace.SpanFromContext(ctx) defer span.End() @@ -159,6 +158,7 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex done: make(chan struct{}), } + key := matchersKey(ms) oldPromise, loaded := c.calls.LoadOrStore(key, promise) if loaded { // promise was not stored, we return a previously stored promise, that's possibly being fulfilled in another goroutine From e1e64f36c22fd3f07448e22825f878703421bdfe Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 10 Nov 2023 08:02:13 +0100 Subject: [PATCH 14/15] Apply suggestions from code review Co-authored-by: Charles Korn --- tsdb/postings_for_matchers_cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go index f1546d59c5..71ca69d263 100644 --- a/tsdb/postings_for_matchers_cache.go +++ b/tsdb/postings_for_matchers_cache.go @@ -140,12 +140,12 @@ func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, return nil, ctx.Err() } if p.err != nil { - span.AddEvent("failed postingsForMatchers promise", trace.WithAttributes( + span.AddEvent("postingsForMatchers promise completed with error", trace.WithAttributes( attribute.String("err", p.err.Error()), )) return nil, p.err } - span.AddEvent("successful postingsForMatchers promise") + span.AddEvent("postingsForMatchers promise completed successfully") return p.cloner.Clone(), nil } } From d1c7983f22dc83433b94cb2a430e1aa05c7a10dc Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Thu, 9 Nov 2023 17:17:36 +0100 Subject: [PATCH 15/15] Don't end non-owned span Signed-off-by: Arve Knudsen --- tsdb/postings_for_matchers_cache.go | 1 - 1 file changed, 1 deletion(-) diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go index 71ca69d263..bb4aba661e 100644 --- a/tsdb/postings_for_matchers_cache.go +++ b/tsdb/postings_for_matchers_cache.go @@ -152,7 +152,6 @@ func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) { span := trace.SpanFromContext(ctx) - defer span.End() promise := &postingsForMatcherPromise{ done: make(chan struct{}),