diff --git a/rules/group.go b/rules/group.go index 0bc219a11..201d3a67d 100644 --- a/rules/group.go +++ b/rules/group.go @@ -621,14 +621,12 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } } - // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. - // Try run concurrently if there are slots available. - if ctrl := g.concurrencyController; isRuleEligibleForConcurrentExecution(rule) && ctrl.Allow() { + if ctrl := g.concurrencyController; ctrl.Allow(ctx, g, rule) { wg.Add(1) go eval(i, rule, func() { wg.Done() - ctrl.Done() + ctrl.Done(ctx) }) } else { eval(i, rule, nil) @@ -1094,7 +1092,3 @@ func buildDependencyMap(rules []Rule) dependencyMap { return dependencies } - -func isRuleEligibleForConcurrentExecution(rule Rule) bool { - return rule.NoDependentRules() && rule.NoDependencyRules() -} diff --git a/rules/manager.go b/rules/manager.go index ab33c3c7d..9e5b33fbc 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -457,67 +457,47 @@ func (c ruleDependencyController) AnalyseRules(rules []Rule) { // Its purpose is to bound the amount of concurrency in rule evaluations to avoid overwhelming the Prometheus // server with additional query load. Concurrency is controlled globally, not on a per-group basis. type RuleConcurrencyController interface { - // Allow determines whether any concurrent evaluation slots are available. - // If Allow() returns true, then Done() must be called to release the acquired slot. - Allow() bool + // Allow determines if the given rule is allowed to be evaluated concurrently. + // If Allow() returns true, then Done() must be called to release the acquired slot and corresponding cleanup is done. + // It is important that both *Group and Rule are not retained and only be used for the duration of the call. + Allow(ctx context.Context, group *Group, rule Rule) bool // Done releases a concurrent evaluation slot. - Done() + Done(ctx context.Context) } // concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. type concurrentRuleEvalController struct { - sema *semaphore.Weighted - depMapsMu sync.Mutex - depMaps map[*Group]dependencyMap + sema *semaphore.Weighted } func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyController { return &concurrentRuleEvalController{ - sema: semaphore.NewWeighted(maxConcurrency), - depMaps: map[*Group]dependencyMap{}, + sema: semaphore.NewWeighted(maxConcurrency), } } -func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool { - c.depMapsMu.Lock() - defer c.depMapsMu.Unlock() - - depMap, found := c.depMaps[g] - if !found { - depMap = buildDependencyMap(g.rules) - c.depMaps[g] = depMap +func (c *concurrentRuleEvalController) Allow(_ context.Context, _ *Group, rule Rule) bool { + // To allow a rule to be executed concurrently, we need 3 conditions: + // 1. The rule must not have any rules that depend on it. + // 2. The rule itself must not depend on any other rules. + // 3. If 1 & 2 are true, then and only then we should try to acquire the concurrency slot. + if rule.NoDependentRules() && rule.NoDependencyRules() { + return c.sema.TryAcquire(1) } - return depMap.isIndependent(r) + return false } -func (c *concurrentRuleEvalController) Allow() bool { - return c.sema.TryAcquire(1) -} - -func (c *concurrentRuleEvalController) Done() { +func (c *concurrentRuleEvalController) Done(_ context.Context) { c.sema.Release(1) } -func (c *concurrentRuleEvalController) Invalidate() { - c.depMapsMu.Lock() - defer c.depMapsMu.Unlock() - - // Clear out the memoized dependency maps because some or all groups may have been updated. - c.depMaps = map[*Group]dependencyMap{} -} - // sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially. type sequentialRuleEvalController struct{} -func (c sequentialRuleEvalController) RuleEligible(_ *Group, _ Rule) bool { +func (c sequentialRuleEvalController) Allow(_ context.Context, _ *Group, _ Rule) bool { return false } -func (c sequentialRuleEvalController) Allow() bool { - return false -} - -func (c sequentialRuleEvalController) Done() {} -func (c sequentialRuleEvalController) Invalidate() {} +func (c sequentialRuleEvalController) Done(_ context.Context) {}