mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Merge pull request #575 from grafana/dimitar/postings-for-matchers-reduce-logging
PostingsForMatchersCache: don't create new spans, reduce logging verbosity
This commit is contained in:
commit
ab26771abc
|
@ -3,6 +3,7 @@ package tsdb
|
||||||
import (
|
import (
|
||||||
"container/list"
|
"container/list"
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -87,12 +88,13 @@ type PostingsForMatchersCache struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
|
func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
|
||||||
ctx, span := c.tracer.Start(ctx, "PostingsForMatchersCache.PostingsForMatchers", trace.WithAttributes(
|
span := trace.SpanFromContext(ctx)
|
||||||
attribute.Bool("concurrent", concurrent),
|
defer func(startTime time.Time) {
|
||||||
c.ttlAttrib,
|
span.AddEvent(
|
||||||
c.forceAttrib,
|
"PostingsForMatchers returned",
|
||||||
))
|
trace.WithAttributes(attribute.Bool("concurrent", concurrent), c.ttlAttrib, c.forceAttrib, attribute.Stringer("duration", time.Since(startTime))),
|
||||||
defer span.End()
|
)
|
||||||
|
}(time.Now())
|
||||||
|
|
||||||
if !concurrent && !c.force {
|
if !concurrent && !c.force {
|
||||||
span.AddEvent("cache not used")
|
span.AddEvent("cache not used")
|
||||||
|
@ -122,30 +124,18 @@ type postingsForMatcherPromise struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, error) {
|
func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, error) {
|
||||||
span := trace.SpanFromContext(ctx)
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
span.AddEvent("interrupting wait on postingsForMatchers promise due to context error", trace.WithAttributes(
|
return nil, fmt.Errorf("interrupting wait on postingsForMatchers promise due to context error: %w", ctx.Err())
|
||||||
attribute.String("err", ctx.Err().Error()),
|
|
||||||
))
|
|
||||||
return nil, ctx.Err()
|
|
||||||
case <-p.done:
|
case <-p.done:
|
||||||
// Checking context error is necessary for deterministic tests,
|
// Checking context error is necessary for deterministic tests,
|
||||||
// as channel selection order is random
|
// as channel selection order is random
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
span.AddEvent("completed postingsForMatchers promise, but context has error", trace.WithAttributes(
|
return nil, fmt.Errorf("completed postingsForMatchers promise, but context has error: %w", ctx.Err())
|
||||||
attribute.String("err", ctx.Err().Error()),
|
|
||||||
))
|
|
||||||
return nil, ctx.Err()
|
|
||||||
}
|
}
|
||||||
if p.err != nil {
|
if p.err != nil {
|
||||||
span.AddEvent("postingsForMatchers promise completed with error", trace.WithAttributes(
|
return nil, fmt.Errorf("postingsForMatchers promise completed with error: %w", p.err)
|
||||||
attribute.String("err", p.err.Error()),
|
|
||||||
))
|
|
||||||
return nil, p.err
|
|
||||||
}
|
}
|
||||||
span.AddEvent("postingsForMatchers promise completed successfully")
|
|
||||||
return p.cloner.Clone(), nil
|
return p.cloner.Clone(), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -168,7 +158,7 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex
|
||||||
return oldPromise.(*postingsForMatcherPromise).result
|
return oldPromise.(*postingsForMatcherPromise).result
|
||||||
}
|
}
|
||||||
|
|
||||||
span.AddEvent("no postingsForMatchers promise in cache, executing query")
|
span.AddEvent("no postingsForMatchers promise in cache, executing query", trace.WithAttributes(attribute.String("cache_key", key)))
|
||||||
|
|
||||||
// promise was stored, close its channel after fulfilment
|
// promise was stored, close its channel after fulfilment
|
||||||
defer close(promise.done)
|
defer close(promise.done)
|
||||||
|
@ -178,15 +168,8 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex
|
||||||
// FIXME: do we need to cancel the call to postingsForMatchers if all the callers waiting for the result have
|
// FIXME: do we need to cancel the call to postingsForMatchers if all the callers waiting for the result have
|
||||||
// cancelled their context?
|
// cancelled their context?
|
||||||
if postings, err := c.postingsForMatchers(context.Background(), ix, ms...); err != nil {
|
if postings, err := c.postingsForMatchers(context.Background(), ix, ms...); err != nil {
|
||||||
span.AddEvent("postingsForMatchers failed", trace.WithAttributes(
|
|
||||||
attribute.String("cache_key", key),
|
|
||||||
attribute.String("err", err.Error()),
|
|
||||||
))
|
|
||||||
promise.err = err
|
promise.err = err
|
||||||
} else {
|
} else {
|
||||||
span.AddEvent("postingsForMatchers succeeded", trace.WithAttributes(
|
|
||||||
attribute.String("cache_key", key),
|
|
||||||
))
|
|
||||||
promise.cloner = index.NewPostingsCloner(postings)
|
promise.cloner = index.NewPostingsCloner(postings)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -58,7 +58,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
|
||||||
}, &timeNowMock{}, false)
|
}, &timeNowMock{}, false)
|
||||||
|
|
||||||
_, err := c.PostingsForMatchers(ctx, indexForPostingsMock{}, true, expectedMatchers...)
|
_, err := c.PostingsForMatchers(ctx, indexForPostingsMock{}, true, expectedMatchers...)
|
||||||
require.Equal(t, expectedErr, err)
|
require.ErrorIs(t, err, expectedErr)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("happy case multiple concurrent calls: two same one different", func(t *testing.T) {
|
t.Run("happy case multiple concurrent calls: two same one different", func(t *testing.T) {
|
||||||
|
@ -110,7 +110,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
|
||||||
return nil, fmt.Errorf(matchersString(ms))
|
return nil, fmt.Errorf(matchersString(ms))
|
||||||
}, &timeNowMock{}, forced)
|
}, &timeNowMock{}, forced)
|
||||||
|
|
||||||
results := make([]string, len(calls))
|
results := make([]error, len(calls))
|
||||||
resultsWg := sync.WaitGroup{}
|
resultsWg := sync.WaitGroup{}
|
||||||
resultsWg.Add(len(calls))
|
resultsWg.Add(len(calls))
|
||||||
|
|
||||||
|
@ -118,7 +118,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
|
||||||
for i := 0; i < len(calls); i++ {
|
for i := 0; i < len(calls); i++ {
|
||||||
go func(i int) {
|
go func(i int) {
|
||||||
_, err := c.PostingsForMatchers(ctx, indexForPostingsMock{}, concurrent, calls[i]...)
|
_, err := c.PostingsForMatchers(ctx, indexForPostingsMock{}, concurrent, calls[i]...)
|
||||||
results[i] = err.Error()
|
results[i] = err
|
||||||
resultsWg.Done()
|
resultsWg.Done()
|
||||||
}(i)
|
}(i)
|
||||||
}
|
}
|
||||||
|
@ -136,7 +136,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
|
||||||
|
|
||||||
// check that we got correct results
|
// check that we got correct results
|
||||||
for i, c := range calls {
|
for i, c := range calls {
|
||||||
require.Equal(t, matchersString(c), results[i], "Call %d should have returned error %q, but got %q instead", i, matchersString(c), results[i])
|
require.ErrorContainsf(t, results[i], matchersString(c), "Call %d should have returned error %q, but got %q instead", i, matchersString(c), results[i])
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue