diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go index 87e0af0cf4..bb4aba661e 100644 --- a/tsdb/postings_for_matchers_cache.go +++ b/tsdb/postings_for_matchers_cache.go @@ -8,6 +8,10 @@ import ( "time" "github.com/DmitriyVTitov/size" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/index" @@ -49,6 +53,10 @@ func NewPostingsForMatchersCache(ttl time.Duration, maxItems int, maxBytes int64 timeNow: time.Now, postingsForMatchers: PostingsForMatchers, + + tracer: otel.Tracer(""), + ttlAttrib: attribute.Stringer("ttl", ttl), + forceAttrib: attribute.Bool("force", force), } return b @@ -71,14 +79,39 @@ type PostingsForMatchersCache struct { timeNow func() time.Time // postingsForMatchers can be replaced for testing purposes postingsForMatchers func(ctx context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) + + tracer trace.Tracer + // Preallocated for performance + ttlAttrib attribute.KeyValue + forceAttrib attribute.KeyValue } func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { + ctx, span := c.tracer.Start(ctx, "PostingsForMatchersCache.PostingsForMatchers", trace.WithAttributes( + attribute.Bool("concurrent", concurrent), + c.ttlAttrib, + c.forceAttrib, + )) + defer span.End() + if !concurrent && !c.force { - return c.postingsForMatchers(ctx, ix, ms...) + span.AddEvent("cache not used") + p, err := c.postingsForMatchers(ctx, ix, ms...) + if err != nil { + span.SetStatus(codes.Error, "getting postings for matchers without cache failed") + span.RecordError(err) + } + return p, err } + + span.AddEvent("using cache") c.expire() - return c.postingsForMatchersPromise(ix, ms)(ctx) + p, err := c.postingsForMatchersPromise(ctx, ix, ms)(ctx) + if err != nil { + span.SetStatus(codes.Error, "getting postings for matchers with cache failed") + span.RecordError(err) + } + return p, err } type postingsForMatcherPromise struct { @@ -89,34 +122,54 @@ type postingsForMatcherPromise struct { } func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, error) { + span := trace.SpanFromContext(ctx) + select { case <-ctx.Done(): + span.AddEvent("interrupting wait on postingsForMatchers promise due to context error", trace.WithAttributes( + attribute.String("err", ctx.Err().Error()), + )) return nil, ctx.Err() case <-p.done: // Checking context error is necessary for deterministic tests, // as channel selection order is random if ctx.Err() != nil { + span.AddEvent("completed postingsForMatchers promise, but context has error", trace.WithAttributes( + attribute.String("err", ctx.Err().Error()), + )) return nil, ctx.Err() } if p.err != nil { + span.AddEvent("postingsForMatchers promise completed with error", trace.WithAttributes( + attribute.String("err", p.err.Error()), + )) return nil, p.err } + span.AddEvent("postingsForMatchers promise completed successfully") return p.cloner.Clone(), nil } } -func (c *PostingsForMatchersCache) postingsForMatchersPromise(ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) { - promise := new(postingsForMatcherPromise) - promise.done = make(chan struct{}) +func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) { + span := trace.SpanFromContext(ctx) + + promise := &postingsForMatcherPromise{ + done: make(chan struct{}), + } key := matchersKey(ms) oldPromise, loaded := c.calls.LoadOrStore(key, promise) if loaded { // promise was not stored, we return a previously stored promise, that's possibly being fulfilled in another goroutine + span.AddEvent("using cached postingsForMatchers promise", trace.WithAttributes( + attribute.String("cache_key", key), + )) close(promise.done) return oldPromise.(*postingsForMatcherPromise).result } + span.AddEvent("no postingsForMatchers promise in cache, executing query") + // promise was stored, close its channel after fulfilment defer close(promise.done) @@ -125,14 +178,21 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ix IndexPostingsRe // FIXME: do we need to cancel the call to postingsForMatchers if all the callers waiting for the result have // cancelled their context? if postings, err := c.postingsForMatchers(context.Background(), ix, ms...); err != nil { + span.AddEvent("postingsForMatchers failed", trace.WithAttributes( + attribute.String("cache_key", key), + attribute.String("err", err.Error()), + )) promise.err = err } else { + span.AddEvent("postingsForMatchers succeeded", trace.WithAttributes( + attribute.String("cache_key", key), + )) promise.cloner = index.NewPostingsCloner(postings) } sizeBytes := int64(len(key) + size.Of(promise)) - c.created(key, c.timeNow(), sizeBytes) + c.created(ctx, key, c.timeNow(), sizeBytes) return promise.result } @@ -191,8 +251,11 @@ func (c *PostingsForMatchersCache) evictHead() { // created has to be called when returning from the PostingsForMatchers call that creates the promise. // the ts provided should be the call time. -func (c *PostingsForMatchersCache) created(key string, ts time.Time, sizeBytes int64) { +func (c *PostingsForMatchersCache) created(ctx context.Context, key string, ts time.Time, sizeBytes int64) { + span := trace.SpanFromContext(ctx) + if c.ttl <= 0 { + span.AddEvent("deleting cached promise since c.ttl <= 0") c.calls.Delete(key) return } @@ -206,6 +269,11 @@ func (c *PostingsForMatchersCache) created(key string, ts time.Time, sizeBytes i sizeBytes: sizeBytes, }) c.cachedBytes += sizeBytes + span.AddEvent("added cached value to expiry queue", trace.WithAttributes( + attribute.Stringer("timestamp", ts), + attribute.Int64("size in bytes", sizeBytes), + attribute.Int64("cached bytes", c.cachedBytes), + )) } // matchersKey provides a unique string key for the given matchers slice. diff --git a/tsdb/postings_for_matchers_cache_test.go b/tsdb/postings_for_matchers_cache_test.go index c8e5355fd1..96e9a7db4f 100644 --- a/tsdb/postings_for_matchers_cache_test.go +++ b/tsdb/postings_for_matchers_cache_test.go @@ -364,7 +364,7 @@ func TestPostingsForMatchersCache(t *testing.T) { func BenchmarkPostingsForMatchersCache(b *testing.B) { const ( numMatchers = 100 - numPostings = 100 + numPostings = 1000 ) var (