From 5ee3fbe825e5000acd86a844defda4f842459e99 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 2 Feb 2024 10:06:37 +0100 Subject: [PATCH] Decouple ruler dependency controller from concurrency controller Signed-off-by: Marco Pracucci --- rules/alerting.go | 21 +++++++++++++ rules/alerting_test.go | 42 ++++++++++++++++++++++++++ rules/group.go | 6 +++- rules/manager.go | 43 ++++++++++++++++++--------- rules/manager_test.go | 62 +++++++++++++++++++++++++++++++++++++++ rules/origin.go | 18 +++++++++--- rules/origin_test.go | 65 +++++++++++++++++++++++++++++++++++++++++ rules/recording.go | 21 +++++++++++++ rules/recording_test.go | 22 ++++++++++++++ rules/rule.go | 16 ++++++++++ 10 files changed, 298 insertions(+), 18 deletions(-) diff --git a/rules/alerting.go b/rules/alerting.go index 72a3c7913..a99b2b4aa 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -142,6 +142,9 @@ type AlertingRule struct { active map[uint64]*Alert logger log.Logger + + noDependentRules *atomic.Bool + noDependencyRules *atomic.Bool } // NewAlertingRule constructs a new AlertingRule. @@ -168,6 +171,8 @@ func NewAlertingRule( evaluationTimestamp: atomic.NewTime(time.Time{}), evaluationDuration: atomic.NewDuration(0), lastError: atomic.NewError(nil), + noDependentRules: atomic.NewBool(false), + noDependencyRules: atomic.NewBool(false), } } @@ -317,6 +322,22 @@ func (r *AlertingRule) Restored() bool { return r.restored.Load() } +func (r *AlertingRule) SetNoDependentRules(noDependentRules bool) { + r.noDependentRules.Store(noDependentRules) +} + +func (r *AlertingRule) NoDependentRules() bool { + return r.noDependentRules.Load() +} + +func (r *AlertingRule) SetNoDependencyRules(noDependencyRules bool) { + r.noDependencyRules.Store(noDependencyRules) +} + +func (r *AlertingRule) NoDependencyRules() bool { + return r.noDependencyRules.Load() +} + // resolvedRetention is the duration for which a resolved alert instance // is kept in memory state and consequently repeatedly sent to the AlertManager. const resolvedRetention = 15 * time.Minute diff --git a/rules/alerting_test.go b/rules/alerting_test.go index dd324d1ee..ba39fbf7a 100644 --- a/rules/alerting_test.go +++ b/rules/alerting_test.go @@ -920,3 +920,45 @@ func TestAlertingEvalWithOrigin(t *testing.T) { require.NoError(t, err) require.Equal(t, detail, NewRuleDetail(rule)) } + +func TestAlertingRule_SetNoDependentRules(t *testing.T) { + rule := NewAlertingRule( + "test", + &parser.NumberLiteral{Val: 1}, + time.Minute, + 0, + labels.FromStrings("test", "test"), + labels.EmptyLabels(), + labels.EmptyLabels(), + "", + true, log.NewNopLogger(), + ) + require.False(t, rule.NoDependentRules()) + + rule.SetNoDependentRules(false) + require.False(t, rule.NoDependentRules()) + + rule.SetNoDependentRules(true) + require.True(t, rule.NoDependentRules()) +} + +func TestAlertingRule_SetNoDependencyRules(t *testing.T) { + rule := NewAlertingRule( + "test", + &parser.NumberLiteral{Val: 1}, + time.Minute, + 0, + labels.FromStrings("test", "test"), + labels.EmptyLabels(), + labels.EmptyLabels(), + "", + true, log.NewNopLogger(), + ) + require.False(t, rule.NoDependencyRules()) + + rule.SetNoDependencyRules(false) + require.False(t, rule.NoDependencyRules()) + + rule.SetNoDependencyRules(true) + require.True(t, rule.NoDependencyRules()) +} diff --git a/rules/group.go b/rules/group.go index 5ee06dc0b..6f7844bac 100644 --- a/rules/group.go +++ b/rules/group.go @@ -579,7 +579,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. // Try run concurrently if there are slots available. - if ctrl := g.concurrencyController; ctrl.RuleEligible(g, rule) && ctrl.Allow() { + if ctrl := g.concurrencyController; isRuleEligibleForConcurrentExecution(rule) && ctrl.Allow() { wg.Add(1) go eval(i, rule, func() { @@ -1008,3 +1008,7 @@ func buildDependencyMap(rules []Rule) dependencyMap { return dependencies } + +func isRuleEligibleForConcurrentExecution(rule Rule) bool { + return rule.NoDependentRules() && rule.NoDependencyRules() +} diff --git a/rules/manager.go b/rules/manager.go index 477508dc0..66dcdcf2e 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -119,6 +119,7 @@ type ManagerOptions struct { MaxConcurrentEvals int64 ConcurrentEvalsEnabled bool RuleConcurrencyController RuleConcurrencyController + RuleDependencyController RuleDependencyController Metrics *Metrics } @@ -142,6 +143,10 @@ func NewManager(o *ManagerOptions) *Manager { } } + if o.RuleDependencyController == nil { + o.RuleDependencyController = ruleDependencyController{} + } + m := &Manager{ groups: map[string]*Group{}, opts: o, @@ -188,8 +193,6 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels m.mtx.Lock() defer m.mtx.Unlock() - m.opts.RuleConcurrencyController.Invalidate() - groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, files...) if errs != nil { @@ -322,6 +325,9 @@ func (m *Manager) LoadGroups( )) } + // Check dependencies between rules and store it on the Rule itself. + m.opts.RuleDependencyController.AnalyseRules(rules) + groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{ Name: rg.Name, File: fn, @@ -418,24 +424,35 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc { } } -// RuleConcurrencyController controls whether rules can be evaluated concurrently. Its purpose is to bound the amount -// of concurrency in rule evaluations to avoid overwhelming the Prometheus server with additional query load and ensure -// the correctness of rules running concurrently. Concurrency is controlled globally, not on a per-group basis. -type RuleConcurrencyController interface { - // RuleEligible determines if the rule can guarantee correct results while running concurrently. - RuleEligible(g *Group, r Rule) bool +// RuleDependencyController controls whether a set of rules have dependencies between each other. +type RuleDependencyController interface { + // AnalyseRules analyses dependencies between the input rules. For each rule that it's guaranteed + // not having any dependants and/or dependency, this function should call Rule.SetNoDependentRules(true) + // and/or Rule.SetNoDependencyRules(true). + AnalyseRules(rules []Rule) +} +type ruleDependencyController struct{} + +// AnalyseRules implements RuleDependencyController. +func (c ruleDependencyController) AnalyseRules(rules []Rule) { + depMap := buildDependencyMap(rules) + for _, r := range rules { + r.SetNoDependentRules(depMap.dependents(r) == 0) + r.SetNoDependencyRules(depMap.dependencies(r) == 0) + } +} + +// RuleConcurrencyController controls concurrency for rules that are safe to be evaluated concurrently. +// Its purpose is to bound the amount of concurrency in rule evaluations to avoid overwhelming the Prometheus +// server with additional query load. Concurrency is controlled globally, not on a per-group basis. +type RuleConcurrencyController interface { // Allow determines whether any concurrent evaluation slots are available. // If Allow() returns true, then Done() must be called to release the acquired slot. Allow() bool // Done releases a concurrent evaluation slot. Done() - - // Invalidate instructs the controller to invalidate its state. - // This should be called when groups are modified (during a reload, for instance), because the controller may - // store some state about each group in order to more efficiently determine rule eligibility. - Invalidate() } // concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. diff --git a/rules/manager_test.go b/rules/manager_test.go index 07ec06104..5e3a60922 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1314,6 +1314,8 @@ func TestRuleGroupEvalIterationFunc(t *testing.T) { evaluationTimestamp: atomic.NewTime(time.Time{}), evaluationDuration: atomic.NewDuration(0), lastError: atomic.NewError(nil), + noDependentRules: atomic.NewBool(false), + noDependencyRules: atomic.NewBool(false), } group := NewGroup(GroupOptions{ @@ -1407,6 +1409,66 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) { require.Equal(t, chunkenc.ValNone, it.Next()) } +func TestManager_LoadGroups_ShouldCheckWhetherEachRuleHasDependentsAndDependencies(t *testing.T) { + storage := teststorage.New(t) + t.Cleanup(func() { + require.NoError(t, storage.Close()) + }) + + ruleManager := NewManager(&ManagerOptions{ + Context: context.Background(), + Logger: log.NewNopLogger(), + Appendable: storage, + QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { return nil, nil }, + }) + + t.Run("load a mix of dependent and independent rules", func(t *testing.T) { + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + + expected := map[string]struct { + noDependentRules bool + noDependencyRules bool + }{ + "job:http_requests:rate1m": { + noDependentRules: true, + noDependencyRules: true, + }, + "job:http_requests:rate5m": { + noDependentRules: true, + noDependencyRules: true, + }, + "job:http_requests:rate15m": { + noDependentRules: true, + noDependencyRules: false, + }, + "TooManyRequests": { + noDependentRules: false, + noDependencyRules: true, + }, + } + + for _, r := range ruleManager.Rules() { + exp, ok := expected[r.Name()] + require.Truef(t, ok, "rule: %s", r.String()) + require.Equalf(t, exp.noDependentRules, r.NoDependentRules(), "rule: %s", r.String()) + require.Equalf(t, exp.noDependencyRules, r.NoDependencyRules(), "rule: %s", r.String()) + } + }) + + t.Run("load only independent rules", func(t *testing.T) { + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple_independent.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + + for _, r := range ruleManager.Rules() { + require.Truef(t, r.NoDependentRules(), "rule: %s", r.String()) + require.Truef(t, r.NoDependencyRules(), "rule: %s", r.String()) + } + }) +} + func TestDependencyMap(t *testing.T) { ctx := context.Background() opts := &ManagerOptions{ diff --git a/rules/origin.go b/rules/origin.go index 996538767..695fc5f83 100644 --- a/rules/origin.go +++ b/rules/origin.go @@ -28,6 +28,14 @@ type RuleDetail struct { Query string Labels labels.Labels Kind string + + // NoDependentRules is set to true if it's guaranteed that in the rule group there's no other rule + // which depends on this one. + NoDependentRules bool + + // NoDependencyRules is set to true if it's guaranteed that this rule doesn't depend on any other + // rule within the rule group. + NoDependencyRules bool } const ( @@ -48,10 +56,12 @@ func NewRuleDetail(r Rule) RuleDetail { } return RuleDetail{ - Name: r.Name(), - Query: r.Query().String(), - Labels: r.Labels(), - Kind: kind, + Name: r.Name(), + Query: r.Query().String(), + Labels: r.Labels(), + Kind: kind, + NoDependentRules: r.NoDependentRules(), + NoDependencyRules: r.NoDependencyRules(), } } diff --git a/rules/origin_test.go b/rules/origin_test.go index ea4f4f905..ca466301d 100644 --- a/rules/origin_test.go +++ b/rules/origin_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/labels" @@ -43,9 +44,73 @@ func (u unknownRule) SetEvaluationDuration(time.Duration) {} func (u unknownRule) GetEvaluationDuration() time.Duration { return 0 } func (u unknownRule) SetEvaluationTimestamp(time.Time) {} func (u unknownRule) GetEvaluationTimestamp() time.Time { return time.Time{} } +func (u unknownRule) SetNoDependentRules(bool) {} +func (u unknownRule) NoDependentRules() bool { return false } +func (u unknownRule) SetNoDependencyRules(bool) {} +func (u unknownRule) NoDependencyRules() bool { return false } func TestNewRuleDetailPanics(t *testing.T) { require.PanicsWithValue(t, `unknown rule type "rules.unknownRule"`, func() { NewRuleDetail(unknownRule{}) }) } + +func TestFromOriginContext(t *testing.T) { + t.Run("should return zero value if RuleDetail is missing in the context", func(t *testing.T) { + detail := FromOriginContext(context.Background()) + require.Zero(t, detail) + + // The zero value for NoDependentRules must be the most conservative option. + require.False(t, detail.NoDependentRules) + + // The zero value for NoDependencyRules must be the most conservative option. + require.False(t, detail.NoDependencyRules) + }) +} + +func TestNewRuleDetail(t *testing.T) { + t.Run("should populate NoDependentRules and NoDependencyRules for a RecordingRule", func(t *testing.T) { + rule := NewRecordingRule("test", &parser.NumberLiteral{Val: 1}, labels.EmptyLabels()) + detail := NewRuleDetail(rule) + require.False(t, detail.NoDependentRules) + require.False(t, detail.NoDependencyRules) + + rule.SetNoDependentRules(true) + detail = NewRuleDetail(rule) + require.True(t, detail.NoDependentRules) + require.False(t, detail.NoDependencyRules) + + rule.SetNoDependencyRules(true) + detail = NewRuleDetail(rule) + require.True(t, detail.NoDependentRules) + require.True(t, detail.NoDependencyRules) + }) + + t.Run("should populate NoDependentRules and NoDependencyRules for a AlertingRule", func(t *testing.T) { + rule := NewAlertingRule( + "test", + &parser.NumberLiteral{Val: 1}, + time.Minute, + 0, + labels.FromStrings("test", "test"), + labels.EmptyLabels(), + labels.EmptyLabels(), + "", + true, log.NewNopLogger(), + ) + + detail := NewRuleDetail(rule) + require.False(t, detail.NoDependentRules) + require.False(t, detail.NoDependencyRules) + + rule.SetNoDependentRules(true) + detail = NewRuleDetail(rule) + require.True(t, detail.NoDependentRules) + require.False(t, detail.NoDependencyRules) + + rule.SetNoDependencyRules(true) + detail = NewRuleDetail(rule) + require.True(t, detail.NoDependentRules) + require.True(t, detail.NoDependencyRules) + }) +} diff --git a/rules/recording.go b/rules/recording.go index b6a886cdd..e2b0a31a0 100644 --- a/rules/recording.go +++ b/rules/recording.go @@ -41,6 +41,9 @@ type RecordingRule struct { lastError *atomic.Error // Duration of how long it took to evaluate the recording rule. evaluationDuration *atomic.Duration + + noDependentRules *atomic.Bool + noDependencyRules *atomic.Bool } // NewRecordingRule returns a new recording rule. @@ -53,6 +56,8 @@ func NewRecordingRule(name string, vector parser.Expr, lset labels.Labels) *Reco evaluationTimestamp: atomic.NewTime(time.Time{}), evaluationDuration: atomic.NewDuration(0), lastError: atomic.NewError(nil), + noDependentRules: atomic.NewBool(false), + noDependencyRules: atomic.NewBool(false), } } @@ -166,3 +171,19 @@ func (rule *RecordingRule) SetEvaluationTimestamp(ts time.Time) { func (rule *RecordingRule) GetEvaluationTimestamp() time.Time { return rule.evaluationTimestamp.Load() } + +func (rule *RecordingRule) SetNoDependentRules(noDependentRules bool) { + rule.noDependentRules.Store(noDependentRules) +} + +func (rule *RecordingRule) NoDependentRules() bool { + return rule.noDependentRules.Load() +} + +func (rule *RecordingRule) SetNoDependencyRules(noDependencyRules bool) { + rule.noDependencyRules.Store(noDependencyRules) +} + +func (rule *RecordingRule) NoDependencyRules() bool { + return rule.noDependencyRules.Load() +} diff --git a/rules/recording_test.go b/rules/recording_test.go index 960ff4bdb..7a09cd6d8 100644 --- a/rules/recording_test.go +++ b/rules/recording_test.go @@ -249,3 +249,25 @@ func TestRecordingEvalWithOrigin(t *testing.T) { require.NoError(t, err) require.Equal(t, detail, NewRuleDetail(rule)) } + +func TestRecordingRule_SetNoDependentRules(t *testing.T) { + rule := NewRecordingRule("1", &parser.NumberLiteral{Val: 1}, labels.EmptyLabels()) + require.False(t, rule.NoDependentRules()) + + rule.SetNoDependentRules(false) + require.False(t, rule.NoDependentRules()) + + rule.SetNoDependentRules(true) + require.True(t, rule.NoDependentRules()) +} + +func TestRecordingRule_SetNoDependencyRules(t *testing.T) { + rule := NewRecordingRule("1", &parser.NumberLiteral{Val: 1}, labels.EmptyLabels()) + require.False(t, rule.NoDependencyRules()) + + rule.SetNoDependencyRules(false) + require.False(t, rule.NoDependencyRules()) + + rule.SetNoDependencyRules(true) + require.True(t, rule.NoDependencyRules()) +} diff --git a/rules/rule.go b/rules/rule.go index a4a8c0445..59af3e0bb 100644 --- a/rules/rule.go +++ b/rules/rule.go @@ -61,4 +61,20 @@ type Rule interface { // GetEvaluationTimestamp returns last evaluation timestamp. // NOTE: Used dynamically by rules.html template. GetEvaluationTimestamp() time.Time + + // SetNoDependentRules sets whether there's no other rule in the rule group that depends on this rule. + SetNoDependentRules(bool) + + // NoDependentRules returns true if it's guaranteed that in the rule group there's no other rule + // which depends on this one. In case this function returns false there's no such guarantee, which + // means there may or may not be other rules depending on this one. + NoDependentRules() bool + + // SetNoDependencyRules sets whether this rule doesn't depend on the output of any rule in the rule group. + SetNoDependencyRules(bool) + + // NoDependencyRules returns true if it's guaranteed that this rule doesn't depend on the output of + // any other rule in the group. In case this function returns false there's no such guarantee, which + // means the rule may or may not depend on other rules. + NoDependencyRules() bool }