Introduced sequentialRuleEvalController

Signed-off-by: Marco Pracucci <marco@pracucci.com>
This commit is contained in:
Marco Pracucci 2024-01-26 19:53:44 +01:00
parent 23f89c18b2
commit 046cd7599f
No known key found for this signature in database
GPG key ID: 74C1BD403D2DF9B5
3 changed files with 54 additions and 39 deletions

View file

@ -71,6 +71,9 @@ type Group struct {
// Rule group evaluation iteration function,
// defaults to DefaultEvalIterationFunc.
evalIterationFunc GroupEvalIterationFunc
// concurrencyController controls the rules evaluation concurrency.
concurrencyController RuleConcurrencyController
}
// GroupEvalIterationFunc is used to implement and extend rule group
@ -114,21 +117,27 @@ func NewGroup(o GroupOptions) *Group {
evalIterationFunc = DefaultEvalIterationFunc
}
concurrencyController := o.Opts.RuleConcurrencyController
if concurrencyController == nil {
concurrencyController = sequentialRuleEvalController{}
}
return &Group{
name: o.Name,
file: o.File,
interval: o.Interval,
limit: o.Limit,
rules: o.Rules,
shouldRestore: o.ShouldRestore,
opts: o.Opts,
seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
done: make(chan struct{}),
managerDone: o.done,
terminated: make(chan struct{}),
logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name),
metrics: metrics,
evalIterationFunc: evalIterationFunc,
name: o.Name,
file: o.File,
interval: o.Interval,
limit: o.Limit,
rules: o.Rules,
shouldRestore: o.ShouldRestore,
opts: o.Opts,
seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
done: make(chan struct{}),
managerDone: o.done,
terminated: make(chan struct{}),
logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name),
metrics: metrics,
evalIterationFunc: evalIterationFunc,
concurrencyController: concurrencyController,
}
}
@ -570,13 +579,12 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
// If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output.
// Try run concurrently if there are slots available.
ctrl := g.opts.RuleConcurrencyController
if ctrl != nil && ctrl.RuleEligible(g, rule) && ctrl.Allow() {
if ctrl := g.concurrencyController; ctrl.RuleEligible(g, rule) && ctrl.Allow() {
wg.Add(1)
go eval(i, rule, func() {
wg.Done()
g.opts.RuleConcurrencyController.Done()
ctrl.Done()
})
} else {
eval(i, rule, nil)

View file

@ -135,7 +135,11 @@ func NewManager(o *ManagerOptions) *Manager {
}
if o.RuleConcurrencyController == nil {
o.RuleConcurrencyController = newRuleConcurrencyController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals)
if o.ConcurrentEvalsEnabled {
o.RuleConcurrencyController = newRuleConcurrencyController(o.MaxConcurrentEvals)
} else {
o.RuleConcurrencyController = sequentialRuleEvalController{}
}
}
m := &Manager{
@ -184,9 +188,7 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels
m.mtx.Lock()
defer m.mtx.Unlock()
if m.opts.RuleConcurrencyController != nil {
m.opts.RuleConcurrencyController.Invalidate()
}
m.opts.RuleConcurrencyController.Invalidate()
groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, files...)
@ -436,22 +438,20 @@ type RuleConcurrencyController interface {
Invalidate()
}
func newRuleConcurrencyController(enabled bool, maxConcurrency int64) RuleConcurrencyController {
return &concurrentRuleEvalController{
enabled: enabled,
sema: semaphore.NewWeighted(maxConcurrency),
depMaps: map[*Group]dependencyMap{},
}
}
// concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules.
type concurrentRuleEvalController struct {
enabled bool
sema *semaphore.Weighted
depMapsMu sync.Mutex
depMaps map[*Group]dependencyMap
}
func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyController {
return &concurrentRuleEvalController{
sema: semaphore.NewWeighted(maxConcurrency),
depMaps: map[*Group]dependencyMap{},
}
}
func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool {
c.depMapsMu.Lock()
defer c.depMapsMu.Unlock()
@ -466,18 +466,10 @@ func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool {
}
func (c *concurrentRuleEvalController) Allow() bool {
if !c.enabled {
return false
}
return c.sema.TryAcquire(1)
}
func (c *concurrentRuleEvalController) Done() {
if !c.enabled {
return
}
c.sema.Release(1)
}
@ -488,3 +480,17 @@ func (c *concurrentRuleEvalController) Invalidate() {
// Clear out the memoized dependency maps because some or all groups may have been updated.
c.depMaps = map[*Group]dependencyMap{}
}
// sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially.
type sequentialRuleEvalController struct{}
func (c sequentialRuleEvalController) RuleEligible(_ *Group, _ Rule) bool {
return false
}
func (c sequentialRuleEvalController) Allow() bool {
return false
}
func (c sequentialRuleEvalController) Done() {}
func (c sequentialRuleEvalController) Invalidate() {}

View file

@ -676,7 +676,8 @@ func TestDeletedRuleMarkedStale(t *testing.T) {
rules: []Rule{},
seriesInPreviousEval: []map[string]labels.Labels{},
opts: &ManagerOptions{
Appendable: st,
Appendable: st,
RuleConcurrencyController: sequentialRuleEvalController{},
},
}
newGroup.CopyState(oldGroup)