diff --git a/rules/group.go b/rules/group.go index 724b926d4f..4398d9211d 100644 --- a/rules/group.go +++ b/rules/group.go @@ -643,28 +643,46 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { if ctrl == nil { ctrl = sequentialRuleEvalController{} } - for _, batch := range ctrl.SplitGroupIntoBatches(ctx, g) { - for _, ruleIndex := range batch { + + batches := ctrl.SplitGroupIntoBatches(ctx, g) + if len(batches) == 0 { + // Sequential evaluation when batches aren't set. + // This is the behaviour without a defined RuleConcurrencyController + for i, rule := range g.rules { + // Check if the group has been stopped. select { case <-g.done: return default: } - - rule := g.rules[ruleIndex] - if len(batch) > 1 && ctrl.Allow(ctx, g, rule) { - wg.Add(1) - - go eval(ruleIndex, rule, func() { - wg.Done() - ctrl.Done(ctx) - }) - } else { - eval(ruleIndex, rule, nil) - } + eval(i, rule, nil) + } + } else { + // Concurrent evaluation. + for _, batch := range batches { + for _, ruleIndex := range batch { + // Check if the group has been stopped. + select { + case <-g.done: + wg.Wait() + return + default: + } + rule := g.rules[ruleIndex] + if len(batch) > 1 && ctrl.Allow(ctx, g, rule) { + wg.Add(1) + + go eval(ruleIndex, rule, func() { + wg.Done() + ctrl.Done(ctx) + }) + } else { + eval(ruleIndex, rule, nil) + } + } + // It is important that we finish processing any rules in this current batch - before we move into the next one. + wg.Wait() } - // It is important that we finish processing any rules in this current batch - before we move into the next one. - wg.Wait() } g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load()) diff --git a/rules/manager.go b/rules/manager.go index c4c0f8a1ef..a3ae716e2b 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -550,11 +550,7 @@ func (c sequentialRuleEvalController) Allow(_ context.Context, _ *Group, _ Rule) } func (c sequentialRuleEvalController) SplitGroupIntoBatches(_ context.Context, g *Group) []ConcurrentRules { - order := make([]ConcurrentRules, len(g.rules)) - for i := range g.rules { - order[i] = []int{i} - } - return order + return nil } func (c sequentialRuleEvalController) Done(_ context.Context) {} diff --git a/rules/manager_test.go b/rules/manager_test.go index 5c3fcc96a8..45da7a44b0 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1989,12 +1989,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { // Expected evaluation order order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) - require.Equal(t, []ConcurrentRules{ - {0}, - {1}, - {2}, - {3}, - }, order) + require.Nil(t, order) // Never expect more than 1 inflight query at a time. require.EqualValues(t, 1, maxInflight.Load())