Generalized rule group iteration evaluation hook (#11885)

Signed-off-by: Soon-Ping Phang <soonping@amazon.com>
This commit is contained in:
Soon-Ping 2023-04-04 11:21:13 -07:00 committed by GitHub
parent 044b3ecd14
commit 6cecb87941
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 126 additions and 81 deletions

View file

@ -253,7 +253,8 @@ type Group struct {
opts *ManagerOptions opts *ManagerOptions
mtx sync.Mutex mtx sync.Mutex
evaluationTime time.Duration evaluationTime time.Duration
lastEvaluation time.Time lastEvaluation time.Time // Wall-clock time of most recent evaluation.
lastEvalTimestamp time.Time // Time slot used for most recent evaluation.
shouldRestore bool shouldRestore bool
@ -266,22 +267,27 @@ type Group struct {
metrics *Metrics metrics *Metrics
ruleGroupPostProcessFunc RuleGroupPostProcessFunc // Rule group evaluation iteration function,
// defaults to DefaultEvalIterationFunc.
evalIterationFunc GroupEvalIterationFunc
} }
// This function will be used before each rule group evaluation if not nil. // GroupEvalIterationFunc is used to implement and extend rule group
// Use this function type if the rule group post processing is needed. // evaluation iteration logic. It is configured in Group.evalIterationFunc,
type RuleGroupPostProcessFunc func(g *Group, lastEvalTimestamp time.Time, log log.Logger) error // and periodically invoked at each group evaluation interval to
// evaluate the rules in the group at that point in time.
// DefaultEvalIterationFunc is the default implementation.
type GroupEvalIterationFunc func(ctx context.Context, g *Group, evalTimestamp time.Time)
type GroupOptions struct { type GroupOptions struct {
Name, File string Name, File string
Interval time.Duration Interval time.Duration
Limit int Limit int
Rules []Rule Rules []Rule
ShouldRestore bool ShouldRestore bool
Opts *ManagerOptions Opts *ManagerOptions
done chan struct{} done chan struct{}
RuleGroupPostProcessFunc RuleGroupPostProcessFunc EvalIterationFunc GroupEvalIterationFunc
} }
// NewGroup makes a new Group with the given name, options, and rules. // NewGroup makes a new Group with the given name, options, and rules.
@ -302,21 +308,26 @@ func NewGroup(o GroupOptions) *Group {
metrics.GroupSamples.WithLabelValues(key) metrics.GroupSamples.WithLabelValues(key)
metrics.GroupInterval.WithLabelValues(key).Set(o.Interval.Seconds()) metrics.GroupInterval.WithLabelValues(key).Set(o.Interval.Seconds())
evalIterationFunc := o.EvalIterationFunc
if evalIterationFunc == nil {
evalIterationFunc = DefaultEvalIterationFunc
}
return &Group{ return &Group{
name: o.Name, name: o.Name,
file: o.File, file: o.File,
interval: o.Interval, interval: o.Interval,
limit: o.Limit, limit: o.Limit,
rules: o.Rules, rules: o.Rules,
shouldRestore: o.ShouldRestore, shouldRestore: o.ShouldRestore,
opts: o.Opts, opts: o.Opts,
seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
done: make(chan struct{}), done: make(chan struct{}),
managerDone: o.done, managerDone: o.done,
terminated: make(chan struct{}), terminated: make(chan struct{}),
logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name),
metrics: metrics, metrics: metrics,
ruleGroupPostProcessFunc: o.RuleGroupPostProcessFunc, evalIterationFunc: evalIterationFunc,
} }
} }
@ -341,6 +352,8 @@ func (g *Group) Interval() time.Duration { return g.interval }
// Limit returns the group's limit. // Limit returns the group's limit.
func (g *Group) Limit() int { return g.limit } func (g *Group) Limit() int { return g.limit }
func (g *Group) Logger() log.Logger { return g.logger }
func (g *Group) run(ctx context.Context) { func (g *Group) run(ctx context.Context) {
defer close(g.terminated) defer close(g.terminated)
@ -359,18 +372,6 @@ func (g *Group) run(ctx context.Context) {
}, },
}) })
iter := func() {
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc()
start := time.Now()
g.Eval(ctx, evalTimestamp)
timeSinceStart := time.Since(start)
g.metrics.IterationDuration.Observe(timeSinceStart.Seconds())
g.setEvaluationTime(timeSinceStart)
g.setLastEvaluation(start)
}
// The assumption here is that since the ticker was started after having // The assumption here is that since the ticker was started after having
// waited for `evalTimestamp` to pass, the ticks will trigger soon // waited for `evalTimestamp` to pass, the ticks will trigger soon
// after each `evalTimestamp + N * g.interval` occurrence. // after each `evalTimestamp + N * g.interval` occurrence.
@ -400,7 +401,7 @@ func (g *Group) run(ctx context.Context) {
}(time.Now()) }(time.Now())
}() }()
iter() g.evalIterationFunc(ctx, g, evalTimestamp)
if g.shouldRestore { if g.shouldRestore {
// If we have to restore, we wait for another Eval to finish. // If we have to restore, we wait for another Eval to finish.
// The reason behind this is, during first eval (or before it) // The reason behind this is, during first eval (or before it)
@ -416,7 +417,7 @@ func (g *Group) run(ctx context.Context) {
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed)) g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
} }
evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval) evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
iter() g.evalIterationFunc(ctx, g, evalTimestamp)
} }
g.RestoreForState(time.Now()) g.RestoreForState(time.Now())
@ -439,21 +440,29 @@ func (g *Group) run(ctx context.Context) {
} }
evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval) evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
useRuleGroupPostProcessFunc(g, evalTimestamp.Add(-(missed+1)*g.interval)) g.evalIterationFunc(ctx, g, evalTimestamp)
iter()
} }
} }
} }
} }
func useRuleGroupPostProcessFunc(g *Group, lastEvalTimestamp time.Time) { // DefaultEvalIterationFunc is the default implementation of
if g.ruleGroupPostProcessFunc != nil { // GroupEvalIterationFunc that is periodically invoked to evaluate the rules
err := g.ruleGroupPostProcessFunc(g, lastEvalTimestamp, g.logger) // in a group at a given point in time and updates Group state and metrics
if err != nil { // accordingly. Custom GroupEvalIterationFunc implementations are recommended
level.Warn(g.logger).Log("msg", "ruleGroupPostProcessFunc failed", "err", err) // to invoke this function as well, to ensure correct Group state and metrics
} // are maintained.
} func DefaultEvalIterationFunc(ctx context.Context, g *Group, evalTimestamp time.Time) {
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc()
start := time.Now()
g.Eval(ctx, evalTimestamp)
timeSinceStart := time.Since(start)
g.metrics.IterationDuration.Observe(timeSinceStart.Seconds())
g.setEvaluationTime(timeSinceStart)
g.setLastEvaluation(start)
g.setLastEvalTimestamp(evalTimestamp)
} }
func (g *Group) stop() { func (g *Group) stop() {
@ -533,6 +542,20 @@ func (g *Group) setLastEvaluation(ts time.Time) {
g.lastEvaluation = ts g.lastEvaluation = ts
} }
// GetLastEvalTimestamp returns the timestamp of the last evaluation.
func (g *Group) GetLastEvalTimestamp() time.Time {
g.mtx.Lock()
defer g.mtx.Unlock()
return g.lastEvalTimestamp
}
// setLastEvalTimestamp updates lastEvalTimestamp to the timestamp of the last evaluation.
func (g *Group) setLastEvalTimestamp(ts time.Time) {
g.mtx.Lock()
defer g.mtx.Unlock()
g.lastEvalTimestamp = ts
}
// EvalTimestamp returns the immediately preceding consistently slotted evaluation time. // EvalTimestamp returns the immediately preceding consistently slotted evaluation time.
func (g *Group) EvalTimestamp(startTime int64) time.Time { func (g *Group) EvalTimestamp(startTime int64) time.Time {
var ( var (
@ -996,11 +1019,11 @@ func (m *Manager) Stop() {
// Update the rule manager's state as the config requires. If // Update the rule manager's state as the config requires. If
// loading the new rules failed the old rule set is restored. // loading the new rules failed the old rule set is restored.
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string, ruleGroupPostProcessFunc RuleGroupPostProcessFunc) error { func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string, groupEvalIterationFunc GroupEvalIterationFunc) error {
m.mtx.Lock() m.mtx.Lock()
defer m.mtx.Unlock() defer m.mtx.Unlock()
groups, errs := m.LoadGroups(interval, externalLabels, externalURL, ruleGroupPostProcessFunc, files...) groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, files...)
if errs != nil { if errs != nil {
for _, e := range errs { for _, e := range errs {
@ -1085,7 +1108,7 @@ func (FileLoader) Parse(query string) (parser.Expr, error) { return parser.Parse
// LoadGroups reads groups from a list of files. // LoadGroups reads groups from a list of files.
func (m *Manager) LoadGroups( func (m *Manager) LoadGroups(
interval time.Duration, externalLabels labels.Labels, externalURL string, ruleGroupPostProcessFunc RuleGroupPostProcessFunc, filenames ...string, interval time.Duration, externalLabels labels.Labels, externalURL string, groupEvalIterationFunc GroupEvalIterationFunc, filenames ...string,
) (map[string]*Group, []error) { ) (map[string]*Group, []error) {
groups := make(map[string]*Group) groups := make(map[string]*Group)
@ -1133,15 +1156,15 @@ func (m *Manager) LoadGroups(
} }
groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{ groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
Name: rg.Name, Name: rg.Name,
File: fn, File: fn,
Interval: itv, Interval: itv,
Limit: rg.Limit, Limit: rg.Limit,
Rules: rules, Rules: rules,
ShouldRestore: shouldRestore, ShouldRestore: shouldRestore,
Opts: m.opts, Opts: m.opts,
done: m.done, done: m.done,
RuleGroupPostProcessFunc: ruleGroupPostProcessFunc, EvalIterationFunc: groupEvalIterationFunc,
}) })
} }
} }

View file

@ -1237,7 +1237,7 @@ func TestRuleHealthUpdates(t *testing.T) {
require.Equal(t, HealthBad, rules.Health()) require.Equal(t, HealthBad, rules.Health())
} }
func TestUpdateMissedEvalMetrics(t *testing.T) { func TestRuleGroupEvalIterationFunc(t *testing.T) {
suite, err := promql.NewTest(t, ` suite, err := promql.NewTest(t, `
load 5m load 5m
http_requests{instance="0"} 75 85 50 0 0 25 0 0 40 0 120 http_requests{instance="0"} 75 85 50 0 0 25 0 0 40 0 120
@ -1254,26 +1254,40 @@ func TestUpdateMissedEvalMetrics(t *testing.T) {
testValue := 1 testValue := 1
overrideFunc := func(g *Group, lastEvalTimestamp time.Time, log log.Logger) error { evalIterationFunc := func(ctx context.Context, g *Group, evalTimestamp time.Time) {
testValue = 2 testValue = 2
return nil DefaultEvalIterationFunc(ctx, g, evalTimestamp)
testValue = 3
}
skipEvalIterationFunc := func(ctx context.Context, g *Group, evalTimestamp time.Time) {
testValue = 4
} }
type testInput struct { type testInput struct {
overrideFunc func(g *Group, lastEvalTimestamp time.Time, logger log.Logger) error evalIterationFunc GroupEvalIterationFunc
expectedValue int expectedValue int
lastEvalTimestampIsZero bool
} }
tests := []testInput{ tests := []testInput{
// testValue should still have value of 1 since overrideFunc is nil. // testValue should still have value of 1 since the default iteration function will be called.
{ {
overrideFunc: nil, evalIterationFunc: nil,
expectedValue: 1, expectedValue: 1,
lastEvalTimestampIsZero: false,
}, },
// testValue should be incremented to 2 since overrideFunc is called. // testValue should be incremented to 3 since evalIterationFunc is called.
{ {
overrideFunc: overrideFunc, evalIterationFunc: evalIterationFunc,
expectedValue: 2, expectedValue: 3,
lastEvalTimestampIsZero: false,
},
// testValue should be incremented to 4 since skipEvalIterationFunc is called.
{
evalIterationFunc: skipEvalIterationFunc,
expectedValue: 4,
lastEvalTimestampIsZero: true,
}, },
} }
@ -1315,12 +1329,12 @@ func TestUpdateMissedEvalMetrics(t *testing.T) {
} }
group := NewGroup(GroupOptions{ group := NewGroup(GroupOptions{
Name: "default", Name: "default",
Interval: time.Second, Interval: time.Second,
Rules: []Rule{rule}, Rules: []Rule{rule},
ShouldRestore: true, ShouldRestore: true,
Opts: opts, Opts: opts,
RuleGroupPostProcessFunc: tst.overrideFunc, EvalIterationFunc: tst.evalIterationFunc,
}) })
go func() { go func() {
@ -1329,10 +1343,18 @@ func TestUpdateMissedEvalMetrics(t *testing.T) {
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
group.stop() group.stop()
require.Equal(t, tst.expectedValue, testValue) require.Equal(t, tst.expectedValue, testValue)
if tst.lastEvalTimestampIsZero {
require.Zero(t, group.GetLastEvalTimestamp())
} else {
oneMinute, _ := time.ParseDuration("1m")
require.WithinDuration(t, time.Now(), group.GetLastEvalTimestamp(), oneMinute)
}
} }
for _, tst := range tests { for i, tst := range tests {
t.Logf("case %d", i)
testFunc(tst) testFunc(tst)
} }
} }