From f8fccc73d8705c727cb85adef8d5147b1455388c Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 23 Nov 2017 13:04:37 +0100 Subject: [PATCH 1/5] promql: remove global metrics --- promql/engine.go | 137 ++++++++++++++++++++++++----------------------- 1 file changed, 71 insertions(+), 66 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 29e3869ad..a4503f45e 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -47,64 +47,13 @@ const ( minInt64 = -9223372036854775808 ) -var ( - currentQueries = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "queries", - Help: "The current number of queries being executed or waiting.", - }) - maxConcurrentQueries = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "queries_concurrent_max", - Help: "The max number of concurrent queries.", - }) - queryPrepareTime = prometheus.NewSummary( - prometheus.SummaryOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "query_duration_seconds", - Help: "Query timings", - ConstLabels: prometheus.Labels{"slice": "prepare_time"}, - }, - ) - queryInnerEval = prometheus.NewSummary( - prometheus.SummaryOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "query_duration_seconds", - Help: "Query timings", - ConstLabels: prometheus.Labels{"slice": "inner_eval"}, - }, - ) - queryResultAppend = prometheus.NewSummary( - prometheus.SummaryOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "query_duration_seconds", - Help: "Query timings", - ConstLabels: prometheus.Labels{"slice": "result_append"}, - }, - ) - queryResultSort = prometheus.NewSummary( - prometheus.SummaryOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "query_duration_seconds", - Help: "Query timings", - ConstLabels: prometheus.Labels{"slice": "result_sort"}, - }, - ) -) - -func init() { - prometheus.MustRegister(currentQueries) - prometheus.MustRegister(maxConcurrentQueries) - prometheus.MustRegister(queryPrepareTime) - prometheus.MustRegister(queryInnerEval) - prometheus.MustRegister(queryResultAppend) - prometheus.MustRegister(queryResultSort) +type engineMetrics struct { + currentQueries prometheus.Gauge + maxConcurrentQueries prometheus.Gauge + queryPrepareTime prometheus.Summary + queryInnerEval prometheus.Summary + queryResultAppend prometheus.Summary + queryResultSort prometheus.Summary } // convertibleToInt64 returns true if v does not over-/underflow an int64. @@ -203,6 +152,7 @@ func contextDone(ctx context.Context, env string) error { type Engine struct { // A Querier constructor against an underlying storage. queryable Queryable + metrics *engineMetrics // The gate limiting the maximum number of concurrent and waiting queries. gate *queryGate options *EngineOptions @@ -220,12 +170,66 @@ func NewEngine(queryable Queryable, o *EngineOptions) *Engine { if o == nil { o = DefaultEngineOptions } - maxConcurrentQueries.Set(float64(o.MaxConcurrentQueries)) + metrics := &engineMetrics{ + currentQueries: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "queries", + Help: "The current number of queries being executed or waiting.", + }), + maxConcurrentQueries: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "queries_concurrent_max", + Help: "The max number of concurrent queries.", + }), + queryPrepareTime: prometheus.NewSummary(prometheus.SummaryOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "query_duration_seconds", + Help: "Query timings", + ConstLabels: prometheus.Labels{"slice": "prepare_time"}, + }), + queryInnerEval: prometheus.NewSummary(prometheus.SummaryOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "query_duration_seconds", + Help: "Query timings", + ConstLabels: prometheus.Labels{"slice": "inner_eval"}, + }), + queryResultAppend: prometheus.NewSummary(prometheus.SummaryOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "query_duration_seconds", + Help: "Query timings", + ConstLabels: prometheus.Labels{"slice": "result_append"}, + }), + queryResultSort: prometheus.NewSummary(prometheus.SummaryOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "query_duration_seconds", + Help: "Query timings", + ConstLabels: prometheus.Labels{"slice": "result_sort"}, + }), + } + metrics.maxConcurrentQueries.Set(float64(o.MaxConcurrentQueries)) + + if o.Metrics != nil { + o.Metrics.MustRegister( + metrics.currentQueries, + metrics.maxConcurrentQueries, + metrics.queryInnerEval, + metrics.queryPrepareTime, + metrics.queryResultAppend, + metrics.queryResultSort, + ) + } return &Engine{ queryable: queryable, gate: newQueryGate(o.MaxConcurrentQueries), options: o, logger: o.Logger, + metrics: metrics, } } @@ -234,6 +238,7 @@ type EngineOptions struct { MaxConcurrentQueries int Timeout time.Duration Logger log.Logger + Metrics prometheus.Registerer } // DefaultEngineOptions are the default engine options. @@ -308,8 +313,8 @@ func (ng *Engine) newTestQuery(f func(context.Context) error) Query { // At this point per query only one EvalStmt is evaluated. Alert and record // statements are not handled by the Engine. func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) { - currentQueries.Inc() - defer currentQueries.Dec() + ng.metrics.currentQueries.Inc() + defer ng.metrics.currentQueries.Dec() ctx, cancel := context.WithTimeout(ctx, ng.options.Timeout) q.cancel = cancel @@ -362,7 +367,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start() querier, err := ng.populateIterators(ctx, s) prepareTimer.Stop() - queryPrepareTime.Observe(prepareTimer.ElapsedTime().Seconds()) + ng.metrics.queryPrepareTime.Observe(prepareTimer.ElapsedTime().Seconds()) // XXX(fabxc): the querier returned by populateIterators might be instantiated // we must not return without closing irrespective of the error. @@ -390,7 +395,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( } evalTimer.Stop() - queryInnerEval.Observe(evalTimer.ElapsedTime().Seconds()) + ng.metrics.queryInnerEval.Observe(evalTimer.ElapsedTime().Seconds()) // Point might have a different timestamp, force it to the evaluation // timestamp as that is when we ran the evaluation. switch v := val.(type) { @@ -456,7 +461,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( } } evalTimer.Stop() - queryInnerEval.Observe(evalTimer.ElapsedTime().Seconds()) + ng.metrics.queryInnerEval.Observe(evalTimer.ElapsedTime().Seconds()) if err := contextDone(ctx, "expression evaluation"); err != nil { return nil, err @@ -468,7 +473,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( mat = append(mat, ss) } appendTimer.Stop() - queryResultAppend.Observe(appendTimer.ElapsedTime().Seconds()) + ng.metrics.queryResultAppend.Observe(appendTimer.ElapsedTime().Seconds()) if err := contextDone(ctx, "expression evaluation"); err != nil { return nil, err @@ -480,7 +485,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( sort.Sort(mat) sortTimer.Stop() - queryResultSort.Observe(sortTimer.ElapsedTime().Seconds()) + ng.metrics.queryResultSort.Observe(sortTimer.ElapsedTime().Seconds()) return mat, nil } From 2d0e3746ac9e541f72a3c23702ec54817be58c33 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 23 Nov 2017 13:04:54 +0100 Subject: [PATCH 2/5] rules: remove dependency on promql.Engine --- cmd/prometheus/main.go | 5 +- rules/alerting.go | 10 +-- rules/manager.go | 37 +++++++++-- rules/manager_test.go | 133 ++++++++++++++++++++++++-------------- rules/recording.go | 25 +------ rules/recording_test.go | 2 +- template/template.go | 43 +++++------- template/template_test.go | 108 +++++++++++++++++-------------- web/web.go | 20 +++++- 9 files changed, 219 insertions(+), 164 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index e0f03e6f8..4b19c9b61 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -96,6 +96,9 @@ func main() { notifier: notifier.Options{ Registerer: prometheus.DefaultRegisterer, }, + queryEngine: promql.EngineOptions{ + Metrics: prometheus.DefaultRegisterer, + }, } a := kingpin.New(filepath.Base(os.Args[0]), "The Prometheus monitoring server") @@ -234,7 +237,7 @@ func main() { ruleManager := rules.NewManager(&rules.ManagerOptions{ Appendable: fanoutStorage, Notifier: notifier, - QueryEngine: queryEngine, + Query: rules.EngineQueryFunc(queryEngine), Context: ctx, ExternalURL: cfg.web.ExternalURL, Logger: log.With(logger, "component", "rule manager"), diff --git a/rules/alerting.go b/rules/alerting.go index f90413713..3b8b61e9f 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -171,12 +171,8 @@ const resolvedRetention = 15 * time.Minute // Eval evaluates the rule expression and then creates pending alerts and fires // or removes previously pending alerts accordingly. -func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, engine *promql.Engine, externalURL *url.URL) (promql.Vector, error) { - query, err := engine.NewInstantQuery(r.vector.String(), ts) - if err != nil { - return nil, err - } - res, err := query.Exec(ctx).Vector() +func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) { + res, err := query(ctx, r.vector.String(), ts) if err != nil { return nil, err } @@ -213,7 +209,7 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, engine *promql.En "__alert_"+r.Name(), tmplData, model.Time(timestamp.FromTime(ts)), - engine, + template.QueryFunc(query), externalURL, ) result, err := tmpl.Expand() diff --git a/rules/manager.go b/rules/manager.go index 43f316056..3d7107036 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -107,12 +107,42 @@ const ( ruleTypeRecording = "recording" ) +// QueryFunc processes PromQL queries. +type QueryFunc func(ctx context.Context, q string, t time.Time) (promql.Vector, error) + +// EngineQueryFunc returns a new query function that executes instant queries against +// the given engine. +// It converts scaler into vector results. +func EngineQueryFunc(engine *promql.Engine) QueryFunc { + return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { + q, err := engine.NewInstantQuery(qs, t) + if err != nil { + return nil, err + } + res := q.Exec(ctx) + if res.Err != nil { + return nil, res.Err + } + switch v := res.Value.(type) { + case promql.Vector: + return v, nil + case promql.Scalar: + return promql.Vector{promql.Sample{ + Point: promql.Point(v), + Metric: labels.Labels{}, + }}, nil + default: + return nil, fmt.Errorf("rule result is not a vector or scalar") + } + } +} + // A Rule encapsulates a vector expression which is evaluated at a specified // interval and acted upon (currently either recorded or used for alerting). type Rule interface { Name() string // eval evaluates the rule, including any associated recording or alerting actions. - Eval(context.Context, time.Time, *promql.Engine, *url.URL) (promql.Vector, error) + Eval(context.Context, time.Time, QueryFunc, *url.URL) (promql.Vector, error) // String returns a human-readable string representation of the rule. String() string @@ -220,7 +250,6 @@ func (g *Group) hash() uint64 { labels.Label{"name", g.name}, labels.Label{"file", g.file}, ) - return l.Hash() } @@ -319,7 +348,7 @@ func (g *Group) Eval(ts time.Time) { evalTotal.WithLabelValues(rtyp).Inc() - vector, err := rule.Eval(g.opts.Context, ts, g.opts.QueryEngine, g.opts.ExternalURL) + vector, err := rule.Eval(g.opts.Context, ts, g.opts.Query, g.opts.ExternalURL) if err != nil { // Canceled queries are intentional termination of queries. This normally // happens on shutdown and thus we skip logging of any errors here. @@ -439,7 +468,7 @@ type Appendable interface { // ManagerOptions bundles options for the Manager. type ManagerOptions struct { ExternalURL *url.URL - QueryEngine *promql.Engine + Query QueryFunc Context context.Context Notifier *notifier.Notifier Appendable Appendable diff --git a/rules/manager_test.go b/rules/manager_test.go index 5ff947129..b9cb2ba49 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -17,7 +17,7 @@ import ( "context" "fmt" "math" - "strings" + "sort" "testing" "time" @@ -55,75 +55,108 @@ func TestAlertingRule(t *testing.T) { labels.FromStrings("severity", "{{\"c\"}}ritical"), nil, nil, ) + result := promql.Vector{ + { + Metric: labels.FromStrings( + "__name__", "ALERTS", + "alertname", "HTTPRequestRateLow", + "alertstate", "pending", + "group", "canary", + "instance", "0", + "job", "app-server", + "severity", "critical", + ), + Point: promql.Point{V: 1}, + }, + { + Metric: labels.FromStrings( + "__name__", "ALERTS", + "alertname", "HTTPRequestRateLow", + "alertstate", "pending", + "group", "canary", + "instance", "1", + "job", "app-server", + "severity", "critical", + ), + Point: promql.Point{V: 1}, + }, + { + Metric: labels.FromStrings( + "__name__", "ALERTS", + "alertname", "HTTPRequestRateLow", + "alertstate", "firing", + "group", "canary", + "instance", "0", + "job", "app-server", + "severity", "critical", + ), + Point: promql.Point{V: 1}, + }, + { + Metric: labels.FromStrings( + "__name__", "ALERTS", + "alertname", "HTTPRequestRateLow", + "alertstate", "firing", + "group", "canary", + "instance", "1", + "job", "app-server", + "severity", "critical", + ), + Point: promql.Point{V: 1}, + }, + } baseTime := time.Unix(0, 0) var tests = []struct { time time.Duration - result []string + result promql.Vector }{ { - time: 0, - result: []string{ - `{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="pending", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`, - `{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="pending", group="canary", instance="1", job="app-server", severity="critical"} => 1 @[%v]`, - }, + time: 0, + result: result[:2], }, { - time: 5 * time.Minute, - result: []string{ - `{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`, - `{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="1", job="app-server", severity="critical"} => 1 @[%v]`, - }, + time: 5 * time.Minute, + result: result[2:], }, { - time: 10 * time.Minute, - result: []string{ - `{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`, - }, + time: 10 * time.Minute, + result: result[2:3], }, { time: 15 * time.Minute, - result: []string{}, + result: nil, }, { time: 20 * time.Minute, - result: []string{}, + result: nil, }, { - time: 25 * time.Minute, - result: []string{ - `{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="pending", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`, - }, + time: 25 * time.Minute, + result: result[:1], }, { - time: 30 * time.Minute, - result: []string{ - `{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`, - }, + time: 30 * time.Minute, + result: result[2:3], }, } for i, test := range tests { + t.Logf("case %d", i) + evalTime := baseTime.Add(test.time) - res, err := rule.Eval(suite.Context(), evalTime, suite.QueryEngine(), nil) + res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine()), nil) testutil.Ok(t, err) - actual := strings.Split(res.String(), "\n") - expected := annotateWithTime(test.result, evalTime) - if actual[0] == "" { - actual = []string{} + for i := range test.result { + test.result[i].T = timestamp.FromTime(evalTime) } - testutil.Assert(t, len(expected) == len(actual), "%d. Number of samples in expected and actual output don't match (%d vs. %d)", i, len(expected), len(actual)) + testutil.Assert(t, len(test.result) == len(res), "%d. Number of samples in expected and actual output don't match (%d vs. %d)", i, len(test.result), len(res)) - for j, expectedSample := range expected { - found := false - for _, actualSample := range actual { - if actualSample == expectedSample { - found = true - } - } - testutil.Assert(t, found, "%d.%d. Couldn't find expected sample in output: '%v'", i, j, expectedSample) - } + sort.Slice(res, func(i, j int) bool { + return labels.Compare(res[i].Metric, res[j].Metric) < 0 + }) + testutil.Equals(t, test.result, res) for _, aa := range rule.ActiveAlerts() { testutil.Assert(t, aa.Labels.Get(model.MetricNameLabel) == "", "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels) @@ -144,10 +177,10 @@ func TestStaleness(t *testing.T) { defer storage.Close() engine := promql.NewEngine(storage, nil) opts := &ManagerOptions{ - QueryEngine: engine, - Appendable: storage, - Context: context.Background(), - Logger: log.NewNopLogger(), + Query: EngineQueryFunc(engine), + Appendable: storage, + Context: context.Background(), + Logger: log.NewNopLogger(), } expr, err := promql.ParseExpr("a + 1") @@ -271,11 +304,11 @@ func TestApplyConfig(t *testing.T) { conf, err := config.LoadFile("../config/testdata/conf.good.yml") testutil.Ok(t, err) ruleManager := NewManager(&ManagerOptions{ - Appendable: nil, - Notifier: nil, - QueryEngine: nil, - Context: context.Background(), - Logger: log.NewNopLogger(), + Appendable: nil, + Notifier: nil, + Query: nil, + Context: context.Background(), + Logger: log.NewNopLogger(), }) ruleManager.Run() diff --git a/rules/recording.go b/rules/recording.go index 26a202445..438f78780 100644 --- a/rules/recording.go +++ b/rules/recording.go @@ -53,32 +53,11 @@ func (rule *RecordingRule) Name() string { } // Eval evaluates the rule and then overrides the metric names and labels accordingly. -func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, engine *promql.Engine, _ *url.URL) (promql.Vector, error) { - query, err := engine.NewInstantQuery(rule.vector.String(), ts) +func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL) (promql.Vector, error) { + vector, err := query(ctx, rule.vector.String(), ts) if err != nil { return nil, err } - - var ( - result = query.Exec(ctx) - vector promql.Vector - ) - if result.Err != nil { - return nil, err - } - - switch v := result.Value.(type) { - case promql.Vector: - vector = v - case promql.Scalar: - vector = promql.Vector{promql.Sample{ - Point: promql.Point(v), - Metric: labels.Labels{}, - }} - default: - return nil, fmt.Errorf("rule result is not a vector or scalar") - } - // Override the metric name and labels. for i := range vector { sample := &vector[i] diff --git a/rules/recording_test.go b/rules/recording_test.go index 0826767ca..cc997d9b2 100644 --- a/rules/recording_test.go +++ b/rules/recording_test.go @@ -62,7 +62,7 @@ func TestRuleEval(t *testing.T) { for _, test := range suite { rule := NewRecordingRule(test.name, test.expr, test.labels) - result, err := rule.Eval(ctx, now, engine, nil) + result, err := rule.Eval(ctx, now, EngineQueryFunc(engine), nil) testutil.Ok(t, err) testutil.Equals(t, result, test.result) } diff --git a/template/template.go b/template/template.go index b8a87f1a0..71bc486e1 100644 --- a/template/template.go +++ b/template/template.go @@ -30,7 +30,6 @@ import ( "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/util/strutil" ) @@ -59,34 +58,14 @@ func (q queryResultByLabelSorter) Swap(i, j int) { q.results[i], q.results[j] = q.results[j], q.results[i] } -func query(ctx context.Context, q string, ts time.Time, queryEngine *promql.Engine) (queryResult, error) { - query, err := queryEngine.NewInstantQuery(q, ts) +// QueryFunc executes a PromQL query at the given time. +type QueryFunc func(context.Context, string, time.Time) (promql.Vector, error) + +func query(ctx context.Context, q string, ts time.Time, queryFn QueryFunc) (queryResult, error) { + vector, err := queryFn(ctx, q, ts) if err != nil { return nil, err } - res := query.Exec(ctx) - if res.Err != nil { - return nil, res.Err - } - var vector promql.Vector - - switch v := res.Value.(type) { - case promql.Matrix: - return nil, errors.New("matrix return values not supported") - case promql.Vector: - vector = v - case promql.Scalar: - vector = promql.Vector{promql.Sample{ - Point: promql.Point(v), - }} - case promql.String: - vector = promql.Vector{promql.Sample{ - Metric: labels.FromStrings("__value__", v.V), - Point: promql.Point{T: v.T}, - }} - default: - panic("template.query: unhandled result value type") - } // promql.Vector is hard to work with in templates, so convert to // base data types. @@ -111,14 +90,22 @@ type Expander struct { } // NewTemplateExpander returns a template expander ready to use. -func NewTemplateExpander(ctx context.Context, text string, name string, data interface{}, timestamp model.Time, queryEngine *promql.Engine, externalURL *url.URL) *Expander { +func NewTemplateExpander( + ctx context.Context, + text string, + name string, + data interface{}, + timestamp model.Time, + queryFunc QueryFunc, + externalURL *url.URL, +) *Expander { return &Expander{ text: text, name: name, data: data, funcMap: text_template.FuncMap{ "query": func(q string) (queryResult, error) { - return query(ctx, q, timestamp.Time(), queryEngine) + return query(ctx, q, timestamp.Time(), queryFunc) }, "first": func(v queryResult) (*sample, error) { if len(v) > 0 { diff --git a/template/template_test.go b/template/template_test.go index 686d820f6..6a2b81d1a 100644 --- a/template/template_test.go +++ b/template/template_test.go @@ -18,21 +18,19 @@ import ( "math" "net/url" "testing" - - "github.com/prometheus/common/model" - "github.com/stretchr/testify/require" + "time" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/util/testutil" ) type testTemplatesScenario struct { - text string - output string - input interface{} - shouldFail bool - html bool + text string + output string + input interface{} + queryResult promql.Vector + shouldFail bool + html bool } func TestTemplateExpansion(t *testing.T) { @@ -70,42 +68,72 @@ func TestTemplateExpansion(t *testing.T) { output: "1 2", }, { - text: "{{ query \"1.5\" | first | value }}", - output: "1.5", - }, - { - // Get value from scalar query. - text: "{{ query \"scalar(count(metric))\" | first | value }}", - output: "2", + text: "{{ query \"1.5\" | first | value }}", + output: "1.5", + queryResult: promql.Vector{{Point: promql.Point{T: 0, V: 1.5}}}, }, { // Get value from query. - text: "{{ query \"metric{instance='a'}\" | first | value }}", + text: "{{ query \"metric{instance='a'}\" | first | value }}", + queryResult: promql.Vector{ + { + Metric: labels.FromStrings(labels.MetricName, "metric", "instance", "a"), + Point: promql.Point{T: 0, V: 11}, + }}, output: "11", }, { // Get label from query. - text: "{{ query \"metric{instance='a'}\" | first | label \"instance\" }}", + text: "{{ query \"metric{instance='a'}\" | first | label \"instance\" }}", + + queryResult: promql.Vector{ + { + Metric: labels.FromStrings(labels.MetricName, "metric", "instance", "a"), + Point: promql.Point{T: 0, V: 11}, + }}, output: "a", }, { // Missing label is empty when using label function. - text: "{{ query \"metric{instance='a'}\" | first | label \"foo\" }}", + text: "{{ query \"metric{instance='a'}\" | first | label \"foo\" }}", + queryResult: promql.Vector{ + { + Metric: labels.FromStrings(labels.MetricName, "metric", "instance", "a"), + Point: promql.Point{T: 0, V: 11}, + }}, output: "", }, { // Missing label is empty when not using label function. - text: "{{ $x := query \"metric\" | first }}{{ $x.Labels.foo }}", + text: "{{ $x := query \"metric\" | first }}{{ $x.Labels.foo }}", + queryResult: promql.Vector{ + { + Metric: labels.FromStrings(labels.MetricName, "metric", "instance", "a"), + Point: promql.Point{T: 0, V: 11}, + }}, output: "", }, { - text: "{{ $x := query \"metric\" | first }}{{ $x.Labels.foo }}", + text: "{{ $x := query \"metric\" | first }}{{ $x.Labels.foo }}", + queryResult: promql.Vector{ + { + Metric: labels.FromStrings(labels.MetricName, "metric", "instance", "a"), + Point: promql.Point{T: 0, V: 11}, + }}, output: "", html: true, }, { // Range over query and sort by label. - text: "{{ range query \"metric\" | sortByLabel \"instance\" }}{{.Labels.instance}}:{{.Value}}: {{end}}", + text: "{{ range query \"metric\" | sortByLabel \"instance\" }}{{.Labels.instance}}:{{.Value}}: {{end}}", + queryResult: promql.Vector{ + { + Metric: labels.FromStrings(labels.MetricName, "metric", "instance", "a"), + Point: promql.Point{T: 0, V: 11}, + }, { + Metric: labels.FromStrings(labels.MetricName, "metric", "instance", "b"), + Point: promql.Point{T: 0, V: 21}, + }}, output: "a:11: b:21: ", }, { @@ -115,13 +143,15 @@ func TestTemplateExpansion(t *testing.T) { }, { // Error in function. - text: "{{ query \"missing\" | first }}", - shouldFail: true, + text: "{{ query \"missing\" | first }}", + queryResult: promql.Vector{}, + shouldFail: true, }, { // Panic. - text: "{{ (query \"missing\").banana }}", - shouldFail: true, + text: "{{ (query \"missing\").banana }}", + queryResult: promql.Vector{}, + shouldFail: true, }, { // Regex replacement. @@ -211,36 +241,18 @@ func TestTemplateExpansion(t *testing.T) { }, } - time := model.Time(0) - - storage := testutil.NewStorage(t) - defer storage.Close() - - app, err := storage.Appender() - if err != nil { - t.Fatalf("get appender: %s", err) - } - - _, err = app.Add(labels.FromStrings(labels.MetricName, "metric", "instance", "a"), 0, 11) - require.NoError(t, err) - _, err = app.Add(labels.FromStrings(labels.MetricName, "metric", "instance", "b"), 0, 21) - require.NoError(t, err) - - if err := app.Commit(); err != nil { - t.Fatalf("commit samples: %s", err) - } - - engine := promql.NewEngine(storage, nil) - extURL, err := url.Parse("http://testhost:9090/path/prefix") if err != nil { panic(err) } for i, s := range scenarios { + queryFunc := func(_ context.Context, _ string, _ time.Time) (promql.Vector, error) { + return s.queryResult, nil + } var result string var err error - expander := NewTemplateExpander(context.Background(), s.text, "test", s.input, time, engine, extURL) + expander := NewTemplateExpander(context.Background(), s.text, "test", s.input, 0, queryFunc, extURL) if s.html { result, err = expander.ExpandHTML(nil) } else { diff --git a/web/web.go b/web/web.go index 1ded9650e..3ad90dd69 100644 --- a/web/web.go +++ b/web/web.go @@ -527,7 +527,15 @@ func (h *Handler) consoles(w http.ResponseWriter, r *http.Request) { Path: strings.TrimLeft(name, "/"), } - tmpl := template.NewTemplateExpander(h.context, string(text), "__console_"+name, data, h.now(), h.queryEngine, h.options.ExternalURL) + tmpl := template.NewTemplateExpander( + h.context, + string(text), + "__console_"+name, + data, + h.now(), + template.QueryFunc(rules.EngineQueryFunc(h.queryEngine)), + h.options.ExternalURL, + ) filenames, err := filepath.Glob(h.options.ConsoleLibrariesPath + "/*.lib") if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -732,7 +740,15 @@ func (h *Handler) executeTemplate(w http.ResponseWriter, name string, data inter http.Error(w, err.Error(), http.StatusInternalServerError) } - tmpl := template.NewTemplateExpander(h.context, text, name, data, h.now(), h.queryEngine, h.options.ExternalURL) + tmpl := template.NewTemplateExpander( + h.context, + text, + name, + data, + h.now(), + template.QueryFunc(rules.EngineQueryFunc(h.queryEngine)), + h.options.ExternalURL, + ) tmpl.Funcs(tmplFuncs(h.consolesPath(), h.options)) result, err := tmpl.ExpandHTML(nil) From bd9f7460eb6fac35a103036d8fc5cd63c42ddd63 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 23 Nov 2017 15:48:14 +0100 Subject: [PATCH 3/5] rules: remove config package dependency --- cmd/prometheus/main.go | 30 +++++++++++++----------------- rules/manager.go | 9 ++++----- rules/manager_test.go | 16 ++++------------ 3 files changed, 21 insertions(+), 34 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 4b19c9b61..f696d739d 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -272,12 +272,14 @@ func main() { conntrack.DialWithTracing(), ) - reloadables := []Reloadable{ - remoteStorage, - targetManager, - ruleManager, - webHandler, - notifier, + reloaders := []func(cfg *config.Config) error{ + remoteStorage.ApplyConfig, + targetManager.ApplyConfig, + webHandler.ApplyConfig, + notifier.ApplyConfig, + func(cfg *config.Config) error { + return ruleManager.Update(time.Duration(cfg.GlobalConfig.EvaluationInterval), cfg.RuleFiles) + }, } prometheus.MustRegister(configSuccess) @@ -330,11 +332,11 @@ func main() { for { select { case <-hup: - if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil { + if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil { level.Error(logger).Log("msg", "Error reloading config", "err", err) } case rc := <-webHandler.Reload(): - if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil { + if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil { level.Error(logger).Log("msg", "Error reloading config", "err", err) rc <- err } else { @@ -363,7 +365,7 @@ func main() { return nil } - if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil { + if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil { return fmt.Errorf("Error loading config %s", err) } @@ -473,13 +475,7 @@ func main() { level.Info(logger).Log("msg", "See you next time!") } -// Reloadable things can change their internal state to match a new config -// and handle failure gracefully. -type Reloadable interface { - ApplyConfig(*config.Config) error -} - -func reloadConfig(filename string, logger log.Logger, rls ...Reloadable) (err error) { +func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (err error) { level.Info(logger).Log("msg", "Loading configuration file", "filename", filename) defer func() { @@ -498,7 +494,7 @@ func reloadConfig(filename string, logger log.Logger, rls ...Reloadable) (err er failed := false for _, rl := range rls { - if err := rl.ApplyConfig(conf); err != nil { + if err := rl(conf); err != nil { level.Error(logger).Log("msg", "Failed to apply configuration", "err", err) failed = true } diff --git a/rules/manager.go b/rules/manager.go index 3d7107036..4c5ef9382 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -30,7 +30,6 @@ import ( "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/rulefmt" @@ -505,15 +504,15 @@ func (m *Manager) Stop() { level.Info(m.logger).Log("msg", "Rule manager stopped") } -// ApplyConfig updates the rule manager's state as the config requires. If +// Update the rule manager's state as the config requires. If // loading the new rules failed the old rule set is restored. -func (m *Manager) ApplyConfig(conf *config.Config) error { +func (m *Manager) Update(interval time.Duration, paths []string) error { m.mtx.Lock() defer m.mtx.Unlock() // Get all rule files and load the groups they define. var files []string - for _, pat := range conf.RuleFiles { + for _, pat := range paths { fs, err := filepath.Glob(pat) if err != nil { // The only error can be a bad pattern. @@ -523,7 +522,7 @@ func (m *Manager) ApplyConfig(conf *config.Config) error { } // To be replaced with a configurable per-group interval. - groups, errs := m.loadGroups(time.Duration(conf.GlobalConfig.EvaluationInterval), files...) + groups, errs := m.loadGroups(interval, files...) if errs != nil { for _, e := range errs { level.Error(m.logger).Log("msg", "loading groups failed", "err", e) diff --git a/rules/manager_test.go b/rules/manager_test.go index b9cb2ba49..33ad84fe8 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -24,7 +24,6 @@ import ( "github.com/go-kit/kit/log" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/pkg/value" @@ -292,17 +291,10 @@ func TestCopyState(t *testing.T) { testutil.Equals(t, oldGroup.rules[0], newGroup.rules[3]) } -func TestApplyConfig(t *testing.T) { +func TestUpdate(t *testing.T) { expected := map[string]labels.Labels{ - "test": labels.Labels{ - labels.Label{ - Name: "name", - Value: "value", - }, - }, + "test": labels.FromStrings("name", "value"), } - conf, err := config.LoadFile("../config/testdata/conf.good.yml") - testutil.Ok(t, err) ruleManager := NewManager(&ManagerOptions{ Appendable: nil, Notifier: nil, @@ -312,7 +304,7 @@ func TestApplyConfig(t *testing.T) { }) ruleManager.Run() - err = ruleManager.ApplyConfig(conf) + err := ruleManager.Update(0, nil) testutil.Ok(t, err) for _, g := range ruleManager.groups { g.seriesInPreviousEval = []map[string]labels.Labels{ @@ -320,7 +312,7 @@ func TestApplyConfig(t *testing.T) { } } - err = ruleManager.ApplyConfig(conf) + err = ruleManager.Update(0, nil) testutil.Ok(t, err) for _, g := range ruleManager.groups { for _, actual := range g.seriesInPreviousEval { From 4d964a0a0de828470f95681bd8a3e0313d6c6728 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 24 Nov 2017 08:22:57 +0100 Subject: [PATCH 4/5] rules: make glob expansion a concern of main --- cmd/prometheus/main.go | 10 ++++++++++ rules/manager.go | 14 +------------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index f696d739d..01bf1710f 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -278,6 +278,16 @@ func main() { webHandler.ApplyConfig, notifier.ApplyConfig, func(cfg *config.Config) error { + // Get all rule files matching the configuration oaths. + var files []string + for _, pat := range cfg.RuleFiles { + fs, err := filepath.Glob(pat) + if err != nil { + // The only error can be a bad pattern. + return fmt.Errorf("error retrieving rule files for %s: %s", pat, err) + } + files = append(files, fs...) + } return ruleManager.Update(time.Duration(cfg.GlobalConfig.EvaluationInterval), cfg.RuleFiles) }, } diff --git a/rules/manager.go b/rules/manager.go index 4c5ef9382..742d0eb40 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -19,7 +19,6 @@ import ( "fmt" "math" "net/url" - "path/filepath" "sort" "sync" "time" @@ -506,21 +505,10 @@ func (m *Manager) Stop() { // Update the rule manager's state as the config requires. If // loading the new rules failed the old rule set is restored. -func (m *Manager) Update(interval time.Duration, paths []string) error { +func (m *Manager) Update(interval time.Duration, files []string) error { m.mtx.Lock() defer m.mtx.Unlock() - // Get all rule files and load the groups they define. - var files []string - for _, pat := range paths { - fs, err := filepath.Glob(pat) - if err != nil { - // The only error can be a bad pattern. - return fmt.Errorf("error retrieving rule files for %s: %s", pat, err) - } - files = append(files, fs...) - } - // To be replaced with a configurable per-group interval. groups, errs := m.loadGroups(interval, files...) if errs != nil { From 62461379b75bba14a8fda503aa6de823c23e9c23 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 24 Nov 2017 08:59:05 +0100 Subject: [PATCH 5/5] rules: decouple notifier packages The dependency on the notifier packages caused a transitive dependency on discovery and with that all client libraries our service discovery uses. --- cmd/prometheus/main.go | 35 +++++++++++++++++++++++++++-- rules/alerting.go | 6 ++++- rules/manager.go | 51 +++++++++--------------------------------- rules/manager_test.go | 17 +++++++------- 4 files changed, 57 insertions(+), 52 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 01bf1710f..fc7951e5f 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -50,6 +50,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/storage/tsdb" + "github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/web" ) @@ -236,8 +237,8 @@ func main() { ruleManager := rules.NewManager(&rules.ManagerOptions{ Appendable: fanoutStorage, - Notifier: notifier, - Query: rules.EngineQueryFunc(queryEngine), + QueryFunc: rules.EngineQueryFunc(queryEngine), + NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()), Context: ctx, ExternalURL: cfg.web.ExternalURL, Logger: log.With(logger, "component", "rule manager"), @@ -552,3 +553,33 @@ func computeExternalURL(u, listenAddr string) (*url.URL, error) { return eu, nil } + +// sendAlerts implements a the rules.NotifyFunc for a Notifier. +// It filters any non-firing alerts from the input. +func sendAlerts(n *notifier.Notifier, externalURL string) rules.NotifyFunc { + return func(ctx context.Context, expr string, alerts ...*rules.Alert) error { + var res []*notifier.Alert + + for _, alert := range alerts { + // Only send actually firing alerts. + if alert.State == rules.StatePending { + continue + } + a := ¬ifier.Alert{ + StartsAt: alert.FiredAt, + Labels: alert.Labels, + Annotations: alert.Annotations, + GeneratorURL: externalURL + strutil.TableLinkForExpression(expr), + } + if !alert.ResolvedAt.IsZero() { + a.EndsAt = alert.ResolvedAt + } + res = append(res, a) + } + + if len(alerts) > 0 { + n.Send(res...) + } + return nil + } +} diff --git a/rules/alerting.go b/rules/alerting.go index 3b8b61e9f..0a2fe7448 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -83,7 +83,9 @@ type Alert struct { Value float64 // The interval during which the condition of this alert held true. // ResolvedAt will be 0 to indicate a still active alert. - ActiveAt, ResolvedAt time.Time + ActiveAt time.Time + FiredAt time.Time + ResolvedAt time.Time } // An AlertingRule generates alerts from its vector expression. @@ -264,12 +266,14 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, if a.State != StateInactive { a.State = StateInactive a.ResolvedAt = ts + a.FiredAt = time.Time{} } continue } if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration { a.State = StateFiring + a.FiredAt = ts } vec = append(vec, r.sample(a, ts)) diff --git a/rules/manager.go b/rules/manager.go index 742d0eb40..778bfe0c7 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -29,14 +29,12 @@ import ( "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/rulefmt" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/util/strutil" ) // Constants for instrumentation. @@ -192,7 +190,7 @@ func (g *Group) File() string { return g.file } // Rules returns the group's rules. func (g *Group) Rules() []Rule { return g.rules } -func (g *Group) run() { +func (g *Group) run(ctx context.Context) { defer close(g.terminated) // Wait an initial amount to have consistently slotted intervals. @@ -206,7 +204,7 @@ func (g *Group) run() { iterationsScheduled.Inc() start := time.Now() - g.Eval(start) + g.Eval(ctx, start) iterationDuration.Observe(time.Since(start).Seconds()) g.SetEvaluationTime(time.Since(start)) @@ -328,7 +326,7 @@ func typeForRule(r Rule) ruleType { } // Eval runs a single evaluation cycle in which all rules are evaluated sequentially. -func (g *Group) Eval(ts time.Time) { +func (g *Group) Eval(ctx context.Context, ts time.Time) { for i, rule := range g.rules { select { case <-g.done: @@ -346,7 +344,7 @@ func (g *Group) Eval(ts time.Time) { evalTotal.WithLabelValues(rtyp).Inc() - vector, err := rule.Eval(g.opts.Context, ts, g.opts.Query, g.opts.ExternalURL) + vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL) if err != nil { // Canceled queries are intentional termination of queries. This normally // happens on shutdown and thus we skip logging of any errors here. @@ -358,7 +356,7 @@ func (g *Group) Eval(ts time.Time) { } if ar, ok := rule.(*AlertingRule); ok { - g.sendAlerts(ar) + g.opts.NotifyFunc(ctx, ar.vector.String(), ar.currentAlerts()...) } var ( numOutOfOrder = 0 @@ -418,36 +416,6 @@ func (g *Group) Eval(ts time.Time) { } } -// sendAlerts sends alert notifications for the given rule. -func (g *Group) sendAlerts(rule *AlertingRule) error { - var alerts []*notifier.Alert - - for _, alert := range rule.currentAlerts() { - // Only send actually firing alerts. - if alert.State == StatePending { - continue - } - - a := ¬ifier.Alert{ - StartsAt: alert.ActiveAt.Add(rule.holdDuration), - Labels: alert.Labels, - Annotations: alert.Annotations, - GeneratorURL: g.opts.ExternalURL.String() + strutil.TableLinkForExpression(rule.vector.String()), - } - if !alert.ResolvedAt.IsZero() { - a.EndsAt = alert.ResolvedAt - } - - alerts = append(alerts, a) - } - - if len(alerts) > 0 { - g.opts.Notifier.Send(alerts...) - } - - return nil -} - // The Manager manages recording and alerting rules. type Manager struct { opts *ManagerOptions @@ -463,12 +431,15 @@ type Appendable interface { Appender() (storage.Appender, error) } +// NotifyFunc sends notifications about a set of alerts generated by the given expression. +type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) error + // ManagerOptions bundles options for the Manager. type ManagerOptions struct { ExternalURL *url.URL - Query QueryFunc + QueryFunc QueryFunc + NotifyFunc NotifyFunc Context context.Context - Notifier *notifier.Notifier Appendable Appendable Logger log.Logger } @@ -539,7 +510,7 @@ func (m *Manager) Update(interval time.Duration, files []string) error { // is told to run. This is necessary to avoid running // queries against a bootstrapping storage. <-m.block - newg.run() + newg.run(m.opts.Context) }() wg.Done() }(newg) diff --git a/rules/manager_test.go b/rules/manager_test.go index 33ad84fe8..30e38e922 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -176,7 +176,7 @@ func TestStaleness(t *testing.T) { defer storage.Close() engine := promql.NewEngine(storage, nil) opts := &ManagerOptions{ - Query: EngineQueryFunc(engine), + QueryFunc: EngineQueryFunc(engine), Appendable: storage, Context: context.Background(), Logger: log.NewNopLogger(), @@ -196,10 +196,12 @@ func TestStaleness(t *testing.T) { err = app.Commit() testutil.Ok(t, err) + ctx := context.Background() + // Execute 3 times, 1 second apart. - group.Eval(time.Unix(0, 0)) - group.Eval(time.Unix(1, 0)) - group.Eval(time.Unix(2, 0)) + group.Eval(ctx, time.Unix(0, 0)) + group.Eval(ctx, time.Unix(1, 0)) + group.Eval(ctx, time.Unix(2, 0)) querier, err := storage.Querier(context.Background(), 0, 2000) testutil.Ok(t, err) @@ -296,11 +298,8 @@ func TestUpdate(t *testing.T) { "test": labels.FromStrings("name", "value"), } ruleManager := NewManager(&ManagerOptions{ - Appendable: nil, - Notifier: nil, - Query: nil, - Context: context.Background(), - Logger: log.NewNopLogger(), + Context: context.Background(), + Logger: log.NewNopLogger(), }) ruleManager.Run()