From 94cdfa30cd724842c2971b6b01ed520041ac119e Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Sat, 28 Oct 2023 11:44:20 +0200 Subject: [PATCH] Refactoring Signed-off-by: Danny Kopping --- rules/group.go | 24 +++++++++----- rules/manager.go | 53 ++++++++++++++++++------------- rules/manager_test.go | 73 ++++++++++++++++++++++++++++++++++++++----- 3 files changed, 113 insertions(+), 37 deletions(-) diff --git a/rules/group.go b/rules/group.go index 568d606b58..7f53e1b474 100644 --- a/rules/group.go +++ b/rules/group.go @@ -437,7 +437,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { eval := func(i int, rule Rule, async bool) { defer func() { if async { - g.opts.ConcurrencyController.Done() + g.opts.ConcurrentEvalsController.Done() } }() @@ -569,7 +569,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. // Try run concurrently if there are slots available. - if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrencyController.Allow() { + if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrentEvalsController != nil && g.opts.ConcurrentEvalsController.Allow() { go eval(i, rule, true) } else { eval(i, rule, false) @@ -888,8 +888,8 @@ func NewGroupMetrics(reg prometheus.Registerer) *Metrics { } // dependencyMap is a data-structure which contains the relationships between rules within a group. -// It is used to describe the dependency associations between recording rules in a group whereby one rule uses the -// output metric produced by another recording rule in its expression (i.e. as its "input"). +// It is used to describe the dependency associations between rules in a group whereby one rule uses the +// output metric produced by another rule in its expression (i.e. as its "input"). type dependencyMap map[Rule][]Rule // dependents returns the count of rules which use the output of the given rule as one of their inputs. @@ -905,10 +905,6 @@ func (m dependencyMap) dependencies(r Rule) int { var count int for _, children := range m { - if len(children) == 0 { - continue - } - for _, child := range children { if child == r { count++ @@ -919,6 +915,8 @@ func (m dependencyMap) dependencies(r Rule) int { return count } +// isIndependent determines whether the given rule is not dependent on another rule for its input, nor is any other rule +// dependent on its output. func (m dependencyMap) isIndependent(r Rule) bool { if m == nil { return false @@ -928,6 +926,16 @@ func (m dependencyMap) isIndependent(r Rule) bool { } // buildDependencyMap builds a data-structure which contains the relationships between rules within a group. +// +// Alert rules, by definition, cannot have any dependents - but they can have dependencies. Any recording rule on whose +// output an Alert rule depends will not be able to run concurrently. +// +// There is a class of rule expressions which are considered "indeterminate", because either relationships cannot be +// inferred, or concurrent evaluation of rules depending on these series would produce undefined/unexpected behaviour: +// - wildcard queriers like {cluster="prod1"} which would match every series with that label selector +// - any "meta" series (series produced by Prometheus itself) like ALERTS, ALERTS_FOR_STATE +// +// Rules which are independent can run concurrently with no side-effects. func buildDependencyMap(rules []Rule) dependencyMap { dependencies := make(dependencyMap) diff --git a/rules/manager.go b/rules/manager.go index 0aeeae1703..b982f23754 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -104,21 +104,21 @@ type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) // ManagerOptions bundles options for the Manager. type ManagerOptions struct { - ExternalURL *url.URL - QueryFunc QueryFunc - NotifyFunc NotifyFunc - Context context.Context - Appendable storage.Appendable - Queryable storage.Queryable - Logger log.Logger - Registerer prometheus.Registerer - OutageTolerance time.Duration - ForGracePeriod time.Duration - ResendDelay time.Duration - MaxConcurrentEvals int64 - ConcurrentEvalsEnabled bool - ConcurrencyController ConcurrencyController - GroupLoader GroupLoader + ExternalURL *url.URL + QueryFunc QueryFunc + NotifyFunc NotifyFunc + Context context.Context + Appendable storage.Appendable + Queryable storage.Queryable + Logger log.Logger + Registerer prometheus.Registerer + OutageTolerance time.Duration + ForGracePeriod time.Duration + ResendDelay time.Duration + GroupLoader GroupLoader + MaxConcurrentEvals int64 + ConcurrentEvalsEnabled bool + ConcurrentEvalsController ConcurrentRuleEvalController Metrics *Metrics } @@ -134,7 +134,7 @@ func NewManager(o *ManagerOptions) *Manager { o.GroupLoader = FileLoader{} } - o.ConcurrencyController = NewConcurrencyController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals) + o.ConcurrentEvalsController = NewConcurrentRuleEvalController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals) m := &Manager{ groups: map[string]*Group{}, @@ -410,16 +410,26 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc { } } -type ConcurrencyController struct { +// ConcurrentRuleEvalController controls whether rules can be evaluated concurrently. Its purpose it to bound the amount +// of concurrency in rule evaluations so they do not overwhelm the Prometheus server with additional query load. +// Concurrency is controlled globally, not on a per-group basis. +type ConcurrentRuleEvalController interface { + Allow() bool + Done() +} + +// concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. +type concurrentRuleEvalController struct { enabled bool sema *semaphore.Weighted } -func NewConcurrencyController(enabled bool, maxConcurrency int64) ConcurrencyController { - return ConcurrencyController{enabled: enabled, sema: semaphore.NewWeighted(maxConcurrency)} +func NewConcurrentRuleEvalController(enabled bool, maxConcurrency int64) ConcurrentRuleEvalController { + return concurrentRuleEvalController{enabled: enabled, sema: semaphore.NewWeighted(maxConcurrency)} } -func (c ConcurrencyController) Allow() bool { +// Allow determines whether any concurrency slots are available. +func (c concurrentRuleEvalController) Allow() bool { if !c.enabled { return false } @@ -427,7 +437,8 @@ func (c ConcurrencyController) Allow() bool { return c.sema.TryAcquire(1) } -func (c ConcurrencyController) Done() { +// Done releases a concurrent evaluation slot. +func (c concurrentRuleEvalController) Done() { if !c.enabled { return } diff --git a/rules/manager_test.go b/rules/manager_test.go index 2a9b3a1d73..8b3b9c08ff 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1471,7 +1471,53 @@ func TestNoDependency(t *testing.T) { }) // A group with only one rule cannot have dependencies. - require.False(t, group.dependencyMap.isIndependent(rule)) + require.Empty(t, group.dependencyMap) +} + +func TestDependenciesEdgeCases(t *testing.T) { + ctx := context.Background() + opts := &ManagerOptions{ + Context: ctx, + Logger: log.NewNopLogger(), + } + + t.Run("empty group", func(t *testing.T) { + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{}, // empty group + Opts: opts, + }) + + expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))") + require.NoError(t, err) + rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + + // A group with no rules has no dependency map, but doesn't panic if the map is queried. + require.Nil(t, group.dependencyMap) + require.False(t, group.dependencyMap.isIndependent(rule)) + }) + + t.Run("rules which reference no series", func(t *testing.T) { + expr, err := parser.ParseExpr("one") + require.NoError(t, err) + rule1 := NewRecordingRule("1", expr, labels.Labels{}) + + expr, err = parser.ParseExpr("two") + require.NoError(t, err) + rule2 := NewRecordingRule("2", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule1, rule2}, + Opts: opts, + }) + + // A group with rules which reference no series will still produce a dependency map + require.True(t, group.dependencyMap.isIndependent(rule1)) + require.True(t, group.dependencyMap.isIndependent(rule2)) + }) } func TestNoMetricSelector(t *testing.T) { @@ -1596,10 +1642,23 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { require.NoError(t, err) for h, g := range ruleManager.groups { + const ruleName = "job:http_requests:rate5m" + var rr *RecordingRule + + for _, r := range g.rules { + if r.Name() == ruleName { + rr = r.(*RecordingRule) + } + } + + require.NotEmptyf(t, rr, "expected to find %q recording rule in fixture", ruleName) + // Dependency maps must change because the groups would've been updated. require.NotEqual(t, orig[h], g.dependencyMap) // We expect there to be some dependencies since the new rule group contains a dependency. require.Greater(t, len(g.dependencyMap), 0) + require.Equal(t, 1, g.dependencyMap.dependents(rr)) + require.Zero(t, g.dependencyMap.dependencies(rr)) } } @@ -1625,17 +1684,16 @@ func TestAsyncRuleEvaluation(t *testing.T) { inflightQueries.Add(-1) }() - // Artificially delay all query executions to highly concurrent execution improvement. + // Artificially delay all query executions to highlight concurrent execution improvement. time.Sleep(artificialDelay) - // return a stub sample + // Return a stub sample. return promql.Vector{ promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, }, nil }, }) - // Evaluate groups manually to show the impact of async rule evaluations. groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) require.Empty(t, errs) require.Len(t, groups, 1) @@ -1688,7 +1746,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { for _, group := range groups { // Allow up to 2 concurrent rule evaluations. - group.opts.ConcurrencyController = NewConcurrencyController(true, 2) + group.opts.ConcurrentEvalsController = NewConcurrentRuleEvalController(true, 2) require.Len(t, group.rules, expectedRules) start := time.Now() @@ -1749,17 +1807,16 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) { inflightQueries.Add(-1) }() - // Artificially delay all query executions to highly concurrent execution improvement. + // Artificially delay all query executions to highlight concurrent execution improvement. time.Sleep(artificialDelay) - // return a stub sample + // Return a stub sample. return promql.Vector{ promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, }, nil }, }) - // Evaluate groups manually to show the impact of async rule evaluations. groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) require.Empty(t, errs) require.Len(t, groups, groupCount)