Merge pull request #553 from grafana/arve/postings-for-matchers-cache-tracing

PostingsForMatchersCache: Add tracing
This commit is contained in:
Arve Knudsen 2023-11-10 17:09:19 +01:00 committed by GitHub
commit 152aea3bc6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 76 additions and 8 deletions

View file

@ -8,6 +8,10 @@ import (
"time"
"github.com/DmitriyVTitov/size"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/index"
@ -49,6 +53,10 @@ func NewPostingsForMatchersCache(ttl time.Duration, maxItems int, maxBytes int64
timeNow: time.Now,
postingsForMatchers: PostingsForMatchers,
tracer: otel.Tracer(""),
ttlAttrib: attribute.Stringer("ttl", ttl),
forceAttrib: attribute.Bool("force", force),
}
return b
@ -71,14 +79,39 @@ type PostingsForMatchersCache struct {
timeNow func() time.Time
// postingsForMatchers can be replaced for testing purposes
postingsForMatchers func(ctx context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error)
tracer trace.Tracer
// Preallocated for performance
ttlAttrib attribute.KeyValue
forceAttrib attribute.KeyValue
}
func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
ctx, span := c.tracer.Start(ctx, "PostingsForMatchersCache.PostingsForMatchers", trace.WithAttributes(
attribute.Bool("concurrent", concurrent),
c.ttlAttrib,
c.forceAttrib,
))
defer span.End()
if !concurrent && !c.force {
return c.postingsForMatchers(ctx, ix, ms...)
span.AddEvent("cache not used")
p, err := c.postingsForMatchers(ctx, ix, ms...)
if err != nil {
span.SetStatus(codes.Error, "getting postings for matchers without cache failed")
span.RecordError(err)
}
return p, err
}
span.AddEvent("using cache")
c.expire()
return c.postingsForMatchersPromise(ix, ms)(ctx)
p, err := c.postingsForMatchersPromise(ctx, ix, ms)(ctx)
if err != nil {
span.SetStatus(codes.Error, "getting postings for matchers with cache failed")
span.RecordError(err)
}
return p, err
}
type postingsForMatcherPromise struct {
@ -89,34 +122,54 @@ type postingsForMatcherPromise struct {
}
func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, error) {
span := trace.SpanFromContext(ctx)
select {
case <-ctx.Done():
span.AddEvent("interrupting wait on postingsForMatchers promise due to context error", trace.WithAttributes(
attribute.String("err", ctx.Err().Error()),
))
return nil, ctx.Err()
case <-p.done:
// Checking context error is necessary for deterministic tests,
// as channel selection order is random
if ctx.Err() != nil {
span.AddEvent("completed postingsForMatchers promise, but context has error", trace.WithAttributes(
attribute.String("err", ctx.Err().Error()),
))
return nil, ctx.Err()
}
if p.err != nil {
span.AddEvent("postingsForMatchers promise completed with error", trace.WithAttributes(
attribute.String("err", p.err.Error()),
))
return nil, p.err
}
span.AddEvent("postingsForMatchers promise completed successfully")
return p.cloner.Clone(), nil
}
}
func (c *PostingsForMatchersCache) postingsForMatchersPromise(ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) {
promise := new(postingsForMatcherPromise)
promise.done = make(chan struct{})
func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) {
span := trace.SpanFromContext(ctx)
promise := &postingsForMatcherPromise{
done: make(chan struct{}),
}
key := matchersKey(ms)
oldPromise, loaded := c.calls.LoadOrStore(key, promise)
if loaded {
// promise was not stored, we return a previously stored promise, that's possibly being fulfilled in another goroutine
span.AddEvent("using cached postingsForMatchers promise", trace.WithAttributes(
attribute.String("cache_key", key),
))
close(promise.done)
return oldPromise.(*postingsForMatcherPromise).result
}
span.AddEvent("no postingsForMatchers promise in cache, executing query")
// promise was stored, close its channel after fulfilment
defer close(promise.done)
@ -125,14 +178,21 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ix IndexPostingsRe
// FIXME: do we need to cancel the call to postingsForMatchers if all the callers waiting for the result have
// cancelled their context?
if postings, err := c.postingsForMatchers(context.Background(), ix, ms...); err != nil {
span.AddEvent("postingsForMatchers failed", trace.WithAttributes(
attribute.String("cache_key", key),
attribute.String("err", err.Error()),
))
promise.err = err
} else {
span.AddEvent("postingsForMatchers succeeded", trace.WithAttributes(
attribute.String("cache_key", key),
))
promise.cloner = index.NewPostingsCloner(postings)
}
sizeBytes := int64(len(key) + size.Of(promise))
c.created(key, c.timeNow(), sizeBytes)
c.created(ctx, key, c.timeNow(), sizeBytes)
return promise.result
}
@ -191,8 +251,11 @@ func (c *PostingsForMatchersCache) evictHead() {
// created has to be called when returning from the PostingsForMatchers call that creates the promise.
// the ts provided should be the call time.
func (c *PostingsForMatchersCache) created(key string, ts time.Time, sizeBytes int64) {
func (c *PostingsForMatchersCache) created(ctx context.Context, key string, ts time.Time, sizeBytes int64) {
span := trace.SpanFromContext(ctx)
if c.ttl <= 0 {
span.AddEvent("deleting cached promise since c.ttl <= 0")
c.calls.Delete(key)
return
}
@ -206,6 +269,11 @@ func (c *PostingsForMatchersCache) created(key string, ts time.Time, sizeBytes i
sizeBytes: sizeBytes,
})
c.cachedBytes += sizeBytes
span.AddEvent("added cached value to expiry queue", trace.WithAttributes(
attribute.Stringer("timestamp", ts),
attribute.Int64("size in bytes", sizeBytes),
attribute.Int64("cached bytes", c.cachedBytes),
))
}
// matchersKey provides a unique string key for the given matchers slice.

View file

@ -364,7 +364,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
func BenchmarkPostingsForMatchersCache(b *testing.B) {
const (
numMatchers = 100
numPostings = 100
numPostings = 1000
)
var (