Merge pull request #12946 from dannykopping/dannykopping/rule-deps
Evaluate independent rules concurrently
Commit ae2852be6a
cmd/prometheus/main.go

@@ -137,6 +137,7 @@ type flagConfig struct {
 	forGracePeriod     model.Duration
 	outageTolerance    model.Duration
 	resendDelay        model.Duration
+	maxConcurrentEvals int64
 	web                web.Options
 	scrape             scrape.Options
 	tsdb               tsdbOptions
@@ -157,6 +158,7 @@ type flagConfig struct {
 	enablePerStepStats       bool
 	enableAutoGOMAXPROCS     bool
 	enableAutoGOMEMLIMIT     bool
+	enableConcurrentRuleEval bool
 
 	prometheusURL   string
 	corsRegexString string
@@ -203,6 +205,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
 		case "auto-gomemlimit":
 			c.enableAutoGOMEMLIMIT = true
 			level.Info(logger).Log("msg", "Automatically set GOMEMLIMIT to match Linux container or system memory limit")
+		case "concurrent-rule-eval":
+			c.enableConcurrentRuleEval = true
+			level.Info(logger).Log("msg", "Experimental concurrent rule evaluation enabled.")
 		case "no-default-scrape-port":
 			c.scrape.NoDefaultPort = true
 			level.Info(logger).Log("msg", "No default port will be appended to scrape targets' addresses.")
@@ -411,6 +416,9 @@ func main() {
 	serverOnlyFlag(a, "rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager.").
 		Default("1m").SetValue(&cfg.resendDelay)
 
+	serverOnlyFlag(a, "rules.max-concurrent-evals", "Global concurrency limit for independent rules that can run concurrently.").
+		Default("4").Int64Var(&cfg.maxConcurrentEvals)
+
 	a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to `scrape.timestamp-tolerance` to align them to the intended schedule. See https://github.com/prometheus/prometheus/issues/7846 for more context. Experimental. This flag will be removed in a future release.").
 		Hidden().Default("true").BoolVar(&scrape.AlignScrapeTimestamps)
 
@@ -749,17 +757,19 @@ func main() {
 		queryEngine = promql.NewEngine(opts)
 
 		ruleManager = rules.NewManager(&rules.ManagerOptions{
 			Appendable:      fanoutStorage,
 			Queryable:       localStorage,
 			QueryFunc:       rules.EngineQueryFunc(queryEngine, fanoutStorage),
 			NotifyFunc:      rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()),
 			Context:         ctxRule,
 			ExternalURL:     cfg.web.ExternalURL,
 			Registerer:      prometheus.DefaultRegisterer,
 			Logger:          log.With(logger, "component", "rule manager"),
 			OutageTolerance: time.Duration(cfg.outageTolerance),
 			ForGracePeriod:  time.Duration(cfg.forGracePeriod),
 			ResendDelay:     time.Duration(cfg.resendDelay),
+			MaxConcurrentEvals:     cfg.maxConcurrentEvals,
+			ConcurrentEvalsEnabled: cfg.enableConcurrentRuleEval,
 		})
 	}
 
docs/command-line/prometheus.md

@@ -48,6 +48,7 @@ The Prometheus monitoring server
 | <code class="text-nowrap">--rules.alert.for-outage-tolerance</code> | Max time to tolerate prometheus outage for restoring "for" state of alert. Use with server mode only. | `1h` |
 | <code class="text-nowrap">--rules.alert.for-grace-period</code> | Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. Use with server mode only. | `10m` |
 | <code class="text-nowrap">--rules.alert.resend-delay</code> | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` |
+| <code class="text-nowrap">--rules.max-concurrent-evals</code> | Global concurrency limit for independent rules that can run concurrently. Use with server mode only. | `4` |
 | <code class="text-nowrap">--alertmanager.notification-queue-capacity</code> | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` |
 | <code class="text-nowrap">--query.lookback-delta</code> | The maximum lookback duration for retrieving metrics during expression evaluations and federation. Use with server mode only. | `5m` |
 | <code class="text-nowrap">--query.timeout</code> | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
docs/feature_flags.md

@@ -212,3 +212,15 @@ Enables ingestion of created timestamp. Created timestamps are injected as 0 val
 Currently Prometheus supports created timestamps only on the traditional Prometheus Protobuf protocol (WIP for other protocols). As a result, when enabling this feature, the Prometheus protobuf scrape protocol will be prioritized (See `scrape_config.scrape_protocols` settings for more details).
 
 Besides enabling this feature in Prometheus, created timestamps need to be exposed by the application being scraped.
+
+## Concurrent evaluation of independent rules
+
+`--enable-feature=concurrent-rule-eval`
+
+By default, rule groups execute concurrently, but the rules within a group execute sequentially; this is because rules can use the
+output of a preceding rule as its input. However, if there is no detectable relationship between rules then there is no
+reason to run them sequentially.
+When the `concurrent-rule-eval` feature flag is enabled, rules without any dependency on other rules within a rule group will be evaluated concurrently.
+This has the potential to improve rule group evaluation latency and resource utilization at the expense of adding more concurrent query load.
+
+The number of concurrent rule evaluations can be configured with `--rules.max-concurrent-evals`, which is set to `4` by default.
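The dependency detection that the feature flag relies on is implemented in rules/group.go further down in this diff. The following standalone sketch illustrates the idea with the PromQL parser; the helper name dependsOn and the example expressions are made up for illustration, not part of this change, and the real code additionally treats wildcard selectors and the ALERTS / ALERTS_FOR_STATE meta-series as making the whole group indeterminate.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/promql/parser"
)

// dependsOn reports whether expr reads the series produced by a recording rule
// named recordName, i.e. whether the two rules must still run sequentially.
func dependsOn(expr, recordName string) (bool, error) {
	node, err := parser.ParseExpr(expr)
	if err != nil {
		return false, err
	}
	var found bool
	parser.Inspect(node, func(n parser.Node, _ []parser.Node) error {
		if vs, ok := n.(*parser.VectorSelector); ok && vs.Name == recordName {
			found = true
		}
		return nil
	})
	return found, nil
}

func main() {
	// The alert expression reads job:http_requests:rate5m, so these two rules
	// still have to be evaluated sequentially even with the feature flag enabled.
	dependent, _ := dependsOn("job:http_requests:rate5m > 100", "job:http_requests:rate5m")
	// The aggregation only reads the raw series, so it may run concurrently.
	independent, _ := dependsOn("sum by (job)(rate(http_requests_total[1m]))", "job:http_requests:rate5m")
	fmt.Println(dependent, independent) // true false
}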
rules/fixtures/rules_dependencies.yaml (new file)

@@ -0,0 +1,7 @@
+groups:
+  - name: test
+    rules:
+      - record: job:http_requests:rate5m
+        expr: sum by (job)(rate(http_requests_total[5m]))
+      - record: HighRequestRate
+        expr: job:http_requests:rate5m > 100
rules/fixtures/rules_multiple.yaml (new file)

@@ -0,0 +1,14 @@
+groups:
+  - name: test
+    rules:
+      # independents
+      - record: job:http_requests:rate1m
+        expr: sum by (job)(rate(http_requests_total[1m]))
+      - record: job:http_requests:rate5m
+        expr: sum by (job)(rate(http_requests_total[5m]))
+
+      # dependents
+      - record: job:http_requests:rate15m
+        expr: sum by (job)(rate(http_requests_total[15m]))
+      - record: TooManyRequests
+        expr: job:http_requests:rate15m > 100
rules/fixtures/rules_multiple_groups.yaml (new file)

@@ -0,0 +1,28 @@
+groups:
+  - name: http
+    rules:
+      # independents
+      - record: job:http_requests:rate1m
+        expr: sum by (job)(rate(http_requests_total[1m]))
+      - record: job:http_requests:rate5m
+        expr: sum by (job)(rate(http_requests_total[5m]))
+
+      # dependents
+      - record: job:http_requests:rate15m
+        expr: sum by (job)(rate(http_requests_total[15m]))
+      - record: TooManyHTTPRequests
+        expr: job:http_requests:rate15m > 100
+
+  - name: grpc
+    rules:
+      # independents
+      - record: job:grpc_requests:rate1m
+        expr: sum by (job)(rate(grpc_requests_total[1m]))
+      - record: job:grpc_requests:rate5m
+        expr: sum by (job)(rate(grpc_requests_total[5m]))
+
+      # dependents
+      - record: job:grpc_requests:rate15m
+        expr: sum by (job)(rate(grpc_requests_total[15m]))
+      - record: TooManyGRPCRequests
+        expr: job:grpc_requests:rate15m > 100
rules/fixtures/rules_multiple_independent.yaml (new file)

@@ -0,0 +1,15 @@
+groups:
+  - name: independents
+    rules:
+      - record: job:http_requests:rate1m
+        expr: sum by (job)(rate(http_requests_total[1m]))
+      - record: job:http_requests:rate5m
+        expr: sum by (job)(rate(http_requests_total[5m]))
+      - record: job:http_requests:rate15m
+        expr: sum by (job)(rate(http_requests_total[15m]))
+      - record: job:http_requests:rate30m
+        expr: sum by (job)(rate(http_requests_total[30m]))
+      - record: job:http_requests:rate1h
+        expr: sum by (job)(rate(http_requests_total[1h]))
+      - record: job:http_requests:rate2h
+        expr: sum by (job)(rate(http_requests_total[2h]))
rules/group.go

@@ -21,8 +21,11 @@ import (
 	"sync"
 	"time"
 
+	"go.uber.org/atomic"
 	"golang.org/x/exp/slices"
 
+	"github.com/prometheus/prometheus/promql/parser"
+
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/prometheus/client_golang/prometheus"
@@ -68,6 +71,9 @@ type Group struct {
 	// Rule group evaluation iteration function,
 	// defaults to DefaultEvalIterationFunc.
 	evalIterationFunc GroupEvalIterationFunc
+
+	// concurrencyController controls the rules evaluation concurrency.
+	concurrencyController RuleConcurrencyController
 }
 
 // GroupEvalIterationFunc is used to implement and extend rule group
@@ -111,21 +117,27 @@ func NewGroup(o GroupOptions) *Group {
 		evalIterationFunc = DefaultEvalIterationFunc
 	}
 
+	concurrencyController := o.Opts.RuleConcurrencyController
+	if concurrencyController == nil {
+		concurrencyController = sequentialRuleEvalController{}
+	}
+
 	return &Group{
 		name:                 o.Name,
 		file:                 o.File,
 		interval:             o.Interval,
 		limit:                o.Limit,
 		rules:                o.Rules,
 		shouldRestore:        o.ShouldRestore,
 		opts:                 o.Opts,
 		seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
 		done:                 make(chan struct{}),
 		managerDone:          o.done,
 		terminated:           make(chan struct{}),
 		logger:               log.With(o.Opts.Logger, "file", o.File, "group", o.Name),
 		metrics:              metrics,
 		evalIterationFunc:    evalIterationFunc,
+		concurrencyController: concurrencyController,
 	}
 }
 
@@ -420,8 +432,13 @@ func (g *Group) CopyState(from *Group) {
 }
 
 // Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
+// Rules can be evaluated concurrently if the `concurrent-rule-eval` feature flag is enabled.
 func (g *Group) Eval(ctx context.Context, ts time.Time) {
-	var samplesTotal float64
+	var (
+		samplesTotal atomic.Float64
+		wg           sync.WaitGroup
+	)
+
 	for i, rule := range g.rules {
 		select {
 		case <-g.done:
@@ -429,7 +446,11 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 		default:
 		}
 
-		func(i int, rule Rule) {
+		eval := func(i int, rule Rule, cleanup func()) {
+			if cleanup != nil {
+				defer cleanup()
+			}
+
 			logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i)
 			ctx, sp := otel.Tracer("").Start(ctx, "rule")
 			sp.SetAttributes(attribute.String("name", rule.Name()))
@@ -465,7 +486,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 			}
 			rule.SetHealth(HealthGood)
 			rule.SetLastError(nil)
-			samplesTotal += float64(len(vector))
+			samplesTotal.Add(float64(len(vector)))
 
 			if ar, ok := rule.(*AlertingRule); ok {
 				ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
@@ -554,11 +575,25 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 					}
 				}
 			}
-		}(i, rule)
-	}
-	if g.metrics != nil {
-		g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal)
+		}
+
+		// If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output.
+		// Try run concurrently if there are slots available.
+		if ctrl := g.concurrencyController; ctrl.RuleEligible(g, rule) && ctrl.Allow() {
+			wg.Add(1)
+
+			go eval(i, rule, func() {
+				wg.Done()
+				ctrl.Done()
+			})
+		} else {
+			eval(i, rule, nil)
+		}
 	}
+
+	wg.Wait()
+
+	g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load())
 	g.cleanupStaleSeries(ctx, ts)
 }
 
@@ -866,3 +901,110 @@ func NewGroupMetrics(reg prometheus.Registerer) *Metrics {
 
 	return m
 }
+
+// dependencyMap is a data-structure which contains the relationships between rules within a group.
+// It is used to describe the dependency associations between rules in a group whereby one rule uses the
+// output metric produced by another rule in its expression (i.e. as its "input").
+type dependencyMap map[Rule][]Rule
+
+// dependents returns the count of rules which use the output of the given rule as one of their inputs.
+func (m dependencyMap) dependents(r Rule) int {
+	return len(m[r])
+}
+
+// dependencies returns the count of rules on which the given rule is dependent for input.
+func (m dependencyMap) dependencies(r Rule) int {
+	if len(m) == 0 {
+		return 0
+	}
+
+	var count int
+	for _, children := range m {
+		for _, child := range children {
+			if child == r {
+				count++
+			}
+		}
+	}
+
+	return count
+}
+
+// isIndependent determines whether the given rule is not dependent on another rule for its input, nor is any other rule
+// dependent on its output.
+func (m dependencyMap) isIndependent(r Rule) bool {
+	if m == nil {
+		return false
+	}
+
+	return m.dependents(r)+m.dependencies(r) == 0
+}
+
+// buildDependencyMap builds a data-structure which contains the relationships between rules within a group.
+//
+// Alert rules, by definition, cannot have any dependents - but they can have dependencies. Any recording rule on whose
+// output an Alert rule depends will not be able to run concurrently.
+//
+// There is a class of rule expressions which are considered "indeterminate", because either relationships cannot be
+// inferred, or concurrent evaluation of rules depending on these series would produce undefined/unexpected behaviour:
+//   - wildcard queriers like {cluster="prod1"} which would match every series with that label selector
+//   - any "meta" series (series produced by Prometheus itself) like ALERTS, ALERTS_FOR_STATE
+//
+// Rules which are independent can run concurrently with no side-effects.
+func buildDependencyMap(rules []Rule) dependencyMap {
+	dependencies := make(dependencyMap)
+
+	if len(rules) <= 1 {
+		// No relationships if group has 1 or fewer rules.
+		return dependencies
+	}
+
+	inputs := make(map[string][]Rule, len(rules))
+	outputs := make(map[string][]Rule, len(rules))
+
+	var indeterminate bool
+
+	for _, rule := range rules {
+		if indeterminate {
+			break
+		}
+
+		name := rule.Name()
+		outputs[name] = append(outputs[name], rule)
+
+		parser.Inspect(rule.Query(), func(node parser.Node, path []parser.Node) error {
+			if n, ok := node.(*parser.VectorSelector); ok {
+				// A wildcard metric expression means we cannot reliably determine if this rule depends on any other,
+				// which means we cannot safely run any rules concurrently.
+				if n.Name == "" && len(n.LabelMatchers) > 0 {
+					indeterminate = true
+					return nil
+				}
+
+				// Rules which depend on "meta-metrics" like ALERTS and ALERTS_FOR_STATE will have undefined behaviour
+				// if they run concurrently.
+				if n.Name == alertMetricName || n.Name == alertForStateMetricName {
+					indeterminate = true
+					return nil
+				}
+
+				inputs[n.Name] = append(inputs[n.Name], rule)
+			}
+			return nil
+		})
+	}
+
+	if indeterminate {
+		return nil
+	}
+
+	for output, outRules := range outputs {
+		for _, outRule := range outRules {
+			if inRules, found := inputs[output]; found && len(inRules) > 0 {
+				dependencies[outRule] = append(dependencies[outRule], inRules...)
+			}
+		}
+	}
+
+	return dependencies
+}
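The dispatch logic added to Group.Eval above boils down to: try to claim a concurrency slot for an eligible rule and evaluate it in a goroutine, otherwise fall back to evaluating it inline. A minimal, self-contained sketch of that pattern follows; the eligibility check and the sleep are stand-ins for illustration, not Prometheus code.

package main

import (
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	// Stand-in for --rules.max-concurrent-evals; the real controller wraps this
	// semaphore behind the RuleConcurrencyController interface shown below.
	sema := semaphore.NewWeighted(2)
	var wg sync.WaitGroup

	eval := func(i int, cleanup func()) {
		if cleanup != nil {
			defer cleanup()
		}
		time.Sleep(10 * time.Millisecond) // stand-in for evaluating rule i
		fmt.Println("evaluated rule", i)
	}

	for i := 0; i < 6; i++ {
		eligible := i%2 == 0 // stand-in for concurrencyController.RuleEligible
		if eligible && sema.TryAcquire(1) {
			wg.Add(1)
			go eval(i, func() {
				wg.Done()
				sema.Release(1)
			})
		} else {
			// No free slot (or rule not eligible): evaluate synchronously,
			// just like the else branch in Group.Eval.
			eval(i, nil)
		}
	}
	wg.Wait()
}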
rules/manager.go

@@ -26,6 +26,7 @@ import (
 	"github.com/go-kit/log/level"
 	"github.com/prometheus/client_golang/prometheus"
 	"golang.org/x/exp/slices"
+	"golang.org/x/sync/semaphore"
 
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/rulefmt"
@@ -103,18 +104,21 @@ type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert)
 
 // ManagerOptions bundles options for the Manager.
 type ManagerOptions struct {
 	ExternalURL     *url.URL
 	QueryFunc       QueryFunc
 	NotifyFunc      NotifyFunc
 	Context         context.Context
 	Appendable      storage.Appendable
 	Queryable       storage.Queryable
 	Logger          log.Logger
 	Registerer      prometheus.Registerer
 	OutageTolerance time.Duration
 	ForGracePeriod  time.Duration
 	ResendDelay     time.Duration
 	GroupLoader     GroupLoader
+	MaxConcurrentEvals        int64
+	ConcurrentEvalsEnabled    bool
+	RuleConcurrencyController RuleConcurrencyController
 
 	Metrics *Metrics
 }
@@ -130,6 +134,14 @@ func NewManager(o *ManagerOptions) *Manager {
 		o.GroupLoader = FileLoader{}
 	}
 
+	if o.RuleConcurrencyController == nil {
+		if o.ConcurrentEvalsEnabled {
+			o.RuleConcurrencyController = newRuleConcurrencyController(o.MaxConcurrentEvals)
+		} else {
+			o.RuleConcurrencyController = sequentialRuleEvalController{}
+		}
+	}
+
 	m := &Manager{
 		groups: map[string]*Group{},
 		opts:   o,
@@ -176,6 +188,8 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels
 	m.mtx.Lock()
 	defer m.mtx.Unlock()
 
+	m.opts.RuleConcurrencyController.Invalidate()
+
 	groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, files...)
 
 	if errs != nil {
@@ -403,3 +417,80 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc {
 		}
 	}
 }
+
+// RuleConcurrencyController controls whether rules can be evaluated concurrently. Its purpose is to bound the amount
+// of concurrency in rule evaluations to avoid overwhelming the Prometheus server with additional query load and ensure
+// the correctness of rules running concurrently. Concurrency is controlled globally, not on a per-group basis.
+type RuleConcurrencyController interface {
+	// RuleEligible determines if the rule can guarantee correct results while running concurrently.
+	RuleEligible(g *Group, r Rule) bool
+
+	// Allow determines whether any concurrent evaluation slots are available.
+	// If Allow() returns true, then Done() must be called to release the acquired slot.
+	Allow() bool
+
+	// Done releases a concurrent evaluation slot.
+	Done()
+
+	// Invalidate instructs the controller to invalidate its state.
+	// This should be called when groups are modified (during a reload, for instance), because the controller may
+	// store some state about each group in order to more efficiently determine rule eligibility.
+	Invalidate()
+}
+
+// concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules.
+type concurrentRuleEvalController struct {
+	sema      *semaphore.Weighted
+	depMapsMu sync.Mutex
+	depMaps   map[*Group]dependencyMap
+}
+
+func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyController {
+	return &concurrentRuleEvalController{
+		sema:    semaphore.NewWeighted(maxConcurrency),
+		depMaps: map[*Group]dependencyMap{},
+	}
+}
+
+func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool {
+	c.depMapsMu.Lock()
+	defer c.depMapsMu.Unlock()
+
+	depMap, found := c.depMaps[g]
+	if !found {
+		depMap = buildDependencyMap(g.rules)
+		c.depMaps[g] = depMap
+	}
+
+	return depMap.isIndependent(r)
+}
+
+func (c *concurrentRuleEvalController) Allow() bool {
+	return c.sema.TryAcquire(1)
+}
+
+func (c *concurrentRuleEvalController) Done() {
+	c.sema.Release(1)
+}
+
+func (c *concurrentRuleEvalController) Invalidate() {
+	c.depMapsMu.Lock()
+	defer c.depMapsMu.Unlock()
+
+	// Clear out the memoized dependency maps because some or all groups may have been updated.
+	c.depMaps = map[*Group]dependencyMap{}
+}
+
+// sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially.
+type sequentialRuleEvalController struct{}
+
+func (c sequentialRuleEvalController) RuleEligible(_ *Group, _ Rule) bool {
+	return false
+}
+
+func (c sequentialRuleEvalController) Allow() bool {
+	return false
+}
+
+func (c sequentialRuleEvalController) Done()       {}
+func (c sequentialRuleEvalController) Invalidate() {}
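For programs that embed the rules package, the two new ManagerOptions fields map directly onto the command-line flags added in cmd/prometheus/main.go. Below is a minimal sketch of wiring them up; the function name and the storage/query-function arguments are placeholders assumed to be provided by the caller, not part of this change.

package example

import (
	"context"
	"time"

	"github.com/go-kit/log"

	"github.com/prometheus/prometheus/rules"
	"github.com/prometheus/prometheus/storage"
)

// newConcurrentRuleManager mirrors what cmd/prometheus/main.go does above:
// opt in to the experimental concurrent evaluation and bound it globally.
func newConcurrentRuleManager(st storage.Storage, queryFunc rules.QueryFunc) *rules.Manager {
	return rules.NewManager(&rules.ManagerOptions{
		Appendable:             st,
		Queryable:              st,
		QueryFunc:              queryFunc,
		NotifyFunc:             func(ctx context.Context, expr string, alerts ...*rules.Alert) {},
		Context:                context.Background(),
		Logger:                 log.NewNopLogger(),
		ResendDelay:            time.Minute,
		ConcurrentEvalsEnabled: true, // --enable-feature=concurrent-rule-eval
		MaxConcurrentEvals:     4,    // --rules.max-concurrent-evals
	})
}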
rules/manager_test.go

@@ -19,11 +19,13 @@ import (
 	"math"
 	"os"
 	"sort"
+	"sync"
 	"testing"
 	"time"
 
 	"github.com/go-kit/log"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"
@@ -674,8 +676,10 @@ func TestDeletedRuleMarkedStale(t *testing.T) {
 		rules:                []Rule{},
 		seriesInPreviousEval: []map[string]labels.Labels{},
 		opts: &ManagerOptions{
 			Appendable: st,
+			RuleConcurrencyController: sequentialRuleEvalController{},
 		},
+		metrics: NewGroupMetrics(nil),
 	}
 	newGroup.CopyState(oldGroup)
 
@@ -1402,3 +1406,616 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) {
 	require.Equal(t, expHist, fh)
 	require.Equal(t, chunkenc.ValNone, it.Next())
 }
+
+func TestDependencyMap(t *testing.T) {
+	ctx := context.Background()
+	opts := &ManagerOptions{
+		Context: ctx,
+		Logger:  log.NewNopLogger(),
+	}
+
+	expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))")
+	require.NoError(t, err)
+	rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
+
+	expr, err = parser.ParseExpr("user:requests:rate1m <= 0")
+	require.NoError(t, err)
+	rule2 := NewAlertingRule("ZeroRequests", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, log.NewNopLogger())
+
+	expr, err = parser.ParseExpr("sum by (user) (rate(requests[5m]))")
+	require.NoError(t, err)
+	rule3 := NewRecordingRule("user:requests:rate5m", expr, labels.Labels{})
+
+	expr, err = parser.ParseExpr("increase(user:requests:rate1m[1h])")
+	require.NoError(t, err)
+	rule4 := NewRecordingRule("user:requests:increase1h", expr, labels.Labels{})
+
+	group := NewGroup(GroupOptions{
+		Name:     "rule_group",
+		Interval: time.Second,
+		Rules:    []Rule{rule, rule2, rule3, rule4},
+		Opts:     opts,
+	})
+
+	depMap := buildDependencyMap(group.rules)
+
+	require.Zero(t, depMap.dependencies(rule))
+	require.Equal(t, 2, depMap.dependents(rule))
+	require.False(t, depMap.isIndependent(rule))
+
+	require.Zero(t, depMap.dependents(rule2))
+	require.Equal(t, 1, depMap.dependencies(rule2))
+	require.False(t, depMap.isIndependent(rule2))
+
+	require.Zero(t, depMap.dependents(rule3))
+	require.Zero(t, depMap.dependencies(rule3))
+	require.True(t, depMap.isIndependent(rule3))
+
+	require.Zero(t, depMap.dependents(rule4))
+	require.Equal(t, 1, depMap.dependencies(rule4))
+	require.False(t, depMap.isIndependent(rule4))
+}
+
+func TestNoDependency(t *testing.T) {
+	ctx := context.Background()
+	opts := &ManagerOptions{
+		Context: ctx,
+		Logger:  log.NewNopLogger(),
+	}
+
+	expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))")
+	require.NoError(t, err)
+	rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
+
+	group := NewGroup(GroupOptions{
+		Name:     "rule_group",
+		Interval: time.Second,
+		Rules:    []Rule{rule},
+		Opts:     opts,
+	})
+
+	depMap := buildDependencyMap(group.rules)
+	// A group with only one rule cannot have dependencies.
+	require.Empty(t, depMap)
+}
+
+func TestDependenciesEdgeCases(t *testing.T) {
+	ctx := context.Background()
+	opts := &ManagerOptions{
+		Context: ctx,
+		Logger:  log.NewNopLogger(),
+	}
+
+	t.Run("empty group", func(t *testing.T) {
+		group := NewGroup(GroupOptions{
+			Name:     "rule_group",
+			Interval: time.Second,
+			Rules:    []Rule{}, // empty group
+			Opts:     opts,
+		})
+
+		expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))")
+		require.NoError(t, err)
+		rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
+
+		depMap := buildDependencyMap(group.rules)
+		// A group with no rules has no dependency map, but doesn't panic if the map is queried.
+		require.Empty(t, depMap)
+		require.True(t, depMap.isIndependent(rule))
+	})
+
+	t.Run("rules which reference no series", func(t *testing.T) {
+		expr, err := parser.ParseExpr("one")
+		require.NoError(t, err)
+		rule1 := NewRecordingRule("1", expr, labels.Labels{})
+
+		expr, err = parser.ParseExpr("two")
+		require.NoError(t, err)
+		rule2 := NewRecordingRule("2", expr, labels.Labels{})
+
+		group := NewGroup(GroupOptions{
+			Name:     "rule_group",
+			Interval: time.Second,
+			Rules:    []Rule{rule1, rule2},
+			Opts:     opts,
+		})
+
+		depMap := buildDependencyMap(group.rules)
+		// A group with rules which reference no series will still produce a dependency map
+		require.True(t, depMap.isIndependent(rule1))
+		require.True(t, depMap.isIndependent(rule2))
+	})
+
+	t.Run("rule with regexp matcher on metric name", func(t *testing.T) {
+		expr, err := parser.ParseExpr("sum(requests)")
+		require.NoError(t, err)
+		rule1 := NewRecordingRule("first", expr, labels.Labels{})
+
+		expr, err = parser.ParseExpr(`sum({__name__=~".+"})`)
+		require.NoError(t, err)
+		rule2 := NewRecordingRule("second", expr, labels.Labels{})
+
+		group := NewGroup(GroupOptions{
+			Name:     "rule_group",
+			Interval: time.Second,
+			Rules:    []Rule{rule1, rule2},
+			Opts:     opts,
+		})
+
+		depMap := buildDependencyMap(group.rules)
+		// A rule with regexp matcher on metric name causes the whole group to be indeterminate.
+		require.False(t, depMap.isIndependent(rule1))
+		require.False(t, depMap.isIndependent(rule2))
+	})
+
+	t.Run("rule with not equal matcher on metric name", func(t *testing.T) {
+		expr, err := parser.ParseExpr("sum(requests)")
+		require.NoError(t, err)
+		rule1 := NewRecordingRule("first", expr, labels.Labels{})
+
+		expr, err = parser.ParseExpr(`sum({__name__!="requests", service="app"})`)
+		require.NoError(t, err)
+		rule2 := NewRecordingRule("second", expr, labels.Labels{})
+
+		group := NewGroup(GroupOptions{
+			Name:     "rule_group",
+			Interval: time.Second,
+			Rules:    []Rule{rule1, rule2},
+			Opts:     opts,
+		})
+
+		depMap := buildDependencyMap(group.rules)
+		// A rule with not equal matcher on metric name causes the whole group to be indeterminate.
+		require.False(t, depMap.isIndependent(rule1))
+		require.False(t, depMap.isIndependent(rule2))
+	})
+
+	t.Run("rule with not regexp matcher on metric name", func(t *testing.T) {
+		expr, err := parser.ParseExpr("sum(requests)")
+		require.NoError(t, err)
+		rule1 := NewRecordingRule("first", expr, labels.Labels{})
+
+		expr, err = parser.ParseExpr(`sum({__name__!~"requests.+", service="app"})`)
+		require.NoError(t, err)
+		rule2 := NewRecordingRule("second", expr, labels.Labels{})
+
+		group := NewGroup(GroupOptions{
+			Name:     "rule_group",
+			Interval: time.Second,
+			Rules:    []Rule{rule1, rule2},
+			Opts:     opts,
+		})
+
+		depMap := buildDependencyMap(group.rules)
+		// A rule with not regexp matcher on metric name causes the whole group to be indeterminate.
+		require.False(t, depMap.isIndependent(rule1))
+		require.False(t, depMap.isIndependent(rule2))
+	})
+
+	t.Run("rule querying ALERTS metric", func(t *testing.T) {
+		expr, err := parser.ParseExpr("sum(requests)")
+		require.NoError(t, err)
+		rule1 := NewRecordingRule("first", expr, labels.Labels{})
+
+		expr, err = parser.ParseExpr(`sum(ALERTS{alertname="test"})`)
+		require.NoError(t, err)
+		rule2 := NewRecordingRule("second", expr, labels.Labels{})
+
+		group := NewGroup(GroupOptions{
+			Name:     "rule_group",
+			Interval: time.Second,
+			Rules:    []Rule{rule1, rule2},
+			Opts:     opts,
+		})
+
+		depMap := buildDependencyMap(group.rules)
+		// A rule querying ALERTS metric causes the whole group to be indeterminate.
+		require.False(t, depMap.isIndependent(rule1))
+		require.False(t, depMap.isIndependent(rule2))
+	})
+
+	t.Run("rule querying ALERTS_FOR_STATE metric", func(t *testing.T) {
+		expr, err := parser.ParseExpr("sum(requests)")
+		require.NoError(t, err)
+		rule1 := NewRecordingRule("first", expr, labels.Labels{})
+
+		expr, err = parser.ParseExpr(`sum(ALERTS_FOR_STATE{alertname="test"})`)
+		require.NoError(t, err)
+		rule2 := NewRecordingRule("second", expr, labels.Labels{})
+
+		group := NewGroup(GroupOptions{
+			Name:     "rule_group",
+			Interval: time.Second,
+			Rules:    []Rule{rule1, rule2},
+			Opts:     opts,
+		})
+
+		depMap := buildDependencyMap(group.rules)
+		// A rule querying ALERTS_FOR_STATE metric causes the whole group to be indeterminate.
+		require.False(t, depMap.isIndependent(rule1))
+		require.False(t, depMap.isIndependent(rule2))
+	})
+}
+
+func TestNoMetricSelector(t *testing.T) {
+	ctx := context.Background()
+	opts := &ManagerOptions{
+		Context: ctx,
+		Logger:  log.NewNopLogger(),
+	}
+
+	expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))")
+	require.NoError(t, err)
+	rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
+
+	expr, err = parser.ParseExpr(`count({user="bob"})`)
+	require.NoError(t, err)
+	rule2 := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
+
+	group := NewGroup(GroupOptions{
+		Name:     "rule_group",
+		Interval: time.Second,
+		Rules:    []Rule{rule, rule2},
+		Opts:     opts,
+	})
+
+	depMap := buildDependencyMap(group.rules)
+	// A rule with no metric selector cannot be reliably determined to have no dependencies on other rules, and therefore
+	// all rules are not considered independent.
+	require.False(t, depMap.isIndependent(rule))
+	require.False(t, depMap.isIndependent(rule2))
+}
+
+func TestDependentRulesWithNonMetricExpression(t *testing.T) {
+	ctx := context.Background()
+	opts := &ManagerOptions{
+		Context: ctx,
+		Logger:  log.NewNopLogger(),
+	}
+
+	expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))")
+	require.NoError(t, err)
+	rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
+
+	expr, err = parser.ParseExpr("user:requests:rate1m <= 0")
+	require.NoError(t, err)
+	rule2 := NewAlertingRule("ZeroRequests", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, log.NewNopLogger())
+
+	expr, err = parser.ParseExpr("3")
+	require.NoError(t, err)
+	rule3 := NewRecordingRule("three", expr, labels.Labels{})
+
+	group := NewGroup(GroupOptions{
+		Name:     "rule_group",
+		Interval: time.Second,
+		Rules:    []Rule{rule, rule2, rule3},
+		Opts:     opts,
+	})
+
+	depMap := buildDependencyMap(group.rules)
+	require.False(t, depMap.isIndependent(rule))
+	require.False(t, depMap.isIndependent(rule2))
+	require.True(t, depMap.isIndependent(rule3))
+}
+
+func TestRulesDependentOnMetaMetrics(t *testing.T) {
+	ctx := context.Background()
+	opts := &ManagerOptions{
+		Context: ctx,
+		Logger:  log.NewNopLogger(),
+	}
+
+	// This rule is not dependent on any other rules in its group but it does depend on `ALERTS`, which is produced by
+	// the rule engine, and is therefore not independent.
+	expr, err := parser.ParseExpr("count(ALERTS)")
+	require.NoError(t, err)
+	rule := NewRecordingRule("alert_count", expr, labels.Labels{})
+
+	// Create another rule so a dependency map is built (no map is built if a group contains one or fewer rules).
+	expr, err = parser.ParseExpr("1")
+	require.NoError(t, err)
+	rule2 := NewRecordingRule("one", expr, labels.Labels{})
+
+	group := NewGroup(GroupOptions{
+		Name:     "rule_group",
+		Interval: time.Second,
+		Rules:    []Rule{rule, rule2},
+		Opts:     opts,
+	})
+
+	depMap := buildDependencyMap(group.rules)
+	require.False(t, depMap.isIndependent(rule))
+}
+
+func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) {
+	files := []string{"fixtures/rules.yaml"}
+	ruleManager := NewManager(&ManagerOptions{
+		Context: context.Background(),
+		Logger:  log.NewNopLogger(),
+	})
+
+	ruleManager.start()
+	defer ruleManager.Stop()
+
+	err := ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil)
+	require.NoError(t, err)
+	require.NotEmpty(t, ruleManager.groups, "expected non-empty rule groups")
+
+	orig := make(map[string]dependencyMap, len(ruleManager.groups))
+	for _, g := range ruleManager.groups {
+		depMap := buildDependencyMap(g.rules)
+		// No dependency map is expected because there is only one rule in the group.
+		require.Empty(t, depMap)
+		orig[g.Name()] = depMap
+	}
+
+	// Update once without changing groups.
+	err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil)
+	require.NoError(t, err)
+	for h, g := range ruleManager.groups {
+		depMap := buildDependencyMap(g.rules)
+		// Dependency maps are the same because of no updates.
+		if orig[h] == nil {
+			require.Empty(t, orig[h])
+			require.Empty(t, depMap)
+		} else {
+			require.Equal(t, orig[h], depMap)
+		}
+	}
+
+	// Groups will be recreated when updated.
+	files[0] = "fixtures/rules_dependencies.yaml"
+	err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil)
+	require.NoError(t, err)
+
+	for h, g := range ruleManager.groups {
+		const ruleName = "job:http_requests:rate5m"
+		var rr *RecordingRule
+
+		for _, r := range g.rules {
+			if r.Name() == ruleName {
+				rr = r.(*RecordingRule)
+			}
+		}
+
+		require.NotEmptyf(t, rr, "expected to find %q recording rule in fixture", ruleName)
+
+		depMap := buildDependencyMap(g.rules)
+		// Dependency maps must change because the groups would've been updated.
+		require.NotEqual(t, orig[h], depMap)
+		// We expect there to be some dependencies since the new rule group contains a dependency.
+		require.NotEmpty(t, depMap)
+		require.Equal(t, 1, depMap.dependents(rr))
+		require.Zero(t, depMap.dependencies(rr))
+	}
+}
+
+func TestAsyncRuleEvaluation(t *testing.T) {
+	storage := teststorage.New(t)
+	t.Cleanup(func() { storage.Close() })
+
+	var (
+		inflightQueries atomic.Int32
+		maxInflight     atomic.Int32
+	)
+
+	t.Run("synchronous evaluation with independent rules", func(t *testing.T) {
+		// Reset.
+		inflightQueries.Store(0)
+		maxInflight.Store(0)
+
+		ctx, cancel := context.WithCancel(context.Background())
+		t.Cleanup(cancel)
+
+		ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, 0))
+		groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple.yaml"}...)
+		require.Empty(t, errs)
+		require.Len(t, groups, 1)
+
+		ruleCount := 4
+
+		for _, group := range groups {
+			require.Len(t, group.rules, ruleCount)
+
+			start := time.Now()
+			group.Eval(ctx, start)
+
+			// Never expect more than 1 inflight query at a time.
+			require.EqualValues(t, 1, maxInflight.Load())
+			// Each rule should take at least 1 second to execute sequentially.
+			require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
+			// Each rule produces one vector.
+			require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
+		}
+	})
+
+	t.Run("asynchronous evaluation with independent and dependent rules", func(t *testing.T) {
+		// Reset.
+		inflightQueries.Store(0)
+		maxInflight.Store(0)
+
+		ctx, cancel := context.WithCancel(context.Background())
+		t.Cleanup(cancel)
+
+		ruleCount := 4
+		opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
+
+		// Configure concurrency settings.
+		opts.ConcurrentEvalsEnabled = true
+		opts.MaxConcurrentEvals = 2
+		opts.RuleConcurrencyController = nil
+		ruleManager := NewManager(opts)
+
+		groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple.yaml"}...)
+		require.Empty(t, errs)
+		require.Len(t, groups, 1)
+
+		for _, group := range groups {
+			require.Len(t, group.rules, ruleCount)
+
+			start := time.Now()
+			group.Eval(ctx, start)
+
+			// Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals.
+			require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load())
+			// Some rules should execute concurrently so should complete quicker.
+			require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
+			// Each rule produces one vector.
+			require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
+		}
+	})
+
+	t.Run("asynchronous evaluation of all independent rules, insufficient concurrency", func(t *testing.T) {
+		// Reset.
+		inflightQueries.Store(0)
+		maxInflight.Store(0)
+
+		ctx, cancel := context.WithCancel(context.Background())
+		t.Cleanup(cancel)
+
+		ruleCount := 6
+		opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
+
+		// Configure concurrency settings.
+		opts.ConcurrentEvalsEnabled = true
+		opts.MaxConcurrentEvals = 2
+		opts.RuleConcurrencyController = nil
+		ruleManager := NewManager(opts)
+
+		groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple_independent.yaml"}...)
+		require.Empty(t, errs)
+		require.Len(t, groups, 1)
+
+		for _, group := range groups {
+			require.Len(t, group.rules, ruleCount)
+
+			start := time.Now()
+			group.Eval(ctx, start)
+
+			// Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals.
+			require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load())
+			// Some rules should execute concurrently so should complete quicker.
+			require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
+			// Each rule produces one vector.
+			require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
+		}
+	})
+
+	t.Run("asynchronous evaluation of all independent rules, sufficient concurrency", func(t *testing.T) {
+		// Reset.
+		inflightQueries.Store(0)
+		maxInflight.Store(0)
+
+		ctx, cancel := context.WithCancel(context.Background())
+		t.Cleanup(cancel)
+
+		ruleCount := 6
+		opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)
+
+		// Configure concurrency settings.
+		opts.ConcurrentEvalsEnabled = true
+		opts.MaxConcurrentEvals = int64(ruleCount) * 2
+		opts.RuleConcurrencyController = nil
+		ruleManager := NewManager(opts)
+
+		groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple_independent.yaml"}...)
+		require.Empty(t, errs)
+		require.Len(t, groups, 1)
+
+		for _, group := range groups {
+			require.Len(t, group.rules, ruleCount)
+
+			start := time.Now()
+
+			group.Eval(ctx, start)
+
+			// Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once.
+			require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals)
+			// Some rules should execute concurrently so should complete quicker.
+			require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
+			// Each rule produces one vector.
+			require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
+		}
+	})
+}
+
+func TestBoundedRuleEvalConcurrency(t *testing.T) {
+	storage := teststorage.New(t)
+	t.Cleanup(func() { storage.Close() })
+
+	var (
+		inflightQueries atomic.Int32
+		maxInflight     atomic.Int32
+		maxConcurrency  int64 = 3
+		groupCount            = 2
+	)
+
+	files := []string{"fixtures/rules_multiple_groups.yaml"}
+
+	ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, maxConcurrency))
+
+	groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...)
+	require.Empty(t, errs)
+	require.Len(t, groups, groupCount)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+
+	// Evaluate groups concurrently (like they normally do).
+	var wg sync.WaitGroup
+	for _, group := range groups {
+		group := group
+
+		wg.Add(1)
+		go func() {
+			group.Eval(ctx, time.Now())
+			wg.Done()
+		}()
+	}
+
+	wg.Wait()
+
+	// Synchronous queries also count towards inflight, so at most we can have maxConcurrency+$groupCount inflight evaluations.
+	require.EqualValues(t, maxInflight.Load(), int32(maxConcurrency)+int32(groupCount))
+}
+
+const artificialDelay = 10 * time.Millisecond
+
+func optsFactory(storage storage.Storage, maxInflight, inflightQueries *atomic.Int32, maxConcurrent int64) *ManagerOptions {
+	var inflightMu sync.Mutex
+
+	concurrent := maxConcurrent > 0
+
+	return &ManagerOptions{
+		Context:                context.Background(),
+		Logger:                 log.NewNopLogger(),
+		ConcurrentEvalsEnabled: concurrent,
+		MaxConcurrentEvals:     maxConcurrent,
+		Appendable:             storage,
+		QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) {
+			inflightMu.Lock()
+
+			current := inflightQueries.Add(1)
+			defer func() {
+				inflightQueries.Add(-1)
+			}()
+
+			highWatermark := maxInflight.Load()
+
+			if current > highWatermark {
+				maxInflight.Store(current)
+			}
+			inflightMu.Unlock()
+
+			// Artificially delay all query executions to highlight concurrent execution improvement.
+			time.Sleep(artificialDelay)
+
+			// Return a stub sample.
+			return promql.Vector{
+				promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345},
+			}, nil
+		},
+	}
+}