prometheus/rules/manager.go

582 lines
17 KiB
Go
Raw Normal View History

// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rules
import (
"context"
"errors"
"fmt"
"log/slog"
"net/url"
"slices"
2023-09-21 13:53:51 -07:00
"strings"
2013-04-17 05:42:15 -07:00
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/promslog"
"golang.org/x/sync/semaphore"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/strutil"
)
// QueryFunc processes PromQL queries.
type QueryFunc func(ctx context.Context, q string, t time.Time) (promql.Vector, error)
// EngineQueryFunc returns a new query function that executes instant queries against
// the given engine.
// It converts scalar into vector results.
func EngineQueryFunc(engine promql.QueryEngine, q storage.Queryable) QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
q, err := engine.NewInstantQuery(ctx, q, nil, qs, t)
if err != nil {
return nil, err
}
res := q.Exec(ctx)
if res.Err != nil {
return nil, res.Err
}
switch v := res.Value.(type) {
case promql.Vector:
return v, nil
case promql.Scalar:
return promql.Vector{promql.Sample{
promql: Separate `Point` into `FPoint` and `HPoint` In other words: Instead of having a “polymorphous” `Point` that can either contain a float value or a histogram value, use an `FPoint` for floats and an `HPoint` for histograms. This seemingly small change has a _lot_ of repercussions throughout the codebase. The idea here is to avoid the increase in size of `Point` arrays that happened after native histograms had been added. The higher-level data structures (`Sample`, `Series`, etc.) are still “polymorphous”. The same idea could be applied to them, but at each step the trade-offs needed to be evaluated. The idea with this change is to do the minimum necessary to get back to pre-histogram performance for functions that do not touch histograms. Here are comparisons for the `changes` function. The test data doesn't include histograms yet. Ideally, there would be no change in the benchmark result at all. First runtime v2.39 compared to directly prior to this commit: ``` name old time/op new time/op delta RangeQuery/expr=changes(a_one[1d]),steps=1-16 391µs ± 2% 542µs ± 1% +38.58% (p=0.000 n=9+8) RangeQuery/expr=changes(a_one[1d]),steps=10-16 452µs ± 2% 617µs ± 2% +36.48% (p=0.000 n=10+10) RangeQuery/expr=changes(a_one[1d]),steps=100-16 1.12ms ± 1% 1.36ms ± 2% +21.58% (p=0.000 n=8+10) RangeQuery/expr=changes(a_one[1d]),steps=1000-16 7.83ms ± 1% 8.94ms ± 1% +14.21% (p=0.000 n=10+10) RangeQuery/expr=changes(a_ten[1d]),steps=1-16 2.98ms ± 0% 3.30ms ± 1% +10.67% (p=0.000 n=9+10) RangeQuery/expr=changes(a_ten[1d]),steps=10-16 3.66ms ± 1% 4.10ms ± 1% +11.82% (p=0.000 n=10+10) RangeQuery/expr=changes(a_ten[1d]),steps=100-16 10.5ms ± 0% 11.8ms ± 1% +12.50% (p=0.000 n=8+10) RangeQuery/expr=changes(a_ten[1d]),steps=1000-16 77.6ms ± 1% 87.4ms ± 1% +12.63% (p=0.000 n=9+9) RangeQuery/expr=changes(a_hundred[1d]),steps=1-16 30.4ms ± 2% 32.8ms ± 1% +8.01% (p=0.000 n=10+10) RangeQuery/expr=changes(a_hundred[1d]),steps=10-16 37.1ms ± 2% 40.6ms ± 2% +9.64% (p=0.000 n=10+10) RangeQuery/expr=changes(a_hundred[1d]),steps=100-16 105ms ± 1% 117ms ± 1% +11.69% (p=0.000 n=10+10) RangeQuery/expr=changes(a_hundred[1d]),steps=1000-16 783ms ± 3% 876ms ± 1% +11.83% (p=0.000 n=9+10) ``` And then runtime v2.39 compared to after this commit: ``` name old time/op new time/op delta RangeQuery/expr=changes(a_one[1d]),steps=1-16 391µs ± 2% 547µs ± 1% +39.84% (p=0.000 n=9+8) RangeQuery/expr=changes(a_one[1d]),steps=10-16 452µs ± 2% 616µs ± 2% +36.15% (p=0.000 n=10+10) RangeQuery/expr=changes(a_one[1d]),steps=100-16 1.12ms ± 1% 1.26ms ± 1% +12.20% (p=0.000 n=8+10) RangeQuery/expr=changes(a_one[1d]),steps=1000-16 7.83ms ± 1% 7.95ms ± 1% +1.59% (p=0.000 n=10+8) RangeQuery/expr=changes(a_ten[1d]),steps=1-16 2.98ms ± 0% 3.38ms ± 2% +13.49% (p=0.000 n=9+10) RangeQuery/expr=changes(a_ten[1d]),steps=10-16 3.66ms ± 1% 4.02ms ± 1% +9.80% (p=0.000 n=10+9) RangeQuery/expr=changes(a_ten[1d]),steps=100-16 10.5ms ± 0% 10.8ms ± 1% +3.08% (p=0.000 n=8+10) RangeQuery/expr=changes(a_ten[1d]),steps=1000-16 77.6ms ± 1% 78.1ms ± 1% +0.58% (p=0.035 n=9+10) RangeQuery/expr=changes(a_hundred[1d]),steps=1-16 30.4ms ± 2% 33.5ms ± 4% +10.18% (p=0.000 n=10+10) RangeQuery/expr=changes(a_hundred[1d]),steps=10-16 37.1ms ± 2% 40.0ms ± 1% +7.98% (p=0.000 n=10+10) RangeQuery/expr=changes(a_hundred[1d]),steps=100-16 105ms ± 1% 107ms ± 1% +1.92% (p=0.000 n=10+10) RangeQuery/expr=changes(a_hundred[1d]),steps=1000-16 783ms ± 3% 775ms ± 1% -1.02% (p=0.019 n=9+9) ``` In summary, the runtime doesn't really improve with this change for queries with just a few steps. For queries with many steps, this commit essentially reinstates the old performance. This is good because the many-step queries are the one that matter most (longest absolute runtime). In terms of allocations, though, this commit doesn't make a dent at all (numbers not shown). The reason is that most of the allocations happen in the sampleRingIterator (in the storage package), which has to be addressed in a separate commit. Signed-off-by: beorn7 <beorn@grafana.com>
2022-10-28 07:58:40 -07:00
T: v.T,
F: v.V,
Metric: labels.Labels{},
}}, nil
default:
return nil, errors.New("rule result is not a vector or scalar")
}
}
}
// DefaultEvalIterationFunc is the default implementation of
// GroupEvalIterationFunc that is periodically invoked to evaluate the rules
// in a group at a given point in time and updates Group state and metrics
// accordingly. Custom GroupEvalIterationFunc implementations are recommended
// to invoke this function as well, to ensure correct Group state and metrics
// are maintained.
func DefaultEvalIterationFunc(ctx context.Context, g *Group, evalTimestamp time.Time) {
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc()
start := time.Now()
g.Eval(ctx, evalTimestamp)
timeSinceStart := time.Since(start)
g.metrics.IterationDuration.Observe(timeSinceStart.Seconds())
g.updateRuleEvaluationTimeSum()
g.setEvaluationTime(timeSinceStart)
g.setLastEvaluation(start)
g.setLastEvalTimestamp(evalTimestamp)
Add SyncForState Implementation for Ruler HA (#10070) * continuously syncing activeAt for alerts Signed-off-by: Yijie Qin <qinyijie@amazon.com> Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> * add import Signed-off-by: Yijie Qin <qinyijie@amazon.com> Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> * Refactor SyncForState and add unit tests Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> * Format code Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> * Add hook for syncForState Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> Fix go lint Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> Refactor syncForState override implementation Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> Add syncForState override func as argument to Update() Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> Fix go formatting Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> Fix circleci test errors Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> Remove overrideFunc as argument to run() Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> * remove the syncForState Signed-off-by: Yijie Qin <qinyijie@amazon.com> * use the override function to decide if need to replace the activeAt or not Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fix test case Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fix format Signed-off-by: Yijie Qin <qinyijie@amazon.com> * Trigger build Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fixing comments Signed-off-by: Yijie Qin <qinyijie@amazon.com> * return the result of map of alerts instead of single one Signed-off-by: Yijie Qin <qinyijie@amazon.com> * upper case the QueryforStateSeries Signed-off-by: Yijie Qin <qinyijie@amazon.com> * use a more generic rule group post process function type Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fix indentation Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fix gofmt Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fix lint Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fixing naming Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fix comments Signed-off-by: Yijie Qin <qinyijie@amazon.com> * add the lastEvalTimestamp as parameter Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fmt Signed-off-by: Yijie Qin <qinyijie@amazon.com> * change funcType to func Signed-off-by: Yijie Qin <qinyijie@amazon.com> Co-authored-by: Yijie Qin <qinyijie@amazon.com> Co-authored-by: Yijie Qin <63399121+qinxx108@users.noreply.github.com>
2022-03-28 17:16:46 -07:00
}
2015-12-14 08:40:40 -08:00
// The Manager manages recording and alerting rules.
type Manager struct {
opts *ManagerOptions
groups map[string]*Group
mtx sync.RWMutex
block chan struct{}
done chan struct{}
restored bool
restoreNewRuleGroups bool
logger *slog.Logger
2015-12-14 08:40:40 -08:00
}
// NotifyFunc sends notifications about a set of alerts generated by the given expression.
type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert)
2015-12-14 08:40:40 -08:00
// ManagerOptions bundles options for the Manager.
type ManagerOptions struct {
ExternalURL *url.URL
QueryFunc QueryFunc
NotifyFunc NotifyFunc
Context context.Context
Appendable storage.Appendable
Queryable storage.Queryable
Logger *slog.Logger
Registerer prometheus.Registerer
OutageTolerance time.Duration
ForGracePeriod time.Duration
ResendDelay time.Duration
GroupLoader GroupLoader
Feature: Allow configuration of a rule evaluation delay (#14061) * [PATCH] Allow having evaluation delay for rule groups Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * [PATCH] Fix lint Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * [PATCH] Move the option to ManagerOptions Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * [PATCH] Include evaluation_delay in the group config Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix comments Signed-off-by: gotjosh <josue.abreu@gmail.com> * Add a server configuration option. Signed-off-by: gotjosh <josue.abreu@gmail.com> * Appease the linter #1 Signed-off-by: gotjosh <josue.abreu@gmail.com> * Add the new server flag documentation Signed-off-by: gotjosh <josue.abreu@gmail.com> * Improve documentation of the new flag and configuration Signed-off-by: gotjosh <josue.abreu@gmail.com> * Use named parameters for clarity on the `Rule` interface Signed-off-by: gotjosh <josue.abreu@gmail.com> * Add `initial` to the flag help Signed-off-by: gotjosh <josue.abreu@gmail.com> * Change the CHANGELOG area from `ruler` to `rules` Signed-off-by: gotjosh <josue.abreu@gmail.com> * Rename evaluation_delay to `rule_query_offset`/`query_offset` and make it a global configuration option. Signed-off-by: gotjosh <josue.abreu@gmail.com> E Your branch is up to date with 'origin/gotjosh/evaluation-delay'. * more docs Signed-off-by: gotjosh <josue.abreu@gmail.com> * Improve wording on CHANGELOG Signed-off-by: gotjosh <josue.abreu@gmail.com> * Add `RuleQueryOffset` to the default config in tests in case it changes Signed-off-by: gotjosh <josue.abreu@gmail.com> * Update docs/configuration/recording_rules.md Co-authored-by: Julius Volz <julius.volz@gmail.com> Signed-off-by: gotjosh <josue.abreu@gmail.com> * Rename `RuleQueryOffset` to `QueryOffset` when in the group context. Signed-off-by: gotjosh <josue.abreu@gmail.com> * Improve docstring and documentation on the `rule_query_offset` Signed-off-by: gotjosh <josue.abreu@gmail.com> --------- Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: gotjosh <josue.abreu@gmail.com> Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Julius Volz <julius.volz@gmail.com>
2024-05-30 03:49:50 -07:00
DefaultRuleQueryOffset func() time.Duration
MaxConcurrentEvals int64
ConcurrentEvalsEnabled bool
RuleConcurrencyController RuleConcurrencyController
RuleDependencyController RuleDependencyController
// At present, manager only restores `for` state when manager is newly created which happens
// during restarts. This flag provides an option to restore the `for` state when new rule groups are
// added to an existing manager
RestoreNewRuleGroups bool
Metrics *Metrics
2015-12-14 08:40:40 -08:00
}
2015-12-14 08:40:40 -08:00
// NewManager returns an implementation of Manager, ready to be started
// by calling the Run method.
func NewManager(o *ManagerOptions) *Manager {
if o.Metrics == nil {
o.Metrics = NewGroupMetrics(o.Registerer)
}
if o.GroupLoader == nil {
o.GroupLoader = FileLoader{}
}
if o.RuleConcurrencyController == nil {
if o.ConcurrentEvalsEnabled {
o.RuleConcurrencyController = newRuleConcurrencyController(o.MaxConcurrentEvals)
} else {
o.RuleConcurrencyController = sequentialRuleEvalController{}
}
}
if o.RuleDependencyController == nil {
o.RuleDependencyController = ruleDependencyController{}
}
if o.Logger == nil {
o.Logger = promslog.NewNopLogger()
}
m := &Manager{
groups: map[string]*Group{},
opts: o,
block: make(chan struct{}),
done: make(chan struct{}),
logger: o.Logger,
restoreNewRuleGroups: o.RestoreNewRuleGroups,
}
return m
}
// Run starts processing of the rule manager. It is blocking.
func (m *Manager) Run() {
m.logger.Info("Starting rule manager...")
m.start()
<-m.done
}
func (m *Manager) start() {
close(m.block)
}
2015-12-14 08:40:40 -08:00
// Stop the rule manager's rule evaluation cycles.
func (m *Manager) Stop() {
rules/manager.go: Fix race between reload and stop On one relatively large Prometheus instance (1.7M series), I noticed that upgrades were frequently resulting in Prometheus undergoing crash recovery on start-up. On closer examination, I found that Prometheus was panicking on shutdown. It seems that our configuration management (or misconfiguration thereof) is reloading Prometheus then immediately restarting it, which I suspect is causing this race: Sep 21 15:12:42 host systemd[1]: Reloading prometheus monitoring system. Sep 21 15:12:42 host prometheus[18734]: time="2016-09-21T15:12:42Z" level=info msg="Loading configuration file /etc/prometheus/config.yaml" source="main.go:221" Sep 21 15:12:42 host systemd[1]: Reloaded prometheus monitoring system. Sep 21 15:12:44 host systemd[1]: Stopping prometheus monitoring system... Sep 21 15:12:44 host prometheus[18734]: time="2016-09-21T15:12:44Z" level=warning msg="Received SIGTERM, exiting gracefully..." source="main.go:203" Sep 21 15:12:44 host prometheus[18734]: time="2016-09-21T15:12:44Z" level=info msg="See you next time!" source="main.go:210" Sep 21 15:12:44 host prometheus[18734]: time="2016-09-21T15:12:44Z" level=info msg="Stopping target manager..." source="targetmanager.go:90" Sep 21 15:12:52 host prometheus[18734]: time="2016-09-21T15:12:52Z" level=info msg="Checkpointing in-memory metrics and chunks..." source="persistence.go:548" Sep 21 15:12:56 host prometheus[18734]: time="2016-09-21T15:12:56Z" level=warning msg="Error on ingesting out-of-order samples" numDropped=1 source="scrape.go:467" Sep 21 15:12:56 host prometheus[18734]: time="2016-09-21T15:12:56Z" level=error msg="Error adding file watch for \"/etc/prometheus/targets\": no such file or directory" source="file.go:84" Sep 21 15:12:56 host prometheus[18734]: time="2016-09-21T15:12:56Z" level=error msg="Error adding file watch for \"/etc/prometheus/targets\": no such file or directory" source="file.go:84" Sep 21 15:13:01 host prometheus[18734]: time="2016-09-21T15:13:01Z" level=info msg="Stopping rule manager..." source="manager.go:366" Sep 21 15:13:01 host prometheus[18734]: time="2016-09-21T15:13:01Z" level=info msg="Rule manager stopped." source="manager.go:372" Sep 21 15:13:01 host prometheus[18734]: time="2016-09-21T15:13:01Z" level=info msg="Stopping notification handler..." source="notifier.go:325" Sep 21 15:13:01 host prometheus[18734]: time="2016-09-21T15:13:01Z" level=info msg="Stopping local storage..." source="storage.go:381" Sep 21 15:13:01 host prometheus[18734]: time="2016-09-21T15:13:01Z" level=info msg="Stopping maintenance loop..." source="storage.go:383" Sep 21 15:13:01 host prometheus[18734]: panic: close of closed channel Sep 21 15:13:01 host prometheus[18734]: goroutine 7686074 [running]: Sep 21 15:13:01 host prometheus[18734]: panic(0xba57a0, 0xc60c42b500) Sep 21 15:13:01 host prometheus[18734]: /usr/local/go/src/runtime/panic.go:500 +0x1a1 Sep 21 15:13:01 host prometheus[18734]: github.com/prometheus/prometheus/rules.(*Manager).ApplyConfig.func1(0xc6645a9901, 0xc420271ef0, 0xc420338ed0, 0xc60c42b4f0, 0xc6645a9900) Sep 21 15:13:01 host prometheus[18734]: /home/build/packages/prometheus/tmp/build/gopath/src/github.com/prometheus/prometheus/rules/manager.go:412 +0x3c Sep 21 15:13:01 host prometheus[18734]: created by github.com/prometheus/prometheus/rules.(*Manager).ApplyConfig Sep 21 15:13:01 host prometheus[18734]: /home/build/packages/prometheus/tmp/build/gopath/src/github.com/prometheus/prometheus/rules/manager.go:423 +0x56b Sep 21 15:13:03 host systemd[1]: prometheus.service: main process exited, code=exited, status=2/INVALIDARGUMENT
2016-09-21 14:03:02 -07:00
m.mtx.Lock()
defer m.mtx.Unlock()
m.logger.Info("Stopping rule manager...")
2015-06-30 02:51:05 -07:00
// Stop all groups asynchronously, then wait for them to finish.
// This is faster than stopping and waiting for each group in sequence.
2015-12-14 08:40:40 -08:00
for _, eg := range m.groups {
eg.stopAsync()
}
for _, eg := range m.groups {
eg.waitStopped()
2015-06-30 02:51:05 -07:00
}
// Shut down the groups waiting multiple evaluation intervals to write
// staleness markers.
close(m.done)
m.logger.Info("Rule manager stopped")
2015-06-30 02:51:05 -07:00
}
// Update the rule manager's state as the config requires. If
// loading the new rules failed the old rule set is restored.
// This method will no-op in case the manager is already stopped.
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string, groupEvalIterationFunc GroupEvalIterationFunc) error {
2015-12-14 08:40:40 -08:00
m.mtx.Lock()
defer m.mtx.Unlock()
// We cannot update a stopped manager
select {
case <-m.done:
return nil
default:
}
groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, false, files...)
Add SyncForState Implementation for Ruler HA (#10070) * continuously syncing activeAt for alerts Signed-off-by: Yijie Qin <qinyijie@amazon.com> Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> * add import Signed-off-by: Yijie Qin <qinyijie@amazon.com> Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> * Refactor SyncForState and add unit tests Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> * Format code Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> * Add hook for syncForState Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> Fix go lint Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> Refactor syncForState override implementation Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> Add syncForState override func as argument to Update() Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> Fix go formatting Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> Fix circleci test errors Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> Remove overrideFunc as argument to run() Signed-off-by: Wilbert Guo <wilbeguo@amazon.com> * remove the syncForState Signed-off-by: Yijie Qin <qinyijie@amazon.com> * use the override function to decide if need to replace the activeAt or not Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fix test case Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fix format Signed-off-by: Yijie Qin <qinyijie@amazon.com> * Trigger build Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fixing comments Signed-off-by: Yijie Qin <qinyijie@amazon.com> * return the result of map of alerts instead of single one Signed-off-by: Yijie Qin <qinyijie@amazon.com> * upper case the QueryforStateSeries Signed-off-by: Yijie Qin <qinyijie@amazon.com> * use a more generic rule group post process function type Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fix indentation Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fix gofmt Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fix lint Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fixing naming Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fix comments Signed-off-by: Yijie Qin <qinyijie@amazon.com> * add the lastEvalTimestamp as parameter Signed-off-by: Yijie Qin <qinyijie@amazon.com> * fmt Signed-off-by: Yijie Qin <qinyijie@amazon.com> * change funcType to func Signed-off-by: Yijie Qin <qinyijie@amazon.com> Co-authored-by: Yijie Qin <qinyijie@amazon.com> Co-authored-by: Yijie Qin <63399121+qinxx108@users.noreply.github.com>
2022-03-28 17:16:46 -07:00
if errs != nil {
for _, e := range errs {
m.logger.Error("loading groups failed", "err", e)
}
return errors.New("error loading rules, previous rule set restored")
}
m.restored = true
2015-12-14 08:40:40 -08:00
var wg sync.WaitGroup
for _, newg := range groups {
// If there is an old group with the same identifier,
// check if new group equals with the old group, if yes then skip it.
// If not equals, stop it and wait for it to finish the current iteration.
// Then copy it into the new group.
gn := GroupKey(newg.file, newg.name)
oldg, ok := m.groups[gn]
delete(m.groups, gn)
2015-12-14 08:40:40 -08:00
if ok && oldg.Equals(newg) {
groups[gn] = oldg
continue
}
wg.Add(1)
go func(newg *Group) {
2015-12-14 08:40:40 -08:00
if ok {
oldg.stop()
newg.CopyState(oldg)
2015-12-14 08:40:40 -08:00
}
wg.Done()
// Wait with starting evaluation until the rule manager
// is told to run. This is necessary to avoid running
// queries against a bootstrapping storage.
<-m.block
newg.run(m.opts.Context)
}(newg)
2015-12-14 08:40:40 -08:00
}
// Stop remaining old groups.
wg.Add(len(m.groups))
for n, oldg := range m.groups {
go func(n string, g *Group) {
g.markStale = true
g.stop()
if m := g.metrics; m != nil {
m.IterationsMissed.DeleteLabelValues(n)
m.IterationsScheduled.DeleteLabelValues(n)
m.EvalTotal.DeleteLabelValues(n)
m.EvalFailures.DeleteLabelValues(n)
m.GroupInterval.DeleteLabelValues(n)
m.GroupLastEvalTime.DeleteLabelValues(n)
m.GroupLastDuration.DeleteLabelValues(n)
m.GroupRules.DeleteLabelValues(n)
m.GroupSamples.DeleteLabelValues((n))
}
wg.Done()
}(n, oldg)
2015-12-14 08:40:40 -08:00
}
wg.Wait()
m.groups = groups
return nil
}
// GroupLoader is responsible for loading rule groups from arbitrary sources and parsing them.
type GroupLoader interface {
Load(identifier string, ignoreUnknownFields bool) (*rulefmt.RuleGroups, []error)
Parse(query string) (parser.Expr, error)
}
// FileLoader is the default GroupLoader implementation. It defers to rulefmt.ParseFile
// and parser.ParseExpr.
type FileLoader struct{}
func (FileLoader) Load(identifier string, ignoreUnknownFields bool) (*rulefmt.RuleGroups, []error) {
return rulefmt.ParseFile(identifier, ignoreUnknownFields)
}
func (FileLoader) Parse(query string) (parser.Expr, error) { return parser.ParseExpr(query) }
// LoadGroups reads groups from a list of files.
func (m *Manager) LoadGroups(
interval time.Duration, externalLabels labels.Labels, externalURL string, groupEvalIterationFunc GroupEvalIterationFunc, ignoreUnknownFields bool, filenames ...string,
) (map[string]*Group, []error) {
groups := make(map[string]*Group)
shouldRestore := !m.restored || m.restoreNewRuleGroups
for _, fn := range filenames {
rgs, errs := m.opts.GroupLoader.Load(fn, ignoreUnknownFields)
if errs != nil {
return nil, errs
}
for _, rg := range rgs.Groups {
itv := interval
if rg.Interval != 0 {
itv = time.Duration(rg.Interval)
}
2015-12-14 08:40:40 -08:00
rules := make([]Rule, 0, len(rg.Rules))
for _, r := range rg.Rules {
rulefmt: support YAML aliases for Alert/Record/Expr (#14957) * rulefmt: add tests with YAML aliases for Alert/Record/Expr Altough somewhat discouraged in favour of using proper configuration management tools to generate full YAML, it can still be useful in some situations to use YAML anchors/aliases in rules. The current implementation is however confusing: aliases will work everywhere except on the alert/record name and expr This first commit adds (failing) tests to illustrate the issue, the next one fixes it. The YAML test file is intentionally filled with anchors and aliases. Although this is probably not representative of a real-world use case (which would have less of them), it errs on the safer side. Signed-off-by: François HORTA <fhorta@scaleway.com> * rulefmt: support YAML aliases for Alert/Record/Expr This fixes the use of YAML aliases in alert/recording rule names and expressions. A side effect of this change is that the RuleNode YAML type is no longer propagated deeper in the codebase, instead the generic Rule type can now be used. Signed-off-by: François HORTA <fhorta@scaleway.com> * rulefmt: Add test for YAML merge combined with aliases Currently this does work, but adding a test for the related functionally here makes sense. Signed-off-by: David Leadbeater <dgl@dgl.cx> * rulefmt: Rebase to latest changes Signed-off-by: David Leadbeater <dgl@dgl.cx> --------- Signed-off-by: François HORTA <fhorta@scaleway.com> Signed-off-by: David Leadbeater <dgl@dgl.cx> Co-authored-by: David Leadbeater <dgl@dgl.cx>
2025-02-13 01:48:33 -08:00
expr, err := m.opts.GroupLoader.Parse(r.Expr)
if err != nil {
return nil, []error{fmt.Errorf("%s: %w", fn, err)}
}
2015-12-14 08:40:40 -08:00
mLabels := FromMaps(rg.Labels, r.Labels)
rulefmt: support YAML aliases for Alert/Record/Expr (#14957) * rulefmt: add tests with YAML aliases for Alert/Record/Expr Altough somewhat discouraged in favour of using proper configuration management tools to generate full YAML, it can still be useful in some situations to use YAML anchors/aliases in rules. The current implementation is however confusing: aliases will work everywhere except on the alert/record name and expr This first commit adds (failing) tests to illustrate the issue, the next one fixes it. The YAML test file is intentionally filled with anchors and aliases. Although this is probably not representative of a real-world use case (which would have less of them), it errs on the safer side. Signed-off-by: François HORTA <fhorta@scaleway.com> * rulefmt: support YAML aliases for Alert/Record/Expr This fixes the use of YAML aliases in alert/recording rule names and expressions. A side effect of this change is that the RuleNode YAML type is no longer propagated deeper in the codebase, instead the generic Rule type can now be used. Signed-off-by: François HORTA <fhorta@scaleway.com> * rulefmt: Add test for YAML merge combined with aliases Currently this does work, but adding a test for the related functionally here makes sense. Signed-off-by: David Leadbeater <dgl@dgl.cx> * rulefmt: Rebase to latest changes Signed-off-by: David Leadbeater <dgl@dgl.cx> --------- Signed-off-by: François HORTA <fhorta@scaleway.com> Signed-off-by: David Leadbeater <dgl@dgl.cx> Co-authored-by: David Leadbeater <dgl@dgl.cx>
2025-02-13 01:48:33 -08:00
if r.Alert != "" {
rules = append(rules, NewAlertingRule(
rulefmt: support YAML aliases for Alert/Record/Expr (#14957) * rulefmt: add tests with YAML aliases for Alert/Record/Expr Altough somewhat discouraged in favour of using proper configuration management tools to generate full YAML, it can still be useful in some situations to use YAML anchors/aliases in rules. The current implementation is however confusing: aliases will work everywhere except on the alert/record name and expr This first commit adds (failing) tests to illustrate the issue, the next one fixes it. The YAML test file is intentionally filled with anchors and aliases. Although this is probably not representative of a real-world use case (which would have less of them), it errs on the safer side. Signed-off-by: François HORTA <fhorta@scaleway.com> * rulefmt: support YAML aliases for Alert/Record/Expr This fixes the use of YAML aliases in alert/recording rule names and expressions. A side effect of this change is that the RuleNode YAML type is no longer propagated deeper in the codebase, instead the generic Rule type can now be used. Signed-off-by: François HORTA <fhorta@scaleway.com> * rulefmt: Add test for YAML merge combined with aliases Currently this does work, but adding a test for the related functionally here makes sense. Signed-off-by: David Leadbeater <dgl@dgl.cx> * rulefmt: Rebase to latest changes Signed-off-by: David Leadbeater <dgl@dgl.cx> --------- Signed-off-by: François HORTA <fhorta@scaleway.com> Signed-off-by: David Leadbeater <dgl@dgl.cx> Co-authored-by: David Leadbeater <dgl@dgl.cx>
2025-02-13 01:48:33 -08:00
r.Alert,
expr,
time.Duration(r.For),
time.Duration(r.KeepFiringFor),
mLabels,
labels.FromMap(r.Annotations),
externalLabels,
externalURL,
!shouldRestore,
m.logger.With("alert", r.Alert),
))
continue
}
rules = append(rules, NewRecordingRule(
rulefmt: support YAML aliases for Alert/Record/Expr (#14957) * rulefmt: add tests with YAML aliases for Alert/Record/Expr Altough somewhat discouraged in favour of using proper configuration management tools to generate full YAML, it can still be useful in some situations to use YAML anchors/aliases in rules. The current implementation is however confusing: aliases will work everywhere except on the alert/record name and expr This first commit adds (failing) tests to illustrate the issue, the next one fixes it. The YAML test file is intentionally filled with anchors and aliases. Although this is probably not representative of a real-world use case (which would have less of them), it errs on the safer side. Signed-off-by: François HORTA <fhorta@scaleway.com> * rulefmt: support YAML aliases for Alert/Record/Expr This fixes the use of YAML aliases in alert/recording rule names and expressions. A side effect of this change is that the RuleNode YAML type is no longer propagated deeper in the codebase, instead the generic Rule type can now be used. Signed-off-by: François HORTA <fhorta@scaleway.com> * rulefmt: Add test for YAML merge combined with aliases Currently this does work, but adding a test for the related functionally here makes sense. Signed-off-by: David Leadbeater <dgl@dgl.cx> * rulefmt: Rebase to latest changes Signed-off-by: David Leadbeater <dgl@dgl.cx> --------- Signed-off-by: François HORTA <fhorta@scaleway.com> Signed-off-by: David Leadbeater <dgl@dgl.cx> Co-authored-by: David Leadbeater <dgl@dgl.cx>
2025-02-13 01:48:33 -08:00
r.Record,
expr,
mLabels,
))
}
// Check dependencies between rules and store it on the Rule itself.
m.opts.RuleDependencyController.AnalyseRules(rules)
groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
Name: rg.Name,
File: fn,
Interval: itv,
Limit: rg.Limit,
Rules: rules,
ShouldRestore: shouldRestore,
Opts: m.opts,
Feature: Allow configuration of a rule evaluation delay (#14061) * [PATCH] Allow having evaluation delay for rule groups Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * [PATCH] Fix lint Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * [PATCH] Move the option to ManagerOptions Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * [PATCH] Include evaluation_delay in the group config Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix comments Signed-off-by: gotjosh <josue.abreu@gmail.com> * Add a server configuration option. Signed-off-by: gotjosh <josue.abreu@gmail.com> * Appease the linter #1 Signed-off-by: gotjosh <josue.abreu@gmail.com> * Add the new server flag documentation Signed-off-by: gotjosh <josue.abreu@gmail.com> * Improve documentation of the new flag and configuration Signed-off-by: gotjosh <josue.abreu@gmail.com> * Use named parameters for clarity on the `Rule` interface Signed-off-by: gotjosh <josue.abreu@gmail.com> * Add `initial` to the flag help Signed-off-by: gotjosh <josue.abreu@gmail.com> * Change the CHANGELOG area from `ruler` to `rules` Signed-off-by: gotjosh <josue.abreu@gmail.com> * Rename evaluation_delay to `rule_query_offset`/`query_offset` and make it a global configuration option. Signed-off-by: gotjosh <josue.abreu@gmail.com> E Your branch is up to date with 'origin/gotjosh/evaluation-delay'. * more docs Signed-off-by: gotjosh <josue.abreu@gmail.com> * Improve wording on CHANGELOG Signed-off-by: gotjosh <josue.abreu@gmail.com> * Add `RuleQueryOffset` to the default config in tests in case it changes Signed-off-by: gotjosh <josue.abreu@gmail.com> * Update docs/configuration/recording_rules.md Co-authored-by: Julius Volz <julius.volz@gmail.com> Signed-off-by: gotjosh <josue.abreu@gmail.com> * Rename `RuleQueryOffset` to `QueryOffset` when in the group context. Signed-off-by: gotjosh <josue.abreu@gmail.com> * Improve docstring and documentation on the `rule_query_offset` Signed-off-by: gotjosh <josue.abreu@gmail.com> --------- Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: gotjosh <josue.abreu@gmail.com> Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Julius Volz <julius.volz@gmail.com>
2024-05-30 03:49:50 -07:00
QueryOffset: (*time.Duration)(rg.QueryOffset),
done: m.done,
EvalIterationFunc: groupEvalIterationFunc,
})
}
}
2015-12-14 08:40:40 -08:00
return groups, nil
}
2013-06-11 02:00:55 -07:00
// RuleGroups returns the list of manager's rule groups.
func (m *Manager) RuleGroups() []*Group {
m.mtx.RLock()
defer m.mtx.RUnlock()
rgs := make([]*Group, 0, len(m.groups))
for _, g := range m.groups {
rgs = append(rgs, g)
}
2023-09-21 13:53:51 -07:00
slices.SortFunc(rgs, func(a, b *Group) int {
fileCompare := strings.Compare(a.file, b.file)
// If its 0, then the file names are the same.
// Lets look at the group names in that case.
if fileCompare != 0 {
return fileCompare
}
2023-09-21 13:53:51 -07:00
return strings.Compare(a.name, b.name)
})
return rgs
}
// Rules returns the list of the manager's rules.
func (m *Manager) Rules(matcherSets ...[]*labels.Matcher) []Rule {
2015-12-14 08:40:40 -08:00
m.mtx.RLock()
defer m.mtx.RUnlock()
var rules []Rule
for _, g := range m.groups {
rules = append(rules, g.Rules(matcherSets...)...)
2015-12-14 08:40:40 -08:00
}
2013-06-11 02:00:55 -07:00
return rules
}
// AlertingRules returns the list of the manager's alerting rules.
func (m *Manager) AlertingRules() []*AlertingRule {
alerts := []*AlertingRule{}
2015-12-14 08:40:40 -08:00
for _, rule := range m.Rules() {
if alertingRule, ok := rule.(*AlertingRule); ok {
alerts = append(alerts, alertingRule)
}
}
return alerts
}
type Sender interface {
Send(alerts ...*notifier.Alert)
}
// SendAlerts implements the rules.NotifyFunc for a Notifier.
func SendAlerts(s Sender, externalURL string) NotifyFunc {
return func(ctx context.Context, expr string, alerts ...*Alert) {
var res []*notifier.Alert
for _, alert := range alerts {
a := &notifier.Alert{
StartsAt: alert.FiredAt,
Labels: alert.Labels,
Annotations: alert.Annotations,
GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
}
if !alert.ResolvedAt.IsZero() {
a.EndsAt = alert.ResolvedAt
} else {
a.EndsAt = alert.ValidUntil
}
res = append(res, a)
}
if len(alerts) > 0 {
s.Send(res...)
}
}
}
// RuleDependencyController controls whether a set of rules have dependencies between each other.
type RuleDependencyController interface {
// AnalyseRules analyses dependencies between the input rules. For each rule that it's guaranteed
// not having any dependants and/or dependency, this function should call Rule.SetDependentRules(...)
// and/or Rule.SetDependencyRules(...).
AnalyseRules(rules []Rule)
}
type ruleDependencyController struct{}
// AnalyseRules implements RuleDependencyController.
func (c ruleDependencyController) AnalyseRules(rules []Rule) {
depMap := buildDependencyMap(rules)
if depMap == nil {
return
}
for _, r := range rules {
r.SetDependentRules(depMap.dependents(r))
r.SetDependencyRules(depMap.dependencies(r))
}
}
// ConcurrentRules represents a slice of indexes of rules that can be evaluated concurrently.
type ConcurrentRules []int
// RuleConcurrencyController controls concurrency for rules that are safe to be evaluated concurrently.
// Its purpose is to bound the amount of concurrency in rule evaluations to avoid overwhelming the Prometheus
// server with additional query load. Concurrency is controlled globally, not on a per-group basis.
type RuleConcurrencyController interface {
// SplitGroupIntoBatches returns an ordered slice of of ConcurrentRules, which are batches of rules that can be evaluated concurrently.
// The rules are represented by their index from the input rule group.
SplitGroupIntoBatches(ctx context.Context, group *Group) []ConcurrentRules
// Allow determines if the given rule is allowed to be evaluated concurrently.
// If Allow() returns true, then Done() must be called to release the acquired slot and corresponding cleanup is done.
// It is important that both *Group and Rule are not retained and only be used for the duration of the call.
Allow(ctx context.Context, group *Group, rule Rule) bool
// Done releases a concurrent evaluation slot.
Done(ctx context.Context)
}
// concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules.
type concurrentRuleEvalController struct {
sema *semaphore.Weighted
}
func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyController {
return &concurrentRuleEvalController{
sema: semaphore.NewWeighted(maxConcurrency),
}
}
func (c *concurrentRuleEvalController) Allow(_ context.Context, _ *Group, rule Rule) bool {
return c.sema.TryAcquire(1)
}
func (c *concurrentRuleEvalController) SplitGroupIntoBatches(_ context.Context, g *Group) []ConcurrentRules {
// Using the rule dependency controller information (rules being identified as having no dependencies or no dependants),
// we can safely run the following concurrent groups:
// 1. Concurrently, all rules that have no dependencies
// 2. Sequentially, all rules that have both dependencies and dependants
// 3. Concurrently, all rules that have no dependants
var noDependencies []int
var dependenciesAndDependants []int
var noDependants []int
for i, r := range g.rules {
switch {
case r.NoDependencyRules():
noDependencies = append(noDependencies, i)
case !r.NoDependentRules() && !r.NoDependencyRules():
dependenciesAndDependants = append(dependenciesAndDependants, i)
case r.NoDependentRules():
noDependants = append(noDependants, i)
}
}
var order []ConcurrentRules
if len(noDependencies) > 0 {
order = append(order, noDependencies)
}
for _, r := range dependenciesAndDependants {
order = append(order, []int{r})
}
if len(noDependants) > 0 {
order = append(order, noDependants)
}
return order
}
func (c *concurrentRuleEvalController) Done(_ context.Context) {
c.sema.Release(1)
}
var _ RuleConcurrencyController = &sequentialRuleEvalController{}
// sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially.
type sequentialRuleEvalController struct{}
func (c sequentialRuleEvalController) Allow(_ context.Context, _ *Group, _ Rule) bool {
return false
}
func (c sequentialRuleEvalController) SplitGroupIntoBatches(_ context.Context, g *Group) []ConcurrentRules {
return nil
}
func (c sequentialRuleEvalController) Done(_ context.Context) {}
// FromMaps returns new sorted Labels from the given maps, overriding each other in order.
func FromMaps(maps ...map[string]string) labels.Labels {
mLables := make(map[string]string)
for _, m := range maps {
for k, v := range m {
mLables[k] = v
}
}
return labels.FromMap(mLables)
}