Refactoring for performance, and to allow controller to be overridden

Signed-off-by: Danny Kopping <danny.kopping@grafana.com>
This commit is contained in:
Danny Kopping 2023-11-02 20:33:06 +02:00 committed by Marco Pracucci
parent 5bda33375a
commit f922534c4d
No known key found for this signature in database
GPG key ID: 74C1BD403D2DF9B5
3 changed files with 134 additions and 85 deletions

View file

@ -71,7 +71,6 @@ type Group struct {
// Rule group evaluation iteration function, // Rule group evaluation iteration function,
// defaults to DefaultEvalIterationFunc. // defaults to DefaultEvalIterationFunc.
evalIterationFunc GroupEvalIterationFunc evalIterationFunc GroupEvalIterationFunc
dependencyMap dependencyMap
} }
// GroupEvalIterationFunc is used to implement and extend rule group // GroupEvalIterationFunc is used to implement and extend rule group
@ -130,7 +129,6 @@ func NewGroup(o GroupOptions) *Group {
logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name),
metrics: metrics, metrics: metrics,
evalIterationFunc: evalIterationFunc, evalIterationFunc: evalIterationFunc,
dependencyMap: buildDependencyMap(o.Rules),
} }
} }
@ -437,7 +435,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
eval := func(i int, rule Rule, async bool) { eval := func(i int, rule Rule, async bool) {
defer func() { defer func() {
if async { if async {
g.opts.ConcurrentEvalsController.Done() g.opts.RuleConcurrencyController.Done()
} }
}() }()
@ -569,7 +567,8 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
// If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output.
// Try to run concurrently if there are slots available. // Try to run concurrently if there are slots available.
if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrentEvalsController != nil && g.opts.ConcurrentEvalsController.Allow() { ctrl := g.opts.RuleConcurrencyController
if ctrl != nil && ctrl.RuleEligible(g, rule) && ctrl.Allow() {
go eval(i, rule, true) go eval(i, rule, true)
} else { } else {
eval(i, rule, false) eval(i, rule, false)

View file

@ -118,7 +118,7 @@ type ManagerOptions struct {
GroupLoader GroupLoader GroupLoader GroupLoader
MaxConcurrentEvals int64 MaxConcurrentEvals int64
ConcurrentEvalsEnabled bool ConcurrentEvalsEnabled bool
ConcurrentEvalsController ConcurrentRuleEvalController RuleConcurrencyController RuleConcurrencyController
Metrics *Metrics Metrics *Metrics
} }
@ -134,7 +134,9 @@ func NewManager(o *ManagerOptions) *Manager {
o.GroupLoader = FileLoader{} o.GroupLoader = FileLoader{}
} }
o.ConcurrentEvalsController = NewConcurrentRuleEvalController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals) if o.RuleConcurrencyController == nil {
o.RuleConcurrencyController = newRuleConcurrencyController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals)
}
m := &Manager{ m := &Manager{
groups: map[string]*Group{}, groups: map[string]*Group{},
@ -182,6 +184,10 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels
m.mtx.Lock() m.mtx.Lock()
defer m.mtx.Unlock() defer m.mtx.Unlock()
if m.opts.RuleConcurrencyController != nil {
m.opts.RuleConcurrencyController.Invalidate()
}
groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, files...) groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, files...)
if errs != nil { if errs != nil {
@ -410,26 +416,55 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc {
} }
} }
// ConcurrentRuleEvalController controls whether rules can be evaluated concurrently. Its purpose is to bound the amount // RuleConcurrencyController controls whether rules can be evaluated concurrently. Its purpose is to bound the amount
// of concurrency in rule evaluations so they do not overwhelm the Prometheus server with additional query load. // of concurrency in rule evaluations, to not overwhelm the Prometheus server with additional query load.
// Concurrency is controlled globally, not on a per-group basis. // Concurrency is controlled globally, not on a per-group basis.
type ConcurrentRuleEvalController interface { type RuleConcurrencyController interface {
// RuleEligible determines if a rule can be run concurrently.
RuleEligible(g *Group, r Rule) bool
// Allow determines whether any concurrent evaluation slots are available.
Allow() bool Allow() bool
// Done releases a concurrent evaluation slot.
Done() Done()
// Invalidate instructs the controller to invalidate its state.
// This should be called when groups are modified (during a reload, for instance), because the controller may
// store some state about each group in order to more efficiently determine rule eligibility.
Invalidate()
}
func newRuleConcurrencyController(enabled bool, maxConcurrency int64) RuleConcurrencyController {
return &concurrentRuleEvalController{
enabled: enabled,
sema: semaphore.NewWeighted(maxConcurrency),
depMaps: map[*Group]dependencyMap{},
}
} }
// concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. // concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules.
type concurrentRuleEvalController struct { type concurrentRuleEvalController struct {
mu sync.Mutex
enabled bool enabled bool
sema *semaphore.Weighted sema *semaphore.Weighted
depMaps map[*Group]dependencyMap
} }
func NewConcurrentRuleEvalController(enabled bool, maxConcurrency int64) ConcurrentRuleEvalController { func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool {
return concurrentRuleEvalController{enabled: enabled, sema: semaphore.NewWeighted(maxConcurrency)} c.mu.Lock()
defer c.mu.Unlock()
depMap, found := c.depMaps[g]
if !found {
depMap = buildDependencyMap(g.rules)
c.depMaps[g] = depMap
}
return depMap.isIndependent(r)
} }
// Allow determines whether any concurrency slots are available. func (c *concurrentRuleEvalController) Allow() bool {
func (c concurrentRuleEvalController) Allow() bool {
if !c.enabled { if !c.enabled {
return false return false
} }
@ -437,11 +472,18 @@ func (c concurrentRuleEvalController) Allow() bool {
return c.sema.TryAcquire(1) return c.sema.TryAcquire(1)
} }
// Done releases a concurrent evaluation slot. func (c *concurrentRuleEvalController) Done() {
func (c concurrentRuleEvalController) Done() {
if !c.enabled { if !c.enabled {
return return
} }
c.sema.Release(1) c.sema.Release(1)
} }
func (c *concurrentRuleEvalController) Invalidate() {
c.mu.Lock()
defer c.mu.Unlock()
// Clear out the memoized dependency maps because some or all groups may have been updated.
c.depMaps = map[*Group]dependencyMap{}
}

View file

@ -1435,21 +1435,23 @@ func TestDependencyMap(t *testing.T) {
Opts: opts, Opts: opts,
}) })
require.Zero(t, group.dependencyMap.dependencies(rule)) depMap := buildDependencyMap(group.rules)
require.Equal(t, 2, group.dependencyMap.dependents(rule))
require.False(t, group.dependencyMap.isIndependent(rule))
require.Zero(t, group.dependencyMap.dependents(rule2)) require.Zero(t, depMap.dependencies(rule))
require.Equal(t, 1, group.dependencyMap.dependencies(rule2)) require.Equal(t, 2, depMap.dependents(rule))
require.False(t, group.dependencyMap.isIndependent(rule2)) require.False(t, depMap.isIndependent(rule))
require.Zero(t, group.dependencyMap.dependents(rule3)) require.Zero(t, depMap.dependents(rule2))
require.Zero(t, group.dependencyMap.dependencies(rule3)) require.Equal(t, 1, depMap.dependencies(rule2))
require.True(t, group.dependencyMap.isIndependent(rule3)) require.False(t, depMap.isIndependent(rule2))
require.Zero(t, group.dependencyMap.dependents(rule4)) require.Zero(t, depMap.dependents(rule3))
require.Equal(t, 1, group.dependencyMap.dependencies(rule4)) require.Zero(t, depMap.dependencies(rule3))
require.False(t, group.dependencyMap.isIndependent(rule4)) require.True(t, depMap.isIndependent(rule3))
require.Zero(t, depMap.dependents(rule4))
require.Equal(t, 1, depMap.dependencies(rule4))
require.False(t, depMap.isIndependent(rule4))
} }
func TestNoDependency(t *testing.T) { func TestNoDependency(t *testing.T) {
@ -1470,8 +1472,9 @@ func TestNoDependency(t *testing.T) {
Opts: opts, Opts: opts,
}) })
depMap := buildDependencyMap(group.rules)
// A group with only one rule cannot have dependencies. // A group with only one rule cannot have dependencies.
require.Empty(t, group.dependencyMap) require.Empty(t, depMap)
} }
func TestDependenciesEdgeCases(t *testing.T) { func TestDependenciesEdgeCases(t *testing.T) {
@ -1493,9 +1496,10 @@ func TestDependenciesEdgeCases(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
depMap := buildDependencyMap(group.rules)
// A group with no rules has no dependency map, but doesn't panic if the map is queried. // A group with no rules has no dependency map, but doesn't panic if the map is queried.
require.Nil(t, group.dependencyMap) require.Nil(t, depMap)
require.False(t, group.dependencyMap.isIndependent(rule)) require.False(t, depMap.isIndependent(rule))
}) })
t.Run("rules which reference no series", func(t *testing.T) { t.Run("rules which reference no series", func(t *testing.T) {
@ -1514,9 +1518,10 @@ func TestDependenciesEdgeCases(t *testing.T) {
Opts: opts, Opts: opts,
}) })
depMap := buildDependencyMap(group.rules)
// A group with rules which reference no series will still produce a dependency map // A group with rules which reference no series will still produce a dependency map
require.True(t, group.dependencyMap.isIndependent(rule1)) require.True(t, depMap.isIndependent(rule1))
require.True(t, group.dependencyMap.isIndependent(rule2)) require.True(t, depMap.isIndependent(rule2))
}) })
} }
@ -1542,10 +1547,11 @@ func TestNoMetricSelector(t *testing.T) {
Opts: opts, Opts: opts,
}) })
depMap := buildDependencyMap(group.rules)
// A rule with no metric selector cannot be reliably determined to have no dependencies on other rules, and therefore // A rule with no metric selector cannot be reliably determined to have no dependencies on other rules, and therefore
// all rules are not considered independent. // all rules are not considered independent.
require.False(t, group.dependencyMap.isIndependent(rule)) require.False(t, depMap.isIndependent(rule))
require.False(t, group.dependencyMap.isIndependent(rule2)) require.False(t, depMap.isIndependent(rule2))
} }
func TestDependentRulesWithNonMetricExpression(t *testing.T) { func TestDependentRulesWithNonMetricExpression(t *testing.T) {
@ -1574,9 +1580,10 @@ func TestDependentRulesWithNonMetricExpression(t *testing.T) {
Opts: opts, Opts: opts,
}) })
require.False(t, group.dependencyMap.isIndependent(rule)) depMap := buildDependencyMap(group.rules)
require.False(t, group.dependencyMap.isIndependent(rule2)) require.False(t, depMap.isIndependent(rule))
require.True(t, group.dependencyMap.isIndependent(rule3)) require.False(t, depMap.isIndependent(rule2))
require.True(t, depMap.isIndependent(rule3))
} }
func TestRulesDependentOnMetaMetrics(t *testing.T) { func TestRulesDependentOnMetaMetrics(t *testing.T) {
@ -1604,7 +1611,8 @@ func TestRulesDependentOnMetaMetrics(t *testing.T) {
Opts: opts, Opts: opts,
}) })
require.False(t, group.dependencyMap.isIndependent(rule)) depMap := buildDependencyMap(group.rules)
require.False(t, depMap.isIndependent(rule))
} }
func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) {
@ -1623,17 +1631,19 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) {
orig := make(map[string]dependencyMap, len(ruleManager.groups)) orig := make(map[string]dependencyMap, len(ruleManager.groups))
for _, g := range ruleManager.groups { for _, g := range ruleManager.groups {
depMap := buildDependencyMap(g.rules)
// No dependency map is expected because there is only one rule in the group. // No dependency map is expected because there is only one rule in the group.
require.Empty(t, g.dependencyMap) require.Empty(t, depMap)
orig[g.Name()] = g.dependencyMap orig[g.Name()] = depMap
} }
// Update once without changing groups. // Update once without changing groups.
err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil) err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil)
require.NoError(t, err) require.NoError(t, err)
for h, g := range ruleManager.groups { for h, g := range ruleManager.groups {
depMap := buildDependencyMap(g.rules)
// Dependency maps are the same because of no updates. // Dependency maps are the same because of no updates.
require.Equal(t, orig[h], g.dependencyMap) require.Equal(t, orig[h], depMap)
} }
// Groups will be recreated when updated. // Groups will be recreated when updated.
@ -1653,12 +1663,13 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) {
require.NotEmptyf(t, rr, "expected to find %q recording rule in fixture", ruleName) require.NotEmptyf(t, rr, "expected to find %q recording rule in fixture", ruleName)
depMap := buildDependencyMap(g.rules)
// Dependency maps must change because the groups would've been updated. // Dependency maps must change because the groups would've been updated.
require.NotEqual(t, orig[h], g.dependencyMap) require.NotEqual(t, orig[h], depMap)
// We expect there to be some dependencies since the new rule group contains a dependency. // We expect there to be some dependencies since the new rule group contains a dependency.
require.Greater(t, len(g.dependencyMap), 0) require.Greater(t, len(depMap), 0)
require.Equal(t, 1, g.dependencyMap.dependents(rr)) require.Equal(t, 1, depMap.dependents(rr))
require.Zero(t, g.dependencyMap.dependencies(rr)) require.Zero(t, depMap.dependencies(rr))
} }
} }
@ -1674,7 +1685,7 @@ func TestAsyncRuleEvaluation(t *testing.T) {
) )
files := []string{"fixtures/rules_multiple.yaml"} files := []string{"fixtures/rules_multiple.yaml"}
ruleManager := NewManager(&ManagerOptions{ opts := &ManagerOptions{
Context: context.Background(), Context: context.Background(),
Logger: log.NewNopLogger(), Logger: log.NewNopLogger(),
Appendable: storage, Appendable: storage,
@ -1692,39 +1703,42 @@ func TestAsyncRuleEvaluation(t *testing.T) {
promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345},
}, nil }, nil
}, },
}) }
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) inflightTracker := func(ctx context.Context) {
require.Empty(t, errs) for {
require.Len(t, groups, 1) select {
case <-ctx.Done():
return
default:
highWatermark := maxInflight.Load()
current := inflightQueries.Load()
if current > highWatermark {
maxInflight.Store(current)
}
time.Sleep(time.Millisecond)
}
}
}
expectedRules := 4 expectedRules := 4
t.Run("synchronous evaluation with independent rules", func(t *testing.T) { t.Run("synchronous evaluation with independent rules", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
ruleManager := NewManager(opts)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...)
require.Empty(t, errs)
require.Len(t, groups, 1)
for _, group := range groups { for _, group := range groups {
require.Len(t, group.rules, expectedRules) require.Len(t, group.rules, expectedRules)
start := time.Now() start := time.Now()
// Never expect more than 1 inflight query at a time. // Never expect more than 1 inflight query at a time.
go func() { go inflightTracker(ctx)
for {
select {
case <-ctx.Done():
return
default:
highWatermark := maxInflight.Load()
current := inflightQueries.Load()
if current > highWatermark {
maxInflight.Store(current)
}
time.Sleep(time.Millisecond)
}
}
}()
group.Eval(ctx, start) group.Eval(ctx, start)
@ -1744,33 +1758,27 @@ func TestAsyncRuleEvaluation(t *testing.T) {
maxInflight.Store(0) maxInflight.Store(0)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
// Configure concurrency settings.
opts.ConcurrentEvalsEnabled = true
opts.MaxConcurrentEvals = 2
opts.RuleConcurrencyController = nil
ruleManager := NewManager(opts)
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...)
require.Empty(t, errs)
require.Len(t, groups, 1)
for _, group := range groups { for _, group := range groups {
// Allow up to 2 concurrent rule evaluations.
group.opts.ConcurrentEvalsController = NewConcurrentRuleEvalController(true, 2)
require.Len(t, group.rules, expectedRules) require.Len(t, group.rules, expectedRules)
start := time.Now() start := time.Now()
go func() { go inflightTracker(ctx)
for {
select {
case <-ctx.Done():
return
default:
highWatermark := maxInflight.Load()
current := inflightQueries.Load()
if current > highWatermark {
maxInflight.Store(current)
}
time.Sleep(time.Millisecond)
}
}
}()
group.Eval(ctx, start) group.Eval(ctx, start)
require.EqualValues(t, 3, maxInflight.Load()) // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals.
require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load())
// Some rules should execute concurrently so should complete quicker. // Some rules should execute concurrently so should complete quicker.
require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds()) require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds())
// Each rule produces one vector. // Each rule produces one vector.