mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 22:07:27 -08:00
[rule] Update rule health for append/commit fails (#8619)
* [rule] Update rule health for append/commit fails. Similar to https://github.com/prometheus/prometheus/pull/8410; this will provide more context. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
* Add test for updating health on append fails. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
This commit is contained in:
parent
195611e232
commit
4b5ab80ca6
|
@ -592,6 +592,9 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
|
||||||
|
|
||||||
vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
|
vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
rule.SetHealth(HealthBad)
|
||||||
|
rule.SetLastError(err)
|
||||||
|
|
||||||
// Canceled queries are intentional termination of queries. This normally
|
// Canceled queries are intentional termination of queries. This normally
|
||||||
// happens on shutdown and thus we skip logging of any errors here.
|
// happens on shutdown and thus we skip logging of any errors here.
|
||||||
if _, ok := err.(promql.ErrQueryCanceled); !ok {
|
if _, ok := err.(promql.ErrQueryCanceled); !ok {
|
||||||
|
@ -616,13 +619,20 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
|
||||||
seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
|
seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := app.Commit(); err != nil {
|
if err := app.Commit(); err != nil {
|
||||||
|
rule.SetHealth(HealthBad)
|
||||||
|
rule.SetLastError(err)
|
||||||
|
|
||||||
level.Warn(g.logger).Log("msg", "Rule sample appending failed", "err", err)
|
level.Warn(g.logger).Log("msg", "Rule sample appending failed", "err", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
g.seriesInPreviousEval[i] = seriesReturned
|
g.seriesInPreviousEval[i] = seriesReturned
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for _, s := range vector {
|
for _, s := range vector {
|
||||||
if _, err := app.Append(0, s.Metric, s.T, s.V); err != nil {
|
if _, err := app.Append(0, s.Metric, s.T, s.V); err != nil {
|
||||||
|
rule.SetHealth(HealthBad)
|
||||||
|
rule.SetLastError(err)
|
||||||
|
|
||||||
switch errors.Cause(err) {
|
switch errors.Cause(err) {
|
||||||
case storage.ErrOutOfOrderSample:
|
case storage.ErrOutOfOrderSample:
|
||||||
numOutOfOrder++
|
numOutOfOrder++
|
||||||
|
|
|
@ -1163,3 +1163,60 @@ func TestGroupHasAlertingRules(t *testing.T) {
|
||||||
require.Equal(t, test.want, got, "test case %d failed, expected:%t got:%t", i, test.want, got)
|
require.Equal(t, test.want, got, "test case %d failed, expected:%t got:%t", i, test.want, got)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRuleHealthUpdates(t *testing.T) {
|
||||||
|
st := teststorage.New(t)
|
||||||
|
defer st.Close()
|
||||||
|
engineOpts := promql.EngineOpts{
|
||||||
|
Logger: nil,
|
||||||
|
Reg: nil,
|
||||||
|
MaxSamples: 10,
|
||||||
|
Timeout: 10 * time.Second,
|
||||||
|
}
|
||||||
|
engine := promql.NewEngine(engineOpts)
|
||||||
|
opts := &ManagerOptions{
|
||||||
|
QueryFunc: EngineQueryFunc(engine, st),
|
||||||
|
Appendable: st,
|
||||||
|
Queryable: st,
|
||||||
|
Context: context.Background(),
|
||||||
|
Logger: log.NewNopLogger(),
|
||||||
|
}
|
||||||
|
|
||||||
|
expr, err := parser.ParseExpr("a + 1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
rule := NewRecordingRule("a_plus_one", expr, labels.Labels{})
|
||||||
|
group := NewGroup(GroupOptions{
|
||||||
|
Name: "default",
|
||||||
|
Interval: time.Second,
|
||||||
|
Rules: []Rule{rule},
|
||||||
|
ShouldRestore: true,
|
||||||
|
Opts: opts,
|
||||||
|
})
|
||||||
|
|
||||||
|
// A time series that has two samples.
|
||||||
|
app := st.Appender(context.Background())
|
||||||
|
app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 0, 1)
|
||||||
|
app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2)
|
||||||
|
err = app.Commit()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
rules := group.Rules()[0]
|
||||||
|
require.NoError(t, rules.LastError())
|
||||||
|
require.Equal(t, HealthUnknown, rules.Health())
|
||||||
|
|
||||||
|
// Execute 2 times, it should be all green.
|
||||||
|
group.Eval(ctx, time.Unix(0, 0))
|
||||||
|
group.Eval(ctx, time.Unix(1, 0))
|
||||||
|
|
||||||
|
rules = group.Rules()[0]
|
||||||
|
require.NoError(t, rules.LastError())
|
||||||
|
require.Equal(t, HealthGood, rules.Health())
|
||||||
|
|
||||||
|
// Now execute the rule in the past again, this should cause append failures.
|
||||||
|
group.Eval(ctx, time.Unix(0, 0))
|
||||||
|
rules = group.Rules()[0]
|
||||||
|
require.EqualError(t, rules.LastError(), storage.ErrOutOfOrderSample.Error())
|
||||||
|
require.Equal(t, HealthBad, rules.Health())
|
||||||
|
}
|
||||||
|
|
|
@ -76,8 +76,6 @@ func (rule *RecordingRule) Labels() labels.Labels {
|
||||||
func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL) (promql.Vector, error) {
|
func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL) (promql.Vector, error) {
|
||||||
vector, err := query(ctx, rule.vector.String(), ts)
|
vector, err := query(ctx, rule.vector.String(), ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rule.SetHealth(HealthBad)
|
|
||||||
rule.SetLastError(err)
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// Override the metric name and labels.
|
// Override the metric name and labels.
|
||||||
|
|
Loading…
Reference in a new issue