From 046cd7599f573ff0f342edbafeee8f539a8077a2 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 26 Jan 2024 19:53:44 +0100 Subject: [PATCH] Introduced sequentialRuleEvalController Signed-off-by: Marco Pracucci --- rules/group.go | 42 ++++++++++++++++++++++--------------- rules/manager.go | 48 ++++++++++++++++++++++++------------------- rules/manager_test.go | 3 ++- 3 files changed, 54 insertions(+), 39 deletions(-) diff --git a/rules/group.go b/rules/group.go index b50189fa9..56648a60c 100644 --- a/rules/group.go +++ b/rules/group.go @@ -71,6 +71,9 @@ type Group struct { // Rule group evaluation iteration function, // defaults to DefaultEvalIterationFunc. evalIterationFunc GroupEvalIterationFunc + + // concurrencyController controls the rules evaluation concurrency. + concurrencyController RuleConcurrencyController } // GroupEvalIterationFunc is used to implement and extend rule group @@ -114,21 +117,27 @@ func NewGroup(o GroupOptions) *Group { evalIterationFunc = DefaultEvalIterationFunc } + concurrencyController := o.Opts.RuleConcurrencyController + if concurrencyController == nil { + concurrencyController = sequentialRuleEvalController{} + } + return &Group{ - name: o.Name, - file: o.File, - interval: o.Interval, - limit: o.Limit, - rules: o.Rules, - shouldRestore: o.ShouldRestore, - opts: o.Opts, - seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), - done: make(chan struct{}), - managerDone: o.done, - terminated: make(chan struct{}), - logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), - metrics: metrics, - evalIterationFunc: evalIterationFunc, + name: o.Name, + file: o.File, + interval: o.Interval, + limit: o.Limit, + rules: o.Rules, + shouldRestore: o.ShouldRestore, + opts: o.Opts, + seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), + done: make(chan struct{}), + managerDone: o.done, + terminated: make(chan struct{}), + logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), + metrics: metrics, + evalIterationFunc: evalIterationFunc, + concurrencyController: concurrencyController, } } @@ -570,13 +579,12 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. // Try run concurrently if there are slots available. - ctrl := g.opts.RuleConcurrencyController - if ctrl != nil && ctrl.RuleEligible(g, rule) && ctrl.Allow() { + if ctrl := g.concurrencyController; ctrl.RuleEligible(g, rule) && ctrl.Allow() { wg.Add(1) go eval(i, rule, func() { wg.Done() - g.opts.RuleConcurrencyController.Done() + ctrl.Done() }) } else { eval(i, rule, nil) diff --git a/rules/manager.go b/rules/manager.go index 7da4eb88b..477508dc0 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -135,7 +135,11 @@ func NewManager(o *ManagerOptions) *Manager { } if o.RuleConcurrencyController == nil { - o.RuleConcurrencyController = newRuleConcurrencyController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals) + if o.ConcurrentEvalsEnabled { + o.RuleConcurrencyController = newRuleConcurrencyController(o.MaxConcurrentEvals) + } else { + o.RuleConcurrencyController = sequentialRuleEvalController{} + } } m := &Manager{ @@ -184,9 +188,7 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels m.mtx.Lock() defer m.mtx.Unlock() - if m.opts.RuleConcurrencyController != nil { - m.opts.RuleConcurrencyController.Invalidate() - } + m.opts.RuleConcurrencyController.Invalidate() groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, files...) @@ -436,22 +438,20 @@ type RuleConcurrencyController interface { Invalidate() } -func newRuleConcurrencyController(enabled bool, maxConcurrency int64) RuleConcurrencyController { - return &concurrentRuleEvalController{ - enabled: enabled, - sema: semaphore.NewWeighted(maxConcurrency), - depMaps: map[*Group]dependencyMap{}, - } -} - // concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. type concurrentRuleEvalController struct { - enabled bool sema *semaphore.Weighted depMapsMu sync.Mutex depMaps map[*Group]dependencyMap } +func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyController { + return &concurrentRuleEvalController{ + sema: semaphore.NewWeighted(maxConcurrency), + depMaps: map[*Group]dependencyMap{}, + } +} + func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool { c.depMapsMu.Lock() defer c.depMapsMu.Unlock() @@ -466,18 +466,10 @@ func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool { } func (c *concurrentRuleEvalController) Allow() bool { - if !c.enabled { - return false - } - return c.sema.TryAcquire(1) } func (c *concurrentRuleEvalController) Done() { - if !c.enabled { - return - } - c.sema.Release(1) } @@ -488,3 +480,17 @@ func (c *concurrentRuleEvalController) Invalidate() { // Clear out the memoized dependency maps because some or all groups may have been updated. c.depMaps = map[*Group]dependencyMap{} } + +// sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially. +type sequentialRuleEvalController struct{} + +func (c sequentialRuleEvalController) RuleEligible(_ *Group, _ Rule) bool { + return false +} + +func (c sequentialRuleEvalController) Allow() bool { + return false +} + +func (c sequentialRuleEvalController) Done() {} +func (c sequentialRuleEvalController) Invalidate() {} diff --git a/rules/manager_test.go b/rules/manager_test.go index ed6fea253..7d5a2bd9f 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -676,7 +676,8 @@ func TestDeletedRuleMarkedStale(t *testing.T) { rules: []Rule{}, seriesInPreviousEval: []map[string]labels.Labels{}, opts: &ManagerOptions{ - Appendable: st, + Appendable: st, + RuleConcurrencyController: sequentialRuleEvalController{}, }, } newGroup.CopyState(oldGroup)