Refactor concurrency control

Signed-off-by: Danny Kopping <danny.kopping@grafana.com>
This commit is contained in:
Danny Kopping 2023-10-25 23:05:25 +02:00 committed by Marco Pracucci
parent ed2933ca60
commit e7758d187e
No known key found for this signature in database
GPG key ID: 74C1BD403D2DF9B5
4 changed files with 67 additions and 39 deletions

View file

@ -757,18 +757,19 @@ func main() {
queryEngine = promql.NewEngine(opts)
ruleManager = rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
Queryable: localStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()),
Context: ctxRule,
ExternalURL: cfg.web.ExternalURL,
Registerer: prometheus.DefaultRegisterer,
Logger: log.With(logger, "component", "rule manager"),
OutageTolerance: time.Duration(cfg.outageTolerance),
ForGracePeriod: time.Duration(cfg.forGracePeriod),
ResendDelay: time.Duration(cfg.resendDelay),
MaxConcurrentEvals: cfg.maxConcurrentEvals,
Appendable: fanoutStorage,
Queryable: localStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()),
Context: ctxRule,
ExternalURL: cfg.web.ExternalURL,
Registerer: prometheus.DefaultRegisterer,
Logger: log.With(logger, "component", "rule manager"),
OutageTolerance: time.Duration(cfg.outageTolerance),
ForGracePeriod: time.Duration(cfg.forGracePeriod),
ResendDelay: time.Duration(cfg.resendDelay),
MaxConcurrentEvals: cfg.maxConcurrentEvals,
ConcurrentEvalsEnabled: cfg.enableConcurrentRuleEval,
})
}

View file

@ -435,11 +435,12 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
}
eval := func(i int, rule Rule, async bool) {
if async {
defer func() {
g.opts.ConcurrentEvalSema.Release(1)
}()
}
defer func() {
if async {
g.opts.ConcurrencyController.Done()
}
}()
logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i)
ctx, sp := otel.Tracer("").Start(ctx, "rule")
sp.SetAttributes(attribute.String("name", rule.Name()))
@ -568,7 +569,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
// If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output.
// Try run concurrently if there are slots available.
if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrentEvalSema != nil && g.opts.ConcurrentEvalSema.TryAcquire(1) {
if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrencyController.Allow() {
go eval(i, rule, true)
} else {
eval(i, rule, false)

View file

@ -104,20 +104,21 @@ type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert)
// ManagerOptions bundles options for the Manager.
type ManagerOptions struct {
ExternalURL *url.URL
QueryFunc QueryFunc
NotifyFunc NotifyFunc
Context context.Context
Appendable storage.Appendable
Queryable storage.Queryable
Logger log.Logger
Registerer prometheus.Registerer
OutageTolerance time.Duration
ForGracePeriod time.Duration
ResendDelay time.Duration
MaxConcurrentEvals int64
ConcurrentEvalSema *semaphore.Weighted
GroupLoader GroupLoader
ExternalURL *url.URL
QueryFunc QueryFunc
NotifyFunc NotifyFunc
Context context.Context
Appendable storage.Appendable
Queryable storage.Queryable
Logger log.Logger
Registerer prometheus.Registerer
OutageTolerance time.Duration
ForGracePeriod time.Duration
ResendDelay time.Duration
MaxConcurrentEvals int64
ConcurrentEvalsEnabled bool
ConcurrencyController ConcurrencyController
GroupLoader GroupLoader
Metrics *Metrics
}
@ -133,7 +134,7 @@ func NewManager(o *ManagerOptions) *Manager {
o.GroupLoader = FileLoader{}
}
o.ConcurrentEvalSema = semaphore.NewWeighted(o.MaxConcurrentEvals)
o.ConcurrencyController = NewConcurrencyController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals)
m := &Manager{
groups: map[string]*Group{},
@ -408,3 +409,28 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc {
}
}
}
type ConcurrencyController struct {
enabled bool
sema *semaphore.Weighted
}
func NewConcurrencyController(enabled bool, maxConcurrency int64) ConcurrencyController {
return ConcurrencyController{enabled: enabled, sema: semaphore.NewWeighted(maxConcurrency)}
}
func (c ConcurrencyController) Allow() bool {
if !c.enabled {
return false
}
return c.sema.TryAcquire(1)
}
func (c ConcurrencyController) Done() {
if !c.enabled {
return
}
c.sema.Release(1)
}

View file

@ -30,7 +30,6 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/goleak"
"golang.org/x/sync/semaphore"
"gopkg.in/yaml.v2"
"github.com/prometheus/prometheus/model/labels"
@ -1672,7 +1671,7 @@ func TestAsyncRuleEvaluation(t *testing.T) {
for _, group := range groups {
// Allow up to 2 concurrent rule evaluations.
group.opts.ConcurrentEvalSema = semaphore.NewWeighted(2)
group.opts.ConcurrencyController = NewConcurrencyController(true, 2)
require.Len(t, group.rules, expectedRules)
start := time.Now()
@ -1722,10 +1721,11 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) {
files := []string{"fixtures/rules_multiple_groups.yaml"}
ruleManager := NewManager(&ManagerOptions{
Context: context.Background(),
Logger: log.NewNopLogger(),
Appendable: storage,
MaxConcurrentEvals: maxConcurrency,
Context: context.Background(),
Logger: log.NewNopLogger(),
Appendable: storage,
ConcurrentEvalsEnabled: true,
MaxConcurrentEvals: maxConcurrency,
QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) {
inflightQueries.Add(1)
defer func() {