mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-26 06:04:05 -08:00
Merge pull request #3507 from prometheus/rulepromql
Make PromQL metrics un-global, remove rules dep on promql.Engine
This commit is contained in:
commit
b3ff5f6b0e
|
@ -50,6 +50,7 @@ import (
|
|||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/storage/remote"
|
||||
"github.com/prometheus/prometheus/storage/tsdb"
|
||||
"github.com/prometheus/prometheus/util/strutil"
|
||||
"github.com/prometheus/prometheus/web"
|
||||
)
|
||||
|
||||
|
@ -96,6 +97,9 @@ func main() {
|
|||
notifier: notifier.Options{
|
||||
Registerer: prometheus.DefaultRegisterer,
|
||||
},
|
||||
queryEngine: promql.EngineOptions{
|
||||
Metrics: prometheus.DefaultRegisterer,
|
||||
},
|
||||
}
|
||||
|
||||
a := kingpin.New(filepath.Base(os.Args[0]), "The Prometheus monitoring server")
|
||||
|
@ -233,8 +237,8 @@ func main() {
|
|||
|
||||
ruleManager := rules.NewManager(&rules.ManagerOptions{
|
||||
Appendable: fanoutStorage,
|
||||
Notifier: notifier,
|
||||
QueryEngine: queryEngine,
|
||||
QueryFunc: rules.EngineQueryFunc(queryEngine),
|
||||
NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()),
|
||||
Context: ctx,
|
||||
ExternalURL: cfg.web.ExternalURL,
|
||||
Logger: log.With(logger, "component", "rule manager"),
|
||||
|
@ -269,12 +273,24 @@ func main() {
|
|||
conntrack.DialWithTracing(),
|
||||
)
|
||||
|
||||
reloadables := []Reloadable{
|
||||
remoteStorage,
|
||||
targetManager,
|
||||
ruleManager,
|
||||
webHandler,
|
||||
notifier,
|
||||
reloaders := []func(cfg *config.Config) error{
|
||||
remoteStorage.ApplyConfig,
|
||||
targetManager.ApplyConfig,
|
||||
webHandler.ApplyConfig,
|
||||
notifier.ApplyConfig,
|
||||
func(cfg *config.Config) error {
|
||||
// Get all rule files matching the configuration oaths.
|
||||
var files []string
|
||||
for _, pat := range cfg.RuleFiles {
|
||||
fs, err := filepath.Glob(pat)
|
||||
if err != nil {
|
||||
// The only error can be a bad pattern.
|
||||
return fmt.Errorf("error retrieving rule files for %s: %s", pat, err)
|
||||
}
|
||||
files = append(files, fs...)
|
||||
}
|
||||
return ruleManager.Update(time.Duration(cfg.GlobalConfig.EvaluationInterval), cfg.RuleFiles)
|
||||
},
|
||||
}
|
||||
|
||||
prometheus.MustRegister(configSuccess)
|
||||
|
@ -327,11 +343,11 @@ func main() {
|
|||
for {
|
||||
select {
|
||||
case <-hup:
|
||||
if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil {
|
||||
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
|
||||
level.Error(logger).Log("msg", "Error reloading config", "err", err)
|
||||
}
|
||||
case rc := <-webHandler.Reload():
|
||||
if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil {
|
||||
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
|
||||
level.Error(logger).Log("msg", "Error reloading config", "err", err)
|
||||
rc <- err
|
||||
} else {
|
||||
|
@ -360,7 +376,7 @@ func main() {
|
|||
return nil
|
||||
}
|
||||
|
||||
if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil {
|
||||
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
|
||||
return fmt.Errorf("Error loading config %s", err)
|
||||
}
|
||||
|
||||
|
@ -470,13 +486,7 @@ func main() {
|
|||
level.Info(logger).Log("msg", "See you next time!")
|
||||
}
|
||||
|
||||
// Reloadable things can change their internal state to match a new config
|
||||
// and handle failure gracefully.
|
||||
type Reloadable interface {
|
||||
ApplyConfig(*config.Config) error
|
||||
}
|
||||
|
||||
func reloadConfig(filename string, logger log.Logger, rls ...Reloadable) (err error) {
|
||||
func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (err error) {
|
||||
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
|
||||
|
||||
defer func() {
|
||||
|
@ -495,7 +505,7 @@ func reloadConfig(filename string, logger log.Logger, rls ...Reloadable) (err er
|
|||
|
||||
failed := false
|
||||
for _, rl := range rls {
|
||||
if err := rl.ApplyConfig(conf); err != nil {
|
||||
if err := rl(conf); err != nil {
|
||||
level.Error(logger).Log("msg", "Failed to apply configuration", "err", err)
|
||||
failed = true
|
||||
}
|
||||
|
@ -543,3 +553,33 @@ func computeExternalURL(u, listenAddr string) (*url.URL, error) {
|
|||
|
||||
return eu, nil
|
||||
}
|
||||
|
||||
// sendAlerts implements a the rules.NotifyFunc for a Notifier.
|
||||
// It filters any non-firing alerts from the input.
|
||||
func sendAlerts(n *notifier.Notifier, externalURL string) rules.NotifyFunc {
|
||||
return func(ctx context.Context, expr string, alerts ...*rules.Alert) error {
|
||||
var res []*notifier.Alert
|
||||
|
||||
for _, alert := range alerts {
|
||||
// Only send actually firing alerts.
|
||||
if alert.State == rules.StatePending {
|
||||
continue
|
||||
}
|
||||
a := ¬ifier.Alert{
|
||||
StartsAt: alert.FiredAt,
|
||||
Labels: alert.Labels,
|
||||
Annotations: alert.Annotations,
|
||||
GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
|
||||
}
|
||||
if !alert.ResolvedAt.IsZero() {
|
||||
a.EndsAt = alert.ResolvedAt
|
||||
}
|
||||
res = append(res, a)
|
||||
}
|
||||
|
||||
if len(alerts) > 0 {
|
||||
n.Send(res...)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
|
137
promql/engine.go
137
promql/engine.go
|
@ -47,64 +47,13 @@ const (
|
|||
minInt64 = -9223372036854775808
|
||||
)
|
||||
|
||||
var (
|
||||
currentQueries = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "queries",
|
||||
Help: "The current number of queries being executed or waiting.",
|
||||
})
|
||||
maxConcurrentQueries = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "queries_concurrent_max",
|
||||
Help: "The max number of concurrent queries.",
|
||||
})
|
||||
queryPrepareTime = prometheus.NewSummary(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "query_duration_seconds",
|
||||
Help: "Query timings",
|
||||
ConstLabels: prometheus.Labels{"slice": "prepare_time"},
|
||||
},
|
||||
)
|
||||
queryInnerEval = prometheus.NewSummary(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "query_duration_seconds",
|
||||
Help: "Query timings",
|
||||
ConstLabels: prometheus.Labels{"slice": "inner_eval"},
|
||||
},
|
||||
)
|
||||
queryResultAppend = prometheus.NewSummary(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "query_duration_seconds",
|
||||
Help: "Query timings",
|
||||
ConstLabels: prometheus.Labels{"slice": "result_append"},
|
||||
},
|
||||
)
|
||||
queryResultSort = prometheus.NewSummary(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "query_duration_seconds",
|
||||
Help: "Query timings",
|
||||
ConstLabels: prometheus.Labels{"slice": "result_sort"},
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(currentQueries)
|
||||
prometheus.MustRegister(maxConcurrentQueries)
|
||||
prometheus.MustRegister(queryPrepareTime)
|
||||
prometheus.MustRegister(queryInnerEval)
|
||||
prometheus.MustRegister(queryResultAppend)
|
||||
prometheus.MustRegister(queryResultSort)
|
||||
type engineMetrics struct {
|
||||
currentQueries prometheus.Gauge
|
||||
maxConcurrentQueries prometheus.Gauge
|
||||
queryPrepareTime prometheus.Summary
|
||||
queryInnerEval prometheus.Summary
|
||||
queryResultAppend prometheus.Summary
|
||||
queryResultSort prometheus.Summary
|
||||
}
|
||||
|
||||
// convertibleToInt64 returns true if v does not over-/underflow an int64.
|
||||
|
@ -203,6 +152,7 @@ func contextDone(ctx context.Context, env string) error {
|
|||
type Engine struct {
|
||||
// A Querier constructor against an underlying storage.
|
||||
queryable Queryable
|
||||
metrics *engineMetrics
|
||||
// The gate limiting the maximum number of concurrent and waiting queries.
|
||||
gate *queryGate
|
||||
options *EngineOptions
|
||||
|
@ -220,12 +170,66 @@ func NewEngine(queryable Queryable, o *EngineOptions) *Engine {
|
|||
if o == nil {
|
||||
o = DefaultEngineOptions
|
||||
}
|
||||
maxConcurrentQueries.Set(float64(o.MaxConcurrentQueries))
|
||||
metrics := &engineMetrics{
|
||||
currentQueries: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "queries",
|
||||
Help: "The current number of queries being executed or waiting.",
|
||||
}),
|
||||
maxConcurrentQueries: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "queries_concurrent_max",
|
||||
Help: "The max number of concurrent queries.",
|
||||
}),
|
||||
queryPrepareTime: prometheus.NewSummary(prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "query_duration_seconds",
|
||||
Help: "Query timings",
|
||||
ConstLabels: prometheus.Labels{"slice": "prepare_time"},
|
||||
}),
|
||||
queryInnerEval: prometheus.NewSummary(prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "query_duration_seconds",
|
||||
Help: "Query timings",
|
||||
ConstLabels: prometheus.Labels{"slice": "inner_eval"},
|
||||
}),
|
||||
queryResultAppend: prometheus.NewSummary(prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "query_duration_seconds",
|
||||
Help: "Query timings",
|
||||
ConstLabels: prometheus.Labels{"slice": "result_append"},
|
||||
}),
|
||||
queryResultSort: prometheus.NewSummary(prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "query_duration_seconds",
|
||||
Help: "Query timings",
|
||||
ConstLabels: prometheus.Labels{"slice": "result_sort"},
|
||||
}),
|
||||
}
|
||||
metrics.maxConcurrentQueries.Set(float64(o.MaxConcurrentQueries))
|
||||
|
||||
if o.Metrics != nil {
|
||||
o.Metrics.MustRegister(
|
||||
metrics.currentQueries,
|
||||
metrics.maxConcurrentQueries,
|
||||
metrics.queryInnerEval,
|
||||
metrics.queryPrepareTime,
|
||||
metrics.queryResultAppend,
|
||||
metrics.queryResultSort,
|
||||
)
|
||||
}
|
||||
return &Engine{
|
||||
queryable: queryable,
|
||||
gate: newQueryGate(o.MaxConcurrentQueries),
|
||||
options: o,
|
||||
logger: o.Logger,
|
||||
metrics: metrics,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -234,6 +238,7 @@ type EngineOptions struct {
|
|||
MaxConcurrentQueries int
|
||||
Timeout time.Duration
|
||||
Logger log.Logger
|
||||
Metrics prometheus.Registerer
|
||||
}
|
||||
|
||||
// DefaultEngineOptions are the default engine options.
|
||||
|
@ -308,8 +313,8 @@ func (ng *Engine) newTestQuery(f func(context.Context) error) Query {
|
|||
// At this point per query only one EvalStmt is evaluated. Alert and record
|
||||
// statements are not handled by the Engine.
|
||||
func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) {
|
||||
currentQueries.Inc()
|
||||
defer currentQueries.Dec()
|
||||
ng.metrics.currentQueries.Inc()
|
||||
defer ng.metrics.currentQueries.Dec()
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, ng.options.Timeout)
|
||||
q.cancel = cancel
|
||||
|
@ -362,7 +367,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
|||
prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start()
|
||||
querier, err := ng.populateIterators(ctx, s)
|
||||
prepareTimer.Stop()
|
||||
queryPrepareTime.Observe(prepareTimer.ElapsedTime().Seconds())
|
||||
ng.metrics.queryPrepareTime.Observe(prepareTimer.ElapsedTime().Seconds())
|
||||
|
||||
// XXX(fabxc): the querier returned by populateIterators might be instantiated
|
||||
// we must not return without closing irrespective of the error.
|
||||
|
@ -390,7 +395,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
|||
}
|
||||
|
||||
evalTimer.Stop()
|
||||
queryInnerEval.Observe(evalTimer.ElapsedTime().Seconds())
|
||||
ng.metrics.queryInnerEval.Observe(evalTimer.ElapsedTime().Seconds())
|
||||
// Point might have a different timestamp, force it to the evaluation
|
||||
// timestamp as that is when we ran the evaluation.
|
||||
switch v := val.(type) {
|
||||
|
@ -456,7 +461,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
|||
}
|
||||
}
|
||||
evalTimer.Stop()
|
||||
queryInnerEval.Observe(evalTimer.ElapsedTime().Seconds())
|
||||
ng.metrics.queryInnerEval.Observe(evalTimer.ElapsedTime().Seconds())
|
||||
|
||||
if err := contextDone(ctx, "expression evaluation"); err != nil {
|
||||
return nil, err
|
||||
|
@ -468,7 +473,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
|||
mat = append(mat, ss)
|
||||
}
|
||||
appendTimer.Stop()
|
||||
queryResultAppend.Observe(appendTimer.ElapsedTime().Seconds())
|
||||
ng.metrics.queryResultAppend.Observe(appendTimer.ElapsedTime().Seconds())
|
||||
|
||||
if err := contextDone(ctx, "expression evaluation"); err != nil {
|
||||
return nil, err
|
||||
|
@ -480,7 +485,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
|||
sort.Sort(mat)
|
||||
sortTimer.Stop()
|
||||
|
||||
queryResultSort.Observe(sortTimer.ElapsedTime().Seconds())
|
||||
ng.metrics.queryResultSort.Observe(sortTimer.ElapsedTime().Seconds())
|
||||
return mat, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -83,7 +83,9 @@ type Alert struct {
|
|||
Value float64
|
||||
// The interval during which the condition of this alert held true.
|
||||
// ResolvedAt will be 0 to indicate a still active alert.
|
||||
ActiveAt, ResolvedAt time.Time
|
||||
ActiveAt time.Time
|
||||
FiredAt time.Time
|
||||
ResolvedAt time.Time
|
||||
}
|
||||
|
||||
// An AlertingRule generates alerts from its vector expression.
|
||||
|
@ -171,12 +173,8 @@ const resolvedRetention = 15 * time.Minute
|
|||
|
||||
// Eval evaluates the rule expression and then creates pending alerts and fires
|
||||
// or removes previously pending alerts accordingly.
|
||||
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, engine *promql.Engine, externalURL *url.URL) (promql.Vector, error) {
|
||||
query, err := engine.NewInstantQuery(r.vector.String(), ts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res, err := query.Exec(ctx).Vector()
|
||||
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
|
||||
res, err := query(ctx, r.vector.String(), ts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -213,7 +211,7 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, engine *promql.En
|
|||
"__alert_"+r.Name(),
|
||||
tmplData,
|
||||
model.Time(timestamp.FromTime(ts)),
|
||||
engine,
|
||||
template.QueryFunc(query),
|
||||
externalURL,
|
||||
)
|
||||
result, err := tmpl.Expand()
|
||||
|
@ -268,12 +266,14 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, engine *promql.En
|
|||
if a.State != StateInactive {
|
||||
a.State = StateInactive
|
||||
a.ResolvedAt = ts
|
||||
a.FiredAt = time.Time{}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration {
|
||||
a.State = StateFiring
|
||||
a.FiredAt = ts
|
||||
}
|
||||
|
||||
vec = append(vec, r.sample(a, ts))
|
||||
|
|
103
rules/manager.go
103
rules/manager.go
|
@ -19,7 +19,6 @@ import (
|
|||
"fmt"
|
||||
"math"
|
||||
"net/url"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -30,15 +29,12 @@ import (
|
|||
"github.com/go-kit/kit/log/level"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/notifier"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/pkg/rulefmt"
|
||||
"github.com/prometheus/prometheus/pkg/timestamp"
|
||||
"github.com/prometheus/prometheus/pkg/value"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/util/strutil"
|
||||
)
|
||||
|
||||
// Constants for instrumentation.
|
||||
|
@ -107,12 +103,42 @@ const (
|
|||
ruleTypeRecording = "recording"
|
||||
)
|
||||
|
||||
// QueryFunc processes PromQL queries.
|
||||
type QueryFunc func(ctx context.Context, q string, t time.Time) (promql.Vector, error)
|
||||
|
||||
// EngineQueryFunc returns a new query function that executes instant queries against
|
||||
// the given engine.
|
||||
// It converts scaler into vector results.
|
||||
func EngineQueryFunc(engine *promql.Engine) QueryFunc {
|
||||
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
|
||||
q, err := engine.NewInstantQuery(qs, t)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res := q.Exec(ctx)
|
||||
if res.Err != nil {
|
||||
return nil, res.Err
|
||||
}
|
||||
switch v := res.Value.(type) {
|
||||
case promql.Vector:
|
||||
return v, nil
|
||||
case promql.Scalar:
|
||||
return promql.Vector{promql.Sample{
|
||||
Point: promql.Point(v),
|
||||
Metric: labels.Labels{},
|
||||
}}, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("rule result is not a vector or scalar")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// A Rule encapsulates a vector expression which is evaluated at a specified
|
||||
// interval and acted upon (currently either recorded or used for alerting).
|
||||
type Rule interface {
|
||||
Name() string
|
||||
// eval evaluates the rule, including any associated recording or alerting actions.
|
||||
Eval(context.Context, time.Time, *promql.Engine, *url.URL) (promql.Vector, error)
|
||||
Eval(context.Context, time.Time, QueryFunc, *url.URL) (promql.Vector, error)
|
||||
// String returns a human-readable string representation of the rule.
|
||||
String() string
|
||||
|
||||
|
@ -164,7 +190,7 @@ func (g *Group) File() string { return g.file }
|
|||
// Rules returns the group's rules.
|
||||
func (g *Group) Rules() []Rule { return g.rules }
|
||||
|
||||
func (g *Group) run() {
|
||||
func (g *Group) run(ctx context.Context) {
|
||||
defer close(g.terminated)
|
||||
|
||||
// Wait an initial amount to have consistently slotted intervals.
|
||||
|
@ -178,7 +204,7 @@ func (g *Group) run() {
|
|||
iterationsScheduled.Inc()
|
||||
|
||||
start := time.Now()
|
||||
g.Eval(start)
|
||||
g.Eval(ctx, start)
|
||||
|
||||
iterationDuration.Observe(time.Since(start).Seconds())
|
||||
g.SetEvaluationTime(time.Since(start))
|
||||
|
@ -220,7 +246,6 @@ func (g *Group) hash() uint64 {
|
|||
labels.Label{"name", g.name},
|
||||
labels.Label{"file", g.file},
|
||||
)
|
||||
|
||||
return l.Hash()
|
||||
}
|
||||
|
||||
|
@ -301,7 +326,7 @@ func typeForRule(r Rule) ruleType {
|
|||
}
|
||||
|
||||
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
|
||||
func (g *Group) Eval(ts time.Time) {
|
||||
func (g *Group) Eval(ctx context.Context, ts time.Time) {
|
||||
for i, rule := range g.rules {
|
||||
select {
|
||||
case <-g.done:
|
||||
|
@ -319,7 +344,7 @@ func (g *Group) Eval(ts time.Time) {
|
|||
|
||||
evalTotal.WithLabelValues(rtyp).Inc()
|
||||
|
||||
vector, err := rule.Eval(g.opts.Context, ts, g.opts.QueryEngine, g.opts.ExternalURL)
|
||||
vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
|
||||
if err != nil {
|
||||
// Canceled queries are intentional termination of queries. This normally
|
||||
// happens on shutdown and thus we skip logging of any errors here.
|
||||
|
@ -331,7 +356,7 @@ func (g *Group) Eval(ts time.Time) {
|
|||
}
|
||||
|
||||
if ar, ok := rule.(*AlertingRule); ok {
|
||||
g.sendAlerts(ar)
|
||||
g.opts.NotifyFunc(ctx, ar.vector.String(), ar.currentAlerts()...)
|
||||
}
|
||||
var (
|
||||
numOutOfOrder = 0
|
||||
|
@ -391,36 +416,6 @@ func (g *Group) Eval(ts time.Time) {
|
|||
}
|
||||
}
|
||||
|
||||
// sendAlerts sends alert notifications for the given rule.
|
||||
func (g *Group) sendAlerts(rule *AlertingRule) error {
|
||||
var alerts []*notifier.Alert
|
||||
|
||||
for _, alert := range rule.currentAlerts() {
|
||||
// Only send actually firing alerts.
|
||||
if alert.State == StatePending {
|
||||
continue
|
||||
}
|
||||
|
||||
a := ¬ifier.Alert{
|
||||
StartsAt: alert.ActiveAt.Add(rule.holdDuration),
|
||||
Labels: alert.Labels,
|
||||
Annotations: alert.Annotations,
|
||||
GeneratorURL: g.opts.ExternalURL.String() + strutil.TableLinkForExpression(rule.vector.String()),
|
||||
}
|
||||
if !alert.ResolvedAt.IsZero() {
|
||||
a.EndsAt = alert.ResolvedAt
|
||||
}
|
||||
|
||||
alerts = append(alerts, a)
|
||||
}
|
||||
|
||||
if len(alerts) > 0 {
|
||||
g.opts.Notifier.Send(alerts...)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// The Manager manages recording and alerting rules.
|
||||
type Manager struct {
|
||||
opts *ManagerOptions
|
||||
|
@ -436,12 +431,15 @@ type Appendable interface {
|
|||
Appender() (storage.Appender, error)
|
||||
}
|
||||
|
||||
// NotifyFunc sends notifications about a set of alerts generated by the given expression.
|
||||
type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) error
|
||||
|
||||
// ManagerOptions bundles options for the Manager.
|
||||
type ManagerOptions struct {
|
||||
ExternalURL *url.URL
|
||||
QueryEngine *promql.Engine
|
||||
QueryFunc QueryFunc
|
||||
NotifyFunc NotifyFunc
|
||||
Context context.Context
|
||||
Notifier *notifier.Notifier
|
||||
Appendable Appendable
|
||||
Logger log.Logger
|
||||
}
|
||||
|
@ -476,25 +474,14 @@ func (m *Manager) Stop() {
|
|||
level.Info(m.logger).Log("msg", "Rule manager stopped")
|
||||
}
|
||||
|
||||
// ApplyConfig updates the rule manager's state as the config requires. If
|
||||
// Update the rule manager's state as the config requires. If
|
||||
// loading the new rules failed the old rule set is restored.
|
||||
func (m *Manager) ApplyConfig(conf *config.Config) error {
|
||||
func (m *Manager) Update(interval time.Duration, files []string) error {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
// Get all rule files and load the groups they define.
|
||||
var files []string
|
||||
for _, pat := range conf.RuleFiles {
|
||||
fs, err := filepath.Glob(pat)
|
||||
if err != nil {
|
||||
// The only error can be a bad pattern.
|
||||
return fmt.Errorf("error retrieving rule files for %s: %s", pat, err)
|
||||
}
|
||||
files = append(files, fs...)
|
||||
}
|
||||
|
||||
// To be replaced with a configurable per-group interval.
|
||||
groups, errs := m.loadGroups(time.Duration(conf.GlobalConfig.EvaluationInterval), files...)
|
||||
groups, errs := m.loadGroups(interval, files...)
|
||||
if errs != nil {
|
||||
for _, e := range errs {
|
||||
level.Error(m.logger).Log("msg", "loading groups failed", "err", e)
|
||||
|
@ -523,7 +510,7 @@ func (m *Manager) ApplyConfig(conf *config.Config) error {
|
|||
// is told to run. This is necessary to avoid running
|
||||
// queries against a bootstrapping storage.
|
||||
<-m.block
|
||||
newg.run()
|
||||
newg.run(m.opts.Context)
|
||||
}()
|
||||
wg.Done()
|
||||
}(newg)
|
||||
|
|
|
@ -17,14 +17,13 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/pkg/timestamp"
|
||||
"github.com/prometheus/prometheus/pkg/value"
|
||||
|
@ -55,75 +54,108 @@ func TestAlertingRule(t *testing.T) {
|
|||
labels.FromStrings("severity", "{{\"c\"}}ritical"),
|
||||
nil, nil,
|
||||
)
|
||||
result := promql.Vector{
|
||||
{
|
||||
Metric: labels.FromStrings(
|
||||
"__name__", "ALERTS",
|
||||
"alertname", "HTTPRequestRateLow",
|
||||
"alertstate", "pending",
|
||||
"group", "canary",
|
||||
"instance", "0",
|
||||
"job", "app-server",
|
||||
"severity", "critical",
|
||||
),
|
||||
Point: promql.Point{V: 1},
|
||||
},
|
||||
{
|
||||
Metric: labels.FromStrings(
|
||||
"__name__", "ALERTS",
|
||||
"alertname", "HTTPRequestRateLow",
|
||||
"alertstate", "pending",
|
||||
"group", "canary",
|
||||
"instance", "1",
|
||||
"job", "app-server",
|
||||
"severity", "critical",
|
||||
),
|
||||
Point: promql.Point{V: 1},
|
||||
},
|
||||
{
|
||||
Metric: labels.FromStrings(
|
||||
"__name__", "ALERTS",
|
||||
"alertname", "HTTPRequestRateLow",
|
||||
"alertstate", "firing",
|
||||
"group", "canary",
|
||||
"instance", "0",
|
||||
"job", "app-server",
|
||||
"severity", "critical",
|
||||
),
|
||||
Point: promql.Point{V: 1},
|
||||
},
|
||||
{
|
||||
Metric: labels.FromStrings(
|
||||
"__name__", "ALERTS",
|
||||
"alertname", "HTTPRequestRateLow",
|
||||
"alertstate", "firing",
|
||||
"group", "canary",
|
||||
"instance", "1",
|
||||
"job", "app-server",
|
||||
"severity", "critical",
|
||||
),
|
||||
Point: promql.Point{V: 1},
|
||||
},
|
||||
}
|
||||
|
||||
baseTime := time.Unix(0, 0)
|
||||
|
||||
var tests = []struct {
|
||||
time time.Duration
|
||||
result []string
|
||||
result promql.Vector
|
||||
}{
|
||||
{
|
||||
time: 0,
|
||||
result: []string{
|
||||
`{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="pending", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`,
|
||||
`{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="pending", group="canary", instance="1", job="app-server", severity="critical"} => 1 @[%v]`,
|
||||
},
|
||||
time: 0,
|
||||
result: result[:2],
|
||||
}, {
|
||||
time: 5 * time.Minute,
|
||||
result: []string{
|
||||
`{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`,
|
||||
`{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="1", job="app-server", severity="critical"} => 1 @[%v]`,
|
||||
},
|
||||
time: 5 * time.Minute,
|
||||
result: result[2:],
|
||||
}, {
|
||||
time: 10 * time.Minute,
|
||||
result: []string{
|
||||
`{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`,
|
||||
},
|
||||
time: 10 * time.Minute,
|
||||
result: result[2:3],
|
||||
},
|
||||
{
|
||||
time: 15 * time.Minute,
|
||||
result: []string{},
|
||||
result: nil,
|
||||
},
|
||||
{
|
||||
time: 20 * time.Minute,
|
||||
result: []string{},
|
||||
result: nil,
|
||||
},
|
||||
{
|
||||
time: 25 * time.Minute,
|
||||
result: []string{
|
||||
`{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="pending", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`,
|
||||
},
|
||||
time: 25 * time.Minute,
|
||||
result: result[:1],
|
||||
},
|
||||
{
|
||||
time: 30 * time.Minute,
|
||||
result: []string{
|
||||
`{__name__="ALERTS", alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`,
|
||||
},
|
||||
time: 30 * time.Minute,
|
||||
result: result[2:3],
|
||||
},
|
||||
}
|
||||
|
||||
for i, test := range tests {
|
||||
t.Logf("case %d", i)
|
||||
|
||||
evalTime := baseTime.Add(test.time)
|
||||
|
||||
res, err := rule.Eval(suite.Context(), evalTime, suite.QueryEngine(), nil)
|
||||
res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine()), nil)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
actual := strings.Split(res.String(), "\n")
|
||||
expected := annotateWithTime(test.result, evalTime)
|
||||
if actual[0] == "" {
|
||||
actual = []string{}
|
||||
for i := range test.result {
|
||||
test.result[i].T = timestamp.FromTime(evalTime)
|
||||
}
|
||||
testutil.Assert(t, len(expected) == len(actual), "%d. Number of samples in expected and actual output don't match (%d vs. %d)", i, len(expected), len(actual))
|
||||
testutil.Assert(t, len(test.result) == len(res), "%d. Number of samples in expected and actual output don't match (%d vs. %d)", i, len(test.result), len(res))
|
||||
|
||||
for j, expectedSample := range expected {
|
||||
found := false
|
||||
for _, actualSample := range actual {
|
||||
if actualSample == expectedSample {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
testutil.Assert(t, found, "%d.%d. Couldn't find expected sample in output: '%v'", i, j, expectedSample)
|
||||
}
|
||||
sort.Slice(res, func(i, j int) bool {
|
||||
return labels.Compare(res[i].Metric, res[j].Metric) < 0
|
||||
})
|
||||
testutil.Equals(t, test.result, res)
|
||||
|
||||
for _, aa := range rule.ActiveAlerts() {
|
||||
testutil.Assert(t, aa.Labels.Get(model.MetricNameLabel) == "", "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels)
|
||||
|
@ -144,10 +176,10 @@ func TestStaleness(t *testing.T) {
|
|||
defer storage.Close()
|
||||
engine := promql.NewEngine(storage, nil)
|
||||
opts := &ManagerOptions{
|
||||
QueryEngine: engine,
|
||||
Appendable: storage,
|
||||
Context: context.Background(),
|
||||
Logger: log.NewNopLogger(),
|
||||
QueryFunc: EngineQueryFunc(engine),
|
||||
Appendable: storage,
|
||||
Context: context.Background(),
|
||||
Logger: log.NewNopLogger(),
|
||||
}
|
||||
|
||||
expr, err := promql.ParseExpr("a + 1")
|
||||
|
@ -164,10 +196,12 @@ func TestStaleness(t *testing.T) {
|
|||
err = app.Commit()
|
||||
testutil.Ok(t, err)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Execute 3 times, 1 second apart.
|
||||
group.Eval(time.Unix(0, 0))
|
||||
group.Eval(time.Unix(1, 0))
|
||||
group.Eval(time.Unix(2, 0))
|
||||
group.Eval(ctx, time.Unix(0, 0))
|
||||
group.Eval(ctx, time.Unix(1, 0))
|
||||
group.Eval(ctx, time.Unix(2, 0))
|
||||
|
||||
querier, err := storage.Querier(context.Background(), 0, 2000)
|
||||
testutil.Ok(t, err)
|
||||
|
@ -259,27 +293,17 @@ func TestCopyState(t *testing.T) {
|
|||
testutil.Equals(t, oldGroup.rules[0], newGroup.rules[3])
|
||||
}
|
||||
|
||||
func TestApplyConfig(t *testing.T) {
|
||||
func TestUpdate(t *testing.T) {
|
||||
expected := map[string]labels.Labels{
|
||||
"test": labels.Labels{
|
||||
labels.Label{
|
||||
Name: "name",
|
||||
Value: "value",
|
||||
},
|
||||
},
|
||||
"test": labels.FromStrings("name", "value"),
|
||||
}
|
||||
conf, err := config.LoadFile("../config/testdata/conf.good.yml")
|
||||
testutil.Ok(t, err)
|
||||
ruleManager := NewManager(&ManagerOptions{
|
||||
Appendable: nil,
|
||||
Notifier: nil,
|
||||
QueryEngine: nil,
|
||||
Context: context.Background(),
|
||||
Logger: log.NewNopLogger(),
|
||||
Context: context.Background(),
|
||||
Logger: log.NewNopLogger(),
|
||||
})
|
||||
ruleManager.Run()
|
||||
|
||||
err = ruleManager.ApplyConfig(conf)
|
||||
err := ruleManager.Update(0, nil)
|
||||
testutil.Ok(t, err)
|
||||
for _, g := range ruleManager.groups {
|
||||
g.seriesInPreviousEval = []map[string]labels.Labels{
|
||||
|
@ -287,7 +311,7 @@ func TestApplyConfig(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
err = ruleManager.ApplyConfig(conf)
|
||||
err = ruleManager.Update(0, nil)
|
||||
testutil.Ok(t, err)
|
||||
for _, g := range ruleManager.groups {
|
||||
for _, actual := range g.seriesInPreviousEval {
|
||||
|
|
|
@ -53,32 +53,11 @@ func (rule *RecordingRule) Name() string {
|
|||
}
|
||||
|
||||
// Eval evaluates the rule and then overrides the metric names and labels accordingly.
|
||||
func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, engine *promql.Engine, _ *url.URL) (promql.Vector, error) {
|
||||
query, err := engine.NewInstantQuery(rule.vector.String(), ts)
|
||||
func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL) (promql.Vector, error) {
|
||||
vector, err := query(ctx, rule.vector.String(), ts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var (
|
||||
result = query.Exec(ctx)
|
||||
vector promql.Vector
|
||||
)
|
||||
if result.Err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch v := result.Value.(type) {
|
||||
case promql.Vector:
|
||||
vector = v
|
||||
case promql.Scalar:
|
||||
vector = promql.Vector{promql.Sample{
|
||||
Point: promql.Point(v),
|
||||
Metric: labels.Labels{},
|
||||
}}
|
||||
default:
|
||||
return nil, fmt.Errorf("rule result is not a vector or scalar")
|
||||
}
|
||||
|
||||
// Override the metric name and labels.
|
||||
for i := range vector {
|
||||
sample := &vector[i]
|
||||
|
|
|
@ -62,7 +62,7 @@ func TestRuleEval(t *testing.T) {
|
|||
|
||||
for _, test := range suite {
|
||||
rule := NewRecordingRule(test.name, test.expr, test.labels)
|
||||
result, err := rule.Eval(ctx, now, engine, nil)
|
||||
result, err := rule.Eval(ctx, now, EngineQueryFunc(engine), nil)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, result, test.result)
|
||||
}
|
||||
|
|
|
@ -30,7 +30,6 @@ import (
|
|||
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/util/strutil"
|
||||
)
|
||||
|
@ -59,34 +58,14 @@ func (q queryResultByLabelSorter) Swap(i, j int) {
|
|||
q.results[i], q.results[j] = q.results[j], q.results[i]
|
||||
}
|
||||
|
||||
func query(ctx context.Context, q string, ts time.Time, queryEngine *promql.Engine) (queryResult, error) {
|
||||
query, err := queryEngine.NewInstantQuery(q, ts)
|
||||
// QueryFunc executes a PromQL query at the given time.
|
||||
type QueryFunc func(context.Context, string, time.Time) (promql.Vector, error)
|
||||
|
||||
func query(ctx context.Context, q string, ts time.Time, queryFn QueryFunc) (queryResult, error) {
|
||||
vector, err := queryFn(ctx, q, ts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res := query.Exec(ctx)
|
||||
if res.Err != nil {
|
||||
return nil, res.Err
|
||||
}
|
||||
var vector promql.Vector
|
||||
|
||||
switch v := res.Value.(type) {
|
||||
case promql.Matrix:
|
||||
return nil, errors.New("matrix return values not supported")
|
||||
case promql.Vector:
|
||||
vector = v
|
||||
case promql.Scalar:
|
||||
vector = promql.Vector{promql.Sample{
|
||||
Point: promql.Point(v),
|
||||
}}
|
||||
case promql.String:
|
||||
vector = promql.Vector{promql.Sample{
|
||||
Metric: labels.FromStrings("__value__", v.V),
|
||||
Point: promql.Point{T: v.T},
|
||||
}}
|
||||
default:
|
||||
panic("template.query: unhandled result value type")
|
||||
}
|
||||
|
||||
// promql.Vector is hard to work with in templates, so convert to
|
||||
// base data types.
|
||||
|
@ -111,14 +90,22 @@ type Expander struct {
|
|||
}
|
||||
|
||||
// NewTemplateExpander returns a template expander ready to use.
|
||||
func NewTemplateExpander(ctx context.Context, text string, name string, data interface{}, timestamp model.Time, queryEngine *promql.Engine, externalURL *url.URL) *Expander {
|
||||
func NewTemplateExpander(
|
||||
ctx context.Context,
|
||||
text string,
|
||||
name string,
|
||||
data interface{},
|
||||
timestamp model.Time,
|
||||
queryFunc QueryFunc,
|
||||
externalURL *url.URL,
|
||||
) *Expander {
|
||||
return &Expander{
|
||||
text: text,
|
||||
name: name,
|
||||
data: data,
|
||||
funcMap: text_template.FuncMap{
|
||||
"query": func(q string) (queryResult, error) {
|
||||
return query(ctx, q, timestamp.Time(), queryEngine)
|
||||
return query(ctx, q, timestamp.Time(), queryFunc)
|
||||
},
|
||||
"first": func(v queryResult) (*sample, error) {
|
||||
if len(v) > 0 {
|
||||
|
|
|
@ -18,21 +18,19 @@ import (
|
|||
"math"
|
||||
"net/url"
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/stretchr/testify/require"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
)
|
||||
|
||||
type testTemplatesScenario struct {
|
||||
text string
|
||||
output string
|
||||
input interface{}
|
||||
shouldFail bool
|
||||
html bool
|
||||
text string
|
||||
output string
|
||||
input interface{}
|
||||
queryResult promql.Vector
|
||||
shouldFail bool
|
||||
html bool
|
||||
}
|
||||
|
||||
func TestTemplateExpansion(t *testing.T) {
|
||||
|
@ -70,42 +68,72 @@ func TestTemplateExpansion(t *testing.T) {
|
|||
output: "1 2",
|
||||
},
|
||||
{
|
||||
text: "{{ query \"1.5\" | first | value }}",
|
||||
output: "1.5",
|
||||
},
|
||||
{
|
||||
// Get value from scalar query.
|
||||
text: "{{ query \"scalar(count(metric))\" | first | value }}",
|
||||
output: "2",
|
||||
text: "{{ query \"1.5\" | first | value }}",
|
||||
output: "1.5",
|
||||
queryResult: promql.Vector{{Point: promql.Point{T: 0, V: 1.5}}},
|
||||
},
|
||||
{
|
||||
// Get value from query.
|
||||
text: "{{ query \"metric{instance='a'}\" | first | value }}",
|
||||
text: "{{ query \"metric{instance='a'}\" | first | value }}",
|
||||
queryResult: promql.Vector{
|
||||
{
|
||||
Metric: labels.FromStrings(labels.MetricName, "metric", "instance", "a"),
|
||||
Point: promql.Point{T: 0, V: 11},
|
||||
}},
|
||||
output: "11",
|
||||
},
|
||||
{
|
||||
// Get label from query.
|
||||
text: "{{ query \"metric{instance='a'}\" | first | label \"instance\" }}",
|
||||
text: "{{ query \"metric{instance='a'}\" | first | label \"instance\" }}",
|
||||
|
||||
queryResult: promql.Vector{
|
||||
{
|
||||
Metric: labels.FromStrings(labels.MetricName, "metric", "instance", "a"),
|
||||
Point: promql.Point{T: 0, V: 11},
|
||||
}},
|
||||
output: "a",
|
||||
},
|
||||
{
|
||||
// Missing label is empty when using label function.
|
||||
text: "{{ query \"metric{instance='a'}\" | first | label \"foo\" }}",
|
||||
text: "{{ query \"metric{instance='a'}\" | first | label \"foo\" }}",
|
||||
queryResult: promql.Vector{
|
||||
{
|
||||
Metric: labels.FromStrings(labels.MetricName, "metric", "instance", "a"),
|
||||
Point: promql.Point{T: 0, V: 11},
|
||||
}},
|
||||
output: "",
|
||||
},
|
||||
{
|
||||
// Missing label is empty when not using label function.
|
||||
text: "{{ $x := query \"metric\" | first }}{{ $x.Labels.foo }}",
|
||||
text: "{{ $x := query \"metric\" | first }}{{ $x.Labels.foo }}",
|
||||
queryResult: promql.Vector{
|
||||
{
|
||||
Metric: labels.FromStrings(labels.MetricName, "metric", "instance", "a"),
|
||||
Point: promql.Point{T: 0, V: 11},
|
||||
}},
|
||||
output: "",
|
||||
},
|
||||
{
|
||||
text: "{{ $x := query \"metric\" | first }}{{ $x.Labels.foo }}",
|
||||
text: "{{ $x := query \"metric\" | first }}{{ $x.Labels.foo }}",
|
||||
queryResult: promql.Vector{
|
||||
{
|
||||
Metric: labels.FromStrings(labels.MetricName, "metric", "instance", "a"),
|
||||
Point: promql.Point{T: 0, V: 11},
|
||||
}},
|
||||
output: "",
|
||||
html: true,
|
||||
},
|
||||
{
|
||||
// Range over query and sort by label.
|
||||
text: "{{ range query \"metric\" | sortByLabel \"instance\" }}{{.Labels.instance}}:{{.Value}}: {{end}}",
|
||||
text: "{{ range query \"metric\" | sortByLabel \"instance\" }}{{.Labels.instance}}:{{.Value}}: {{end}}",
|
||||
queryResult: promql.Vector{
|
||||
{
|
||||
Metric: labels.FromStrings(labels.MetricName, "metric", "instance", "a"),
|
||||
Point: promql.Point{T: 0, V: 11},
|
||||
}, {
|
||||
Metric: labels.FromStrings(labels.MetricName, "metric", "instance", "b"),
|
||||
Point: promql.Point{T: 0, V: 21},
|
||||
}},
|
||||
output: "a:11: b:21: ",
|
||||
},
|
||||
{
|
||||
|
@ -115,13 +143,15 @@ func TestTemplateExpansion(t *testing.T) {
|
|||
},
|
||||
{
|
||||
// Error in function.
|
||||
text: "{{ query \"missing\" | first }}",
|
||||
shouldFail: true,
|
||||
text: "{{ query \"missing\" | first }}",
|
||||
queryResult: promql.Vector{},
|
||||
shouldFail: true,
|
||||
},
|
||||
{
|
||||
// Panic.
|
||||
text: "{{ (query \"missing\").banana }}",
|
||||
shouldFail: true,
|
||||
text: "{{ (query \"missing\").banana }}",
|
||||
queryResult: promql.Vector{},
|
||||
shouldFail: true,
|
||||
},
|
||||
{
|
||||
// Regex replacement.
|
||||
|
@ -211,36 +241,18 @@ func TestTemplateExpansion(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
time := model.Time(0)
|
||||
|
||||
storage := testutil.NewStorage(t)
|
||||
defer storage.Close()
|
||||
|
||||
app, err := storage.Appender()
|
||||
if err != nil {
|
||||
t.Fatalf("get appender: %s", err)
|
||||
}
|
||||
|
||||
_, err = app.Add(labels.FromStrings(labels.MetricName, "metric", "instance", "a"), 0, 11)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(labels.FromStrings(labels.MetricName, "metric", "instance", "b"), 0, 21)
|
||||
require.NoError(t, err)
|
||||
|
||||
if err := app.Commit(); err != nil {
|
||||
t.Fatalf("commit samples: %s", err)
|
||||
}
|
||||
|
||||
engine := promql.NewEngine(storage, nil)
|
||||
|
||||
extURL, err := url.Parse("http://testhost:9090/path/prefix")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
for i, s := range scenarios {
|
||||
queryFunc := func(_ context.Context, _ string, _ time.Time) (promql.Vector, error) {
|
||||
return s.queryResult, nil
|
||||
}
|
||||
var result string
|
||||
var err error
|
||||
expander := NewTemplateExpander(context.Background(), s.text, "test", s.input, time, engine, extURL)
|
||||
expander := NewTemplateExpander(context.Background(), s.text, "test", s.input, 0, queryFunc, extURL)
|
||||
if s.html {
|
||||
result, err = expander.ExpandHTML(nil)
|
||||
} else {
|
||||
|
|
20
web/web.go
20
web/web.go
|
@ -527,7 +527,15 @@ func (h *Handler) consoles(w http.ResponseWriter, r *http.Request) {
|
|||
Path: strings.TrimLeft(name, "/"),
|
||||
}
|
||||
|
||||
tmpl := template.NewTemplateExpander(h.context, string(text), "__console_"+name, data, h.now(), h.queryEngine, h.options.ExternalURL)
|
||||
tmpl := template.NewTemplateExpander(
|
||||
h.context,
|
||||
string(text),
|
||||
"__console_"+name,
|
||||
data,
|
||||
h.now(),
|
||||
template.QueryFunc(rules.EngineQueryFunc(h.queryEngine)),
|
||||
h.options.ExternalURL,
|
||||
)
|
||||
filenames, err := filepath.Glob(h.options.ConsoleLibrariesPath + "/*.lib")
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
|
@ -732,7 +740,15 @@ func (h *Handler) executeTemplate(w http.ResponseWriter, name string, data inter
|
|||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
tmpl := template.NewTemplateExpander(h.context, text, name, data, h.now(), h.queryEngine, h.options.ExternalURL)
|
||||
tmpl := template.NewTemplateExpander(
|
||||
h.context,
|
||||
text,
|
||||
name,
|
||||
data,
|
||||
h.now(),
|
||||
template.QueryFunc(rules.EngineQueryFunc(h.queryEngine)),
|
||||
h.options.ExternalURL,
|
||||
)
|
||||
tmpl.Funcs(tmplFuncs(h.consolesPath(), h.options))
|
||||
|
||||
result, err := tmpl.ExpandHTML(nil)
|
||||
|
|
Loading…
Reference in a new issue