diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index beb9e3b0ad..8dd1d88fa0 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -757,18 +757,19 @@ func main() { queryEngine = promql.NewEngine(opts) ruleManager = rules.NewManager(&rules.ManagerOptions{ - Appendable: fanoutStorage, - Queryable: localStorage, - QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage), - NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()), - Context: ctxRule, - ExternalURL: cfg.web.ExternalURL, - Registerer: prometheus.DefaultRegisterer, - Logger: log.With(logger, "component", "rule manager"), - OutageTolerance: time.Duration(cfg.outageTolerance), - ForGracePeriod: time.Duration(cfg.forGracePeriod), - ResendDelay: time.Duration(cfg.resendDelay), - MaxConcurrentEvals: cfg.maxConcurrentEvals, + Appendable: fanoutStorage, + Queryable: localStorage, + QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage), + NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()), + Context: ctxRule, + ExternalURL: cfg.web.ExternalURL, + Registerer: prometheus.DefaultRegisterer, + Logger: log.With(logger, "component", "rule manager"), + OutageTolerance: time.Duration(cfg.outageTolerance), + ForGracePeriod: time.Duration(cfg.forGracePeriod), + ResendDelay: time.Duration(cfg.resendDelay), + MaxConcurrentEvals: cfg.maxConcurrentEvals, + ConcurrentEvalsEnabled: cfg.enableConcurrentRuleEval, }) } diff --git a/rules/group.go b/rules/group.go index c742820a81..2be41c8015 100644 --- a/rules/group.go +++ b/rules/group.go @@ -435,11 +435,12 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } eval := func(i int, rule Rule, async bool) { - if async { - defer func() { - g.opts.ConcurrentEvalSema.Release(1) - }() - } + defer func() { + if async { + g.opts.ConcurrencyController.Done() + } + }() + logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i) ctx, sp := otel.Tracer("").Start(ctx, "rule") sp.SetAttributes(attribute.String("name", rule.Name())) @@ -568,7 +569,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. // Try run concurrently if there are slots available. - if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrentEvalSema != nil && g.opts.ConcurrentEvalSema.TryAcquire(1) { + if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrencyController.Allow() { go eval(i, rule, true) } else { eval(i, rule, false) diff --git a/rules/manager.go b/rules/manager.go index e9fa94e9e2..0aeeae1703 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -104,20 +104,21 @@ type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) // ManagerOptions bundles options for the Manager. type ManagerOptions struct { - ExternalURL *url.URL - QueryFunc QueryFunc - NotifyFunc NotifyFunc - Context context.Context - Appendable storage.Appendable - Queryable storage.Queryable - Logger log.Logger - Registerer prometheus.Registerer - OutageTolerance time.Duration - ForGracePeriod time.Duration - ResendDelay time.Duration - MaxConcurrentEvals int64 - ConcurrentEvalSema *semaphore.Weighted - GroupLoader GroupLoader + ExternalURL *url.URL + QueryFunc QueryFunc + NotifyFunc NotifyFunc + Context context.Context + Appendable storage.Appendable + Queryable storage.Queryable + Logger log.Logger + Registerer prometheus.Registerer + OutageTolerance time.Duration + ForGracePeriod time.Duration + ResendDelay time.Duration + MaxConcurrentEvals int64 + ConcurrentEvalsEnabled bool + ConcurrencyController ConcurrencyController + GroupLoader GroupLoader Metrics *Metrics } @@ -133,7 +134,7 @@ func NewManager(o *ManagerOptions) *Manager { o.GroupLoader = FileLoader{} } - o.ConcurrentEvalSema = semaphore.NewWeighted(o.MaxConcurrentEvals) + o.ConcurrencyController = NewConcurrencyController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals) m := &Manager{ groups: map[string]*Group{}, @@ -408,3 +409,28 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc { } } } + +type ConcurrencyController struct { + enabled bool + sema *semaphore.Weighted +} + +func NewConcurrencyController(enabled bool, maxConcurrency int64) ConcurrencyController { + return ConcurrencyController{enabled: enabled, sema: semaphore.NewWeighted(maxConcurrency)} +} + +func (c ConcurrencyController) Allow() bool { + if !c.enabled { + return false + } + + return c.sema.TryAcquire(1) +} + +func (c ConcurrencyController) Done() { + if !c.enabled { + return + } + + c.sema.Release(1) +} diff --git a/rules/manager_test.go b/rules/manager_test.go index e3e156038e..c2b23716f1 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -30,7 +30,6 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/goleak" - "golang.org/x/sync/semaphore" "gopkg.in/yaml.v2" "github.com/prometheus/prometheus/model/labels" @@ -1672,7 +1671,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { for _, group := range groups { // Allow up to 2 concurrent rule evaluations. - group.opts.ConcurrentEvalSema = semaphore.NewWeighted(2) + group.opts.ConcurrencyController = NewConcurrencyController(true, 2) require.Len(t, group.rules, expectedRules) start := time.Now() @@ -1722,10 +1721,11 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) { files := []string{"fixtures/rules_multiple_groups.yaml"} ruleManager := NewManager(&ManagerOptions{ - Context: context.Background(), - Logger: log.NewNopLogger(), - Appendable: storage, - MaxConcurrentEvals: maxConcurrency, + Context: context.Background(), + Logger: log.NewNopLogger(), + Appendable: storage, + ConcurrentEvalsEnabled: true, + MaxConcurrentEvals: maxConcurrency, QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { inflightQueries.Add(1) defer func() {