diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 6d403ec27..fbdb96109 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -724,6 +724,7 @@ func main() { files, cfg.GlobalConfig.ExternalLabels, externalURL, + nil, ) }, }, { diff --git a/cmd/promtool/rules.go b/cmd/promtool/rules.go index 5d0633074..760f92833 100644 --- a/cmd/promtool/rules.go +++ b/cmd/promtool/rules.go @@ -70,7 +70,7 @@ func newRuleImporter(logger log.Logger, config ruleImporterConfig, apiClient que // loadGroups parses groups from a list of recording rule files. func (importer *ruleImporter) loadGroups(ctx context.Context, filenames []string) (errs []error) { - groups, errs := importer.ruleManager.LoadGroups(importer.config.evalInterval, labels.Labels{}, "", filenames...) + groups, errs := importer.ruleManager.LoadGroups(importer.config.evalInterval, labels.Labels{}, "", nil, filenames...) if errs != nil { return errs } diff --git a/cmd/promtool/unittest.go b/cmd/promtool/unittest.go index 0e05997b8..57b6103b0 100644 --- a/cmd/promtool/unittest.go +++ b/cmd/promtool/unittest.go @@ -170,7 +170,7 @@ func (tg *testGroup) test(evalInterval time.Duration, groupOrderMap map[string]i Logger: log.NewNopLogger(), } m := rules.NewManager(opts) - groupsMap, ers := m.LoadGroups(time.Duration(tg.Interval), tg.ExternalLabels, tg.ExternalURL, ruleFiles...) + groupsMap, ers := m.LoadGroups(time.Duration(tg.Interval), tg.ExternalLabels, tg.ExternalURL, nil, ruleFiles...) if ers != nil { return ers } diff --git a/rules/alerting.go b/rules/alerting.go index 929f7586d..34155a6dc 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/template" "github.com/prometheus/prometheus/util/strutil" ) @@ -258,6 +259,33 @@ func (r *AlertingRule) forStateSample(alert *Alert, ts time.Time, v float64) pro return s } +// QueryforStateSeries returns the series for ALERTS_FOR_STATE. +func (r *AlertingRule) QueryforStateSeries(alert *Alert, q storage.Querier) (storage.Series, error) { + smpl := r.forStateSample(alert, time.Now(), 0) + var matchers []*labels.Matcher + for _, l := range smpl.Metric { + mt, err := labels.NewMatcher(labels.MatchEqual, l.Name, l.Value) + if err != nil { + panic(err) + } + matchers = append(matchers, mt) + } + sset := q.Select(false, nil, matchers...) + + var s storage.Series + for sset.Next() { + // Query assures that smpl.Metric is included in sset.At().Labels(), + // hence just checking the length would act like equality. + // (This is faster than calling labels.Compare again as we already have some info). + if len(sset.At().Labels()) == len(matchers) { + s = sset.At() + break + } + } + + return s, sset.Err() +} + // SetEvaluationDuration updates evaluationDuration to the duration it took to evaluate the rule on its last evaluation. func (r *AlertingRule) SetEvaluationDuration(dur time.Duration) { r.mtx.Lock() diff --git a/rules/alerting_test.go b/rules/alerting_test.go index 31139298d..e5650a0cc 100644 --- a/rules/alerting_test.go +++ b/rules/alerting_test.go @@ -20,12 +20,14 @@ import ( "time" "github.com/go-kit/log" + "github.com/pkg/errors" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/teststorage" ) @@ -518,3 +520,83 @@ func TestAlertingRuleLimit(t *testing.T) { } } } + +func TestQueryForStateSeries(t *testing.T) { + testError := errors.New("test error") + + type testInput struct { + selectMockFunction func(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet + expectedSeries storage.Series + expectedError error + } + + tests := []testInput{ + // Test for empty series. + { + selectMockFunction: func(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + return storage.EmptySeriesSet() + }, + expectedSeries: nil, + expectedError: nil, + }, + // Test for error series. + { + selectMockFunction: func(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + return storage.ErrSeriesSet(testError) + }, + expectedSeries: nil, + expectedError: testError, + }, + // Test for mock series. + { + selectMockFunction: func(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + return storage.TestSeriesSet(storage.MockSeries( + []int64{1, 2, 3}, + []float64{1, 2, 3}, + []string{"__name__", "ALERTS_FOR_STATE", "alertname", "TestRule", "severity", "critical"}, + )) + }, + expectedSeries: storage.MockSeries( + []int64{1, 2, 3}, + []float64{1, 2, 3}, + []string{"__name__", "ALERTS_FOR_STATE", "alertname", "TestRule", "severity", "critical"}, + ), + expectedError: nil, + }, + } + + testFunc := func(tst testInput) { + querier := &storage.MockQuerier{ + SelectMockFunction: tst.selectMockFunction, + } + + rule := NewAlertingRule( + "TestRule", + nil, + time.Minute, + labels.FromStrings("severity", "critical"), + nil, nil, "", true, nil, + ) + + alert := &Alert{ + State: 0, + Labels: nil, + Annotations: nil, + Value: 0, + ActiveAt: time.Time{}, + FiredAt: time.Time{}, + ResolvedAt: time.Time{}, + LastSentAt: time.Time{}, + ValidUntil: time.Time{}, + } + + series, err := rule.QueryforStateSeries(alert, querier) + + require.Equal(t, tst.expectedSeries, series) + require.Equal(t, tst.expectedError, err) + } + + for _, tst := range tests { + testFunc(tst) + } +} diff --git a/rules/manager.go b/rules/manager.go index ca6e363c5..5ca05505d 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -264,16 +264,23 @@ type Group struct { logger log.Logger metrics *Metrics + + ruleGroupPostProcessFunc RuleGroupPostProcessFunc } +// This function will be used before each rule group evaluation if not nil. +// Use this function type if the rule group post processing is needed. +type RuleGroupPostProcessFunc func(g *Group, lastEvalTimestamp time.Time, log log.Logger) error + type GroupOptions struct { - Name, File string - Interval time.Duration - Limit int - Rules []Rule - ShouldRestore bool - Opts *ManagerOptions - done chan struct{} + Name, File string + Interval time.Duration + Limit int + Rules []Rule + ShouldRestore bool + Opts *ManagerOptions + done chan struct{} + RuleGroupPostProcessFunc RuleGroupPostProcessFunc } // NewGroup makes a new Group with the given name, options, and rules. @@ -295,19 +302,20 @@ func NewGroup(o GroupOptions) *Group { metrics.GroupInterval.WithLabelValues(key).Set(o.Interval.Seconds()) return &Group{ - name: o.Name, - file: o.File, - interval: o.Interval, - limit: o.Limit, - rules: o.Rules, - shouldRestore: o.ShouldRestore, - opts: o.Opts, - seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), - done: make(chan struct{}), - managerDone: o.done, - terminated: make(chan struct{}), - logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), - metrics: metrics, + name: o.Name, + file: o.File, + interval: o.Interval, + limit: o.Limit, + rules: o.Rules, + shouldRestore: o.ShouldRestore, + opts: o.Opts, + seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), + done: make(chan struct{}), + managerDone: o.done, + terminated: make(chan struct{}), + logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), + metrics: metrics, + ruleGroupPostProcessFunc: o.RuleGroupPostProcessFunc, } } @@ -320,6 +328,12 @@ func (g *Group) File() string { return g.file } // Rules returns the group's rules. func (g *Group) Rules() []Rule { return g.rules } +// Queryable returns the group's querable. +func (g *Group) Queryable() storage.Queryable { return g.opts.Queryable } + +// Context returns the group's context. +func (g *Group) Context() context.Context { return g.opts.Context } + // Interval returns the group's interval. func (g *Group) Interval() time.Duration { return g.interval } @@ -423,12 +437,24 @@ func (g *Group) run(ctx context.Context) { g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed)) } evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval) + + useRuleGroupPostProcessFunc(g, evalTimestamp.Add(-(missed+1)*g.interval)) + iter() } } } } +func useRuleGroupPostProcessFunc(g *Group, lastEvalTimestamp time.Time) { + if g.ruleGroupPostProcessFunc != nil { + err := g.ruleGroupPostProcessFunc(g, lastEvalTimestamp, g.logger) + if err != nil { + level.Warn(g.logger).Log("msg", "ruleGroupPostProcessFunc failed", "err", err) + } + } +} + func (g *Group) stop() { close(g.done) <-g.terminated @@ -744,32 +770,10 @@ func (g *Group) RestoreForState(ts time.Time) { } alertRule.ForEachActiveAlert(func(a *Alert) { - smpl := alertRule.forStateSample(a, time.Now(), 0) - var matchers []*labels.Matcher - for _, l := range smpl.Metric { - mt, err := labels.NewMatcher(labels.MatchEqual, l.Name, l.Value) - if err != nil { - panic(err) - } - matchers = append(matchers, mt) - } - - sset := q.Select(false, nil, matchers...) - - seriesFound := false var s storage.Series - for sset.Next() { - // Query assures that smpl.Metric is included in sset.At().Labels(), - // hence just checking the length would act like equality. - // (This is faster than calling labels.Compare again as we already have some info). - if len(sset.At().Labels()) == len(smpl.Metric) { - s = sset.At() - seriesFound = true - break - } - } - if err := sset.Err(); err != nil { + s, err := alertRule.QueryforStateSeries(a, q) + if err != nil { // Querier Warnings are ignored. We do not care unless we have an error. level.Error(g.logger).Log( "msg", "Failed to restore 'for' state", @@ -780,7 +784,7 @@ func (g *Group) RestoreForState(ts time.Time) { return } - if !seriesFound { + if s == nil { return } @@ -958,11 +962,12 @@ func (m *Manager) Stop() { // Update the rule manager's state as the config requires. If // loading the new rules failed the old rule set is restored. -func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string) error { +func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string, ruleGroupPostProcessFunc RuleGroupPostProcessFunc) error { m.mtx.Lock() defer m.mtx.Unlock() - groups, errs := m.LoadGroups(interval, externalLabels, externalURL, files...) + groups, errs := m.LoadGroups(interval, externalLabels, externalURL, ruleGroupPostProcessFunc, files...) + if errs != nil { for _, e := range errs { level.Error(m.logger).Log("msg", "loading groups failed", "err", e) @@ -1046,7 +1051,7 @@ func (FileLoader) Parse(query string) (parser.Expr, error) { return parser.Parse // LoadGroups reads groups from a list of files. func (m *Manager) LoadGroups( - interval time.Duration, externalLabels labels.Labels, externalURL string, filenames ...string, + interval time.Duration, externalLabels labels.Labels, externalURL string, ruleGroupPostProcessFunc RuleGroupPostProcessFunc, filenames ...string, ) (map[string]*Group, []error) { groups := make(map[string]*Group) @@ -1093,14 +1098,15 @@ func (m *Manager) LoadGroups( } groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{ - Name: rg.Name, - File: fn, - Interval: itv, - Limit: rg.Limit, - Rules: rules, - ShouldRestore: shouldRestore, - Opts: m.opts, - done: m.done, + Name: rg.Name, + File: fn, + Interval: itv, + Limit: rg.Limit, + Rules: rules, + ShouldRestore: shouldRestore, + Opts: m.opts, + done: m.done, + RuleGroupPostProcessFunc: ruleGroupPostProcessFunc, }) } } diff --git a/rules/manager_test.go b/rules/manager_test.go index 5d9c602b9..121fee393 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -727,7 +727,7 @@ func TestUpdate(t *testing.T) { ruleManager.start() defer ruleManager.Stop() - err := ruleManager.Update(10*time.Second, files, nil, "") + err := ruleManager.Update(10*time.Second, files, nil, "", nil) require.NoError(t, err) require.Greater(t, len(ruleManager.groups), 0, "expected non-empty rule groups") ogs := map[string]*Group{} @@ -738,7 +738,7 @@ func TestUpdate(t *testing.T) { ogs[h] = g } - err = ruleManager.Update(10*time.Second, files, nil, "") + err = ruleManager.Update(10*time.Second, files, nil, "", nil) require.NoError(t, err) for h, g := range ruleManager.groups { for _, actual := range g.seriesInPreviousEval { @@ -757,7 +757,7 @@ func TestUpdate(t *testing.T) { defer os.Remove(tmpFile.Name()) defer tmpFile.Close() - err = ruleManager.Update(10*time.Second, []string{tmpFile.Name()}, nil, "") + err = ruleManager.Update(10*time.Second, []string{tmpFile.Name()}, nil, "", nil) require.NoError(t, err) for h, g := range ruleManager.groups { @@ -835,7 +835,7 @@ func reloadAndValidate(rgs *rulefmt.RuleGroups, t *testing.T, tmpFile *os.File, tmpFile.Seek(0, 0) _, err = tmpFile.Write(bs) require.NoError(t, err) - err = ruleManager.Update(10*time.Second, []string{tmpFile.Name()}, nil, "") + err = ruleManager.Update(10*time.Second, []string{tmpFile.Name()}, nil, "", nil) require.NoError(t, err) for h, g := range ruleManager.groups { if ogs[h] == g { @@ -980,7 +980,7 @@ func TestMetricsUpdate(t *testing.T) { } for i, c := range cases { - err := ruleManager.Update(time.Second, c.files, nil, "") + err := ruleManager.Update(time.Second, c.files, nil, "", nil) require.NoError(t, err) time.Sleep(2 * time.Second) require.Equal(t, c.metrics, countMetrics(), "test %d: invalid count of metrics", i) @@ -1054,7 +1054,7 @@ func TestGroupStalenessOnRemoval(t *testing.T) { var totalStaleNaN int for i, c := range cases { - err := ruleManager.Update(time.Second, c.files, nil, "") + err := ruleManager.Update(time.Second, c.files, nil, "", nil) require.NoError(t, err) time.Sleep(3 * time.Second) totalStaleNaN += c.staleNaN @@ -1096,11 +1096,11 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) { } }() - err := ruleManager.Update(2*time.Second, files, nil, "") + err := ruleManager.Update(2*time.Second, files, nil, "", nil) time.Sleep(4 * time.Second) require.NoError(t, err) start := time.Now() - err = ruleManager.Update(3*time.Second, files[:0], nil, "") + err = ruleManager.Update(3*time.Second, files[:0], nil, "", nil) require.NoError(t, err) ruleManager.Stop() stopped = true @@ -1229,3 +1229,100 @@ func TestRuleHealthUpdates(t *testing.T) { require.EqualError(t, rules.LastError(), storage.ErrOutOfOrderSample.Error()) require.Equal(t, HealthBad, rules.Health()) } + +func TestUpdateMissedEvalMetrics(t *testing.T) { + suite, err := promql.NewTest(t, ` + load 5m + http_requests{instance="0"} 75 85 50 0 0 25 0 0 40 0 120 + `) + + require.NoError(t, err) + defer suite.Close() + + err = suite.Run() + require.NoError(t, err) + + expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`) + require.NoError(t, err) + + testValue := 1 + + overrideFunc := func(g *Group, lastEvalTimestamp time.Time, log log.Logger) error { + testValue = 2 + return nil + } + + type testInput struct { + overrideFunc func(g *Group, lastEvalTimestamp time.Time, logger log.Logger) error + expectedValue int + } + + tests := []testInput{ + // testValue should still have value of 1 since overrideFunc is nil. + { + overrideFunc: nil, + expectedValue: 1, + }, + // testValue should be incremented to 2 since overrideFunc is called. + { + overrideFunc: overrideFunc, + expectedValue: 2, + }, + } + + testFunc := func(tst testInput) { + opts := &ManagerOptions{ + QueryFunc: EngineQueryFunc(suite.QueryEngine(), suite.Storage()), + Appendable: suite.Storage(), + Queryable: suite.Storage(), + Context: context.Background(), + Logger: log.NewNopLogger(), + NotifyFunc: func(ctx context.Context, expr string, alerts ...*Alert) {}, + OutageTolerance: 30 * time.Minute, + ForGracePeriod: 10 * time.Minute, + } + + activeAlert := &Alert{ + State: StateFiring, + ActiveAt: time.Now(), + } + + m := map[uint64]*Alert{} + m[1] = activeAlert + + rule := &AlertingRule{ + name: "HTTPRequestRateLow", + vector: expr, + holdDuration: 5 * time.Minute, + labels: labels.FromStrings("severity", "critical"), + annotations: nil, + externalLabels: nil, + externalURL: "", + health: HealthUnknown, + active: m, + logger: nil, + restored: true, + } + + group := NewGroup(GroupOptions{ + Name: "default", + Interval: time.Second, + Rules: []Rule{rule}, + ShouldRestore: true, + Opts: opts, + RuleGroupPostProcessFunc: tst.overrideFunc, + }) + + go func() { + group.run(opts.Context) + }() + + time.Sleep(3 * time.Second) + group.stop() + require.Equal(t, tst.expectedValue, testValue) + } + + for _, tst := range tests { + testFunc(tst) + } +} diff --git a/storage/interface.go b/storage/interface.go index 47af6c2c3..6ce147b08 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -82,6 +82,15 @@ type Queryable interface { Querier(ctx context.Context, mint, maxt int64) (Querier, error) } +// A MockQueryable is used for testing purposes so that a mock Querier can be used. +type MockQueryable struct { + MockQuerier Querier +} + +func (q *MockQueryable) Querier(ctx context.Context, mint, maxt int64) (Querier, error) { + return q.MockQuerier, nil +} + // Querier provides querying access over time series data of a fixed time range. type Querier interface { LabelQuerier @@ -92,6 +101,27 @@ type Querier interface { Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet } +// MockQuerier is used for test purposes to mock the selected series that is returned. +type MockQuerier struct { + SelectMockFunction func(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet +} + +func (q *MockQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, Warnings, error) { + return nil, nil, nil +} + +func (q *MockQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, Warnings, error) { + return nil, nil, nil +} + +func (q *MockQuerier) Close() error { + return nil +} + +func (q *MockQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet { + return q.SelectMockFunction(sortSeries, hints, matchers...) +} + // A ChunkQueryable handles queries against a storage. // Use it when you need to have access to samples in encoded format. type ChunkQueryable interface { @@ -240,6 +270,20 @@ func EmptySeriesSet() SeriesSet { return emptySeriesSet } +type testSeriesSet struct { + series Series +} + +func (s testSeriesSet) Next() bool { return true } +func (s testSeriesSet) At() Series { return s.series } +func (s testSeriesSet) Err() error { return nil } +func (s testSeriesSet) Warnings() Warnings { return nil } + +// TestSeriesSet returns a mock series set +func TestSeriesSet(series Series) SeriesSet { + return testSeriesSet{series: series} +} + type errSeriesSet struct { err error } @@ -281,6 +325,29 @@ type Series interface { SampleIterable } +type mockSeries struct { + timestamps []int64 + values []float64 + labelSet []string +} + +func (s mockSeries) Labels() labels.Labels { + return labels.FromStrings(s.labelSet...) +} + +func (s mockSeries) Iterator() chunkenc.Iterator { + return chunkenc.MockSeriesIterator(s.timestamps, s.values) +} + +// MockSeries returns a series with custom timestamps, values and labelSet. +func MockSeries(timestamps []int64, values []float64, labelSet []string) Series { + return mockSeries{ + timestamps: timestamps, + values: values, + labelSet: labelSet, + } +} + // ChunkSeriesSet contains a set of chunked series. type ChunkSeriesSet interface { Next() bool diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index f52b5b932..bffb7e75a 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -89,6 +89,36 @@ type Iterator interface { Err() error } +// MockSeriesIterator returns an iterator for a mock series with custom timeStamps and values. +func MockSeriesIterator(timestamps []int64, values []float64) Iterator { + return &mockSeriesIterator{ + timeStamps: timestamps, + values: values, + currIndex: 0, + } +} + +type mockSeriesIterator struct { + timeStamps []int64 + values []float64 + currIndex int +} + +func (it *mockSeriesIterator) Seek(int64) bool { return false } +func (it *mockSeriesIterator) At() (int64, float64) { + return it.timeStamps[it.currIndex], it.values[it.currIndex] +} + +func (it *mockSeriesIterator) Next() bool { + if it.currIndex < len(it.timeStamps)-1 { + it.currIndex++ + return true + } + + return false +} +func (it *mockSeriesIterator) Err() error { return nil } + // NewNopIterator returns a new chunk iterator that does not hold any data. func NewNopIterator() Iterator { return nopIterator{}