From 7aa3b10c3fb6ae025e0173e439f54f6020a7eec8 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Fri, 5 Jan 2024 22:48:30 +0200 Subject: [PATCH] Block until all rules, both sync & async, have completed evaluating Updated & added tests Review feedback nits Return empty map if not indeterminate Use highWatermark to track inflight requests counter Appease the linter Clarify feature flag Signed-off-by: Danny Kopping --- docs/feature_flags.md | 9 +- .../fixtures/rules_multiple_independent.yaml | 15 + rules/group.go | 25 +- rules/manager.go | 17 +- rules/manager_test.go | 287 ++++++++++-------- 5 files changed, 208 insertions(+), 145 deletions(-) create mode 100644 rules/fixtures/rules_multiple_independent.yaml diff --git a/docs/feature_flags.md b/docs/feature_flags.md index 5517018df..95b627010 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -217,8 +217,9 @@ Besides enabling this feature in Prometheus, created timestamps need to be expos `--enable-feature=concurrent-rule-eval` -Rule groups execute concurrently, but the rules within a group execute sequentially; this is because rules can use the +By default, rule groups execute concurrently, but the rules within a group execute sequentially; this is because rules can use the output of a preceding rule as its input. However, if there is no detectable relationship between rules then there is no -reason to run them sequentially. This can improve rule reliability at the expense of adding more concurrent query -load. The number of concurrent rule evaluations can be configured with `--rules.max-concurrent-rule-evals` which is set -to `4` by default. +reason to run them sequentially. +When the `concurrent-rule-eval` feature flag is enabled, rules without any dependency on other rules within a rule group will be evaluated concurrently. +This can improve rule reliability at the expense of adding more concurrent query load. The number of concurrent rule evaluations can be configured +with `--rules.max-concurrent-rule-evals` which is set to `4` by default. diff --git a/rules/fixtures/rules_multiple_independent.yaml b/rules/fixtures/rules_multiple_independent.yaml new file mode 100644 index 000000000..e071be3ef --- /dev/null +++ b/rules/fixtures/rules_multiple_independent.yaml @@ -0,0 +1,15 @@ +groups: + - name: independents + rules: + - record: job:http_requests:rate1m + expr: sum by (job)(rate(http_requests_total[1m])) + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + - record: job:http_requests:rate15m + expr: sum by (job)(rate(http_requests_total[15m])) + - record: job:http_requests:rate30m + expr: sum by (job)(rate(http_requests_total[30m])) + - record: job:http_requests:rate1h + expr: sum by (job)(rate(http_requests_total[1h])) + - record: job:http_requests:rate2h + expr: sum by (job)(rate(http_requests_total[2h])) diff --git a/rules/group.go b/rules/group.go index 8de0900d1..939d2cd5b 100644 --- a/rules/group.go +++ b/rules/group.go @@ -423,8 +423,13 @@ func (g *Group) CopyState(from *Group) { } // Eval runs a single evaluation cycle in which all rules are evaluated sequentially. +// Rules can be evaluated concurrently if the `concurrent-rule-eval` feature flag is enabled. func (g *Group) Eval(ctx context.Context, ts time.Time) { - var samplesTotal atomic.Float64 + var ( + samplesTotal atomic.Float64 + wg sync.WaitGroup + ) + for i, rule := range g.rules { select { case <-g.done: @@ -435,6 +440,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { eval := func(i int, rule Rule, async bool) { defer func() { if async { + wg.Done() g.opts.RuleConcurrencyController.Done() } }() @@ -569,12 +575,14 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { // Try run concurrently if there are slots available. ctrl := g.opts.RuleConcurrencyController if ctrl != nil && ctrl.RuleEligible(g, rule) && ctrl.Allow() { + wg.Add(1) go eval(i, rule, true) } else { eval(i, rule, false) } } + wg.Wait() if g.metrics != nil { g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load()) } @@ -940,7 +948,7 @@ func buildDependencyMap(rules []Rule) dependencyMap { if len(rules) <= 1 { // No relationships if group has 1 or fewer rules. - return nil + return dependencies } inputs := make(map[string][]Rule, len(rules)) @@ -949,7 +957,9 @@ func buildDependencyMap(rules []Rule) dependencyMap { var indeterminate bool for _, rule := range rules { - rule := rule + if indeterminate { + break + } name := rule.Name() outputs[name] = append(outputs[name], rule) @@ -980,15 +990,10 @@ func buildDependencyMap(rules []Rule) dependencyMap { return nil } - if len(inputs) == 0 || len(outputs) == 0 { - // No relationships can be inferred. - return nil - } - for output, outRules := range outputs { for _, outRule := range outRules { - if rs, found := inputs[output]; found && len(rs) > 0 { - dependencies[outRule] = append(dependencies[outRule], rs...) + if inRules, found := inputs[output]; found && len(inRules) > 0 { + dependencies[outRule] = append(dependencies[outRule], inRules...) } } } diff --git a/rules/manager.go b/rules/manager.go index 9ac95cdbd..84b43fba7 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -424,6 +424,7 @@ type RuleConcurrencyController interface { RuleEligible(g *Group, r Rule) bool // Allow determines whether any concurrent evaluation slots are available. + // If Allow() returns true, then Done() must be called to release the acquired slot. Allow() bool // Done releases a concurrent evaluation slot. @@ -445,15 +446,15 @@ func newRuleConcurrencyController(enabled bool, maxConcurrency int64) RuleConcur // concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. type concurrentRuleEvalController struct { - mu sync.Mutex - enabled bool - sema *semaphore.Weighted - depMaps map[*Group]dependencyMap + enabled bool + sema *semaphore.Weighted + depMapsMu sync.Mutex + depMaps map[*Group]dependencyMap } func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool { - c.mu.Lock() - defer c.mu.Unlock() + c.depMapsMu.Lock() + defer c.depMapsMu.Unlock() depMap, found := c.depMaps[g] if !found { @@ -481,8 +482,8 @@ func (c *concurrentRuleEvalController) Done() { } func (c *concurrentRuleEvalController) Invalidate() { - c.mu.Lock() - defer c.mu.Unlock() + c.depMapsMu.Lock() + defer c.depMapsMu.Unlock() // Clear out the memoized dependency maps because some or all groups may have been updated. c.depMaps = map[*Group]dependencyMap{} diff --git a/rules/manager_test.go b/rules/manager_test.go index 47f0248eb..2d1dc6b42 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1498,8 +1498,8 @@ func TestDependenciesEdgeCases(t *testing.T) { depMap := buildDependencyMap(group.rules) // A group with no rules has no dependency map, but doesn't panic if the map is queried. - require.Nil(t, depMap) - require.False(t, depMap.isIndependent(rule)) + require.Empty(t, depMap) + require.True(t, depMap.isIndependent(rule)) }) t.Run("rules which reference no series", func(t *testing.T) { @@ -1627,7 +1627,7 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { err := ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil) require.NoError(t, err) - require.Greater(t, len(ruleManager.groups), 0, "expected non-empty rule groups") + require.NotEmpty(t, ruleManager.groups, "expected non-empty rule groups") orig := make(map[string]dependencyMap, len(ruleManager.groups)) for _, g := range ruleManager.groups { @@ -1643,7 +1643,13 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { for h, g := range ruleManager.groups { depMap := buildDependencyMap(g.rules) // Dependency maps are the same because of no updates. - require.Equal(t, orig[h], depMap) + if orig[h] == nil { + require.Empty(t, orig[h]) + require.Empty(t, depMap) + } else { + require.Equal(t, orig[h], depMap) + } + } // Groups will be recreated when updated. @@ -1667,7 +1673,7 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { // Dependency maps must change because the groups would've been updated. require.NotEqual(t, orig[h], depMap) // We expect there to be some dependencies since the new rule group contains a dependency. - require.Greater(t, len(depMap), 0) + require.NotEmpty(t, depMap) require.Equal(t, 1, depMap.dependents(rr)) require.Zero(t, depMap.dependencies(rr)) } @@ -1677,86 +1683,51 @@ func TestAsyncRuleEvaluation(t *testing.T) { storage := teststorage.New(t) t.Cleanup(func() { storage.Close() }) - const artificialDelay = time.Second - var ( inflightQueries atomic.Int32 maxInflight atomic.Int32 ) - files := []string{"fixtures/rules_multiple.yaml"} - opts := &ManagerOptions{ - Context: context.Background(), - Logger: log.NewNopLogger(), - Appendable: storage, - QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { - inflightQueries.Add(1) - defer func() { - inflightQueries.Add(-1) - }() - - // Artificially delay all query executions to highlight concurrent execution improvement. - time.Sleep(artificialDelay) - - // Return a stub sample. - return promql.Vector{ - promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, - }, nil - }, - } - - inflightTracker := func(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - default: - highWatermark := maxInflight.Load() - current := inflightQueries.Load() - if current > highWatermark { - maxInflight.Store(current) - } - - time.Sleep(time.Millisecond) - } - } - } - - expectedRules := 4 - t.Run("synchronous evaluation with independent rules", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - - ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) - require.Empty(t, errs) - require.Len(t, groups, 1) - - for _, group := range groups { - require.Len(t, group.rules, expectedRules) - - start := time.Now() - - // Never expect more than 1 inflight query at a time. - go inflightTracker(ctx) - - group.Eval(ctx, start) - - require.EqualValues(t, 1, maxInflight.Load()) - // Each rule should take at least 1 second to execute sequentially. - require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds()) - // Each rule produces one vector. - require.EqualValues(t, expectedRules, testutil.ToFloat64(group.metrics.GroupSamples)) - } - - cancel() - }) - - t.Run("asynchronous evaluation with independent rules", func(t *testing.T) { // Reset. inflightQueries.Store(0) maxInflight.Store(0) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, 0)) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + + ruleCount := 4 + + for _, group := range groups { + require.Len(t, group.rules, ruleCount) + + start := time.Now() + group.Eval(ctx, start) + + // Never expect more than 1 inflight query at a time. + require.EqualValues(t, 1, maxInflight.Load()) + // Each rule should take at least 1 second to execute sequentially. + require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } + }) + + t.Run("asynchronous evaluation with independent and dependent rules", func(t *testing.T) { + // Reset. + inflightQueries.Store(0) + maxInflight.Store(0) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + ruleCount := 4 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) // Configure concurrency settings. opts.ConcurrentEvalsEnabled = true @@ -1764,28 +1735,97 @@ func TestAsyncRuleEvaluation(t *testing.T) { opts.RuleConcurrencyController = nil ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple.yaml"}...) require.Empty(t, errs) require.Len(t, groups, 1) for _, group := range groups { - require.Len(t, group.rules, expectedRules) + require.Len(t, group.rules, ruleCount) start := time.Now() - - go inflightTracker(ctx) - group.Eval(ctx, start) // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds()) + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) // Each rule produces one vector. - require.EqualValues(t, expectedRules, testutil.ToFloat64(group.metrics.GroupSamples)) + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) } + }) - cancel() + t.Run("asynchronous evaluation of all independent rules, insufficient concurrency", func(t *testing.T) { + // Reset. + inflightQueries.Store(0) + maxInflight.Store(0) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + ruleCount := 6 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple_independent.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + + for _, group := range groups { + require.Len(t, group.rules, ruleCount) + + start := time.Now() + group.Eval(ctx, start) + + // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. + require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + + } + }) + + t.Run("asynchronous evaluation of all independent rules, sufficient concurrency", func(t *testing.T) { + // Reset. + inflightQueries.Store(0) + maxInflight.Store(0) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + ruleCount := 6 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(ruleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple_independent.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + + for _, group := range groups { + require.Len(t, group.rules, ruleCount) + + start := time.Now() + + group.Eval(ctx, start) + + // Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once. + require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } }) } @@ -1793,8 +1833,6 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) { storage := teststorage.New(t) t.Cleanup(func() { storage.Close() }) - const artificialDelay = time.Millisecond * 100 - var ( inflightQueries atomic.Int32 maxInflight atomic.Int32 @@ -1803,50 +1841,15 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) { ) files := []string{"fixtures/rules_multiple_groups.yaml"} - ruleManager := NewManager(&ManagerOptions{ - Context: context.Background(), - Logger: log.NewNopLogger(), - Appendable: storage, - ConcurrentEvalsEnabled: true, - MaxConcurrentEvals: maxConcurrency, - QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { - inflightQueries.Add(1) - defer func() { - inflightQueries.Add(-1) - }() - // Artificially delay all query executions to highlight concurrent execution improvement. - time.Sleep(artificialDelay) - - // Return a stub sample. - return promql.Vector{ - promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, - }, nil - }, - }) + ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, maxConcurrency)) groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) require.Empty(t, errs) require.Len(t, groups, groupCount) ctx, cancel := context.WithCancel(context.Background()) - - go func() { - for { - select { - case <-ctx.Done(): - return - default: - highWatermark := maxInflight.Load() - current := inflightQueries.Load() - if current > highWatermark { - maxInflight.Store(current) - } - - time.Sleep(time.Millisecond) - } - } - }() + t.Cleanup(cancel) // Evaluate groups concurrently (like they normally do). var wg sync.WaitGroup @@ -1861,8 +1864,46 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) { } wg.Wait() - cancel() // Synchronous queries also count towards inflight, so at most we can have maxConcurrency+$groupCount inflight evaluations. require.EqualValues(t, maxInflight.Load(), int32(maxConcurrency)+int32(groupCount)) } + +const artificialDelay = 10 * time.Millisecond + +func optsFactory(storage storage.Storage, maxInflight, inflightQueries *atomic.Int32, maxConcurrent int64) *ManagerOptions { + var inflightMu sync.Mutex + + concurrent := maxConcurrent > 0 + + return &ManagerOptions{ + Context: context.Background(), + Logger: log.NewNopLogger(), + ConcurrentEvalsEnabled: concurrent, + MaxConcurrentEvals: maxConcurrent, + Appendable: storage, + QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { + inflightMu.Lock() + + current := inflightQueries.Add(1) + defer func() { + inflightQueries.Add(-1) + }() + + highWatermark := maxInflight.Load() + + if current > highWatermark { + maxInflight.Store(current) + } + inflightMu.Unlock() + + // Artificially delay all query executions to highlight concurrent execution improvement. + time.Sleep(artificialDelay) + + // Return a stub sample. + return promql.Vector{ + promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, + }, nil + }, + } +}