mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-13 17:14:05 -08:00
c187308366
This is based on https://github.com/prometheus/prometheus/pull/1997. This adds contexts to the relevant Storage methods and already passes PromQL's new per-query context into the storage's query methods. The immediate motivation supporting multi-tenancy in Frankenstein, but this could also be used by Prometheus's normal local storage to support cancellations and timeouts at some point.
506 lines
13 KiB
Go
506 lines
13 KiB
Go
// Copyright 2013 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package rules
|
|
|
|
import (
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/url"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
html_template "html/template"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/common/log"
|
|
"github.com/prometheus/common/model"
|
|
"golang.org/x/net/context"
|
|
|
|
"github.com/prometheus/prometheus/config"
|
|
"github.com/prometheus/prometheus/notifier"
|
|
"github.com/prometheus/prometheus/promql"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/storage/local"
|
|
"github.com/prometheus/prometheus/util/strutil"
|
|
)
|
|
|
|
// Constants for instrumentation.
|
|
const namespace = "prometheus"
|
|
|
|
var (
|
|
evalDuration = prometheus.NewSummaryVec(
|
|
prometheus.SummaryOpts{
|
|
Namespace: namespace,
|
|
Name: "rule_evaluation_duration_seconds",
|
|
Help: "The duration for a rule to execute.",
|
|
},
|
|
[]string{"rule_type"},
|
|
)
|
|
evalFailures = prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: namespace,
|
|
Name: "rule_evaluation_failures_total",
|
|
Help: "The total number of rule evaluation failures.",
|
|
},
|
|
[]string{"rule_type"},
|
|
)
|
|
evalTotal = prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: namespace,
|
|
Name: "rule_evaluations_total",
|
|
Help: "The total number of rule evaluations.",
|
|
},
|
|
[]string{"rule_type"},
|
|
)
|
|
iterationDuration = prometheus.NewSummary(prometheus.SummaryOpts{
|
|
Namespace: namespace,
|
|
Name: "evaluator_duration_seconds",
|
|
Help: "The duration of rule group evaluations.",
|
|
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
|
|
})
|
|
iterationsSkipped = prometheus.NewCounter(prometheus.CounterOpts{
|
|
Namespace: namespace,
|
|
Name: "evaluator_iterations_skipped_total",
|
|
Help: "The total number of rule group evaluations skipped due to throttled metric storage.",
|
|
})
|
|
iterationsScheduled = prometheus.NewCounter(prometheus.CounterOpts{
|
|
Namespace: namespace,
|
|
Name: "evaluator_iterations_total",
|
|
Help: "The total number of scheduled rule group evaluations, whether skipped or executed.",
|
|
})
|
|
)
|
|
|
|
func init() {
|
|
evalTotal.WithLabelValues(string(ruleTypeAlert))
|
|
evalTotal.WithLabelValues(string(ruleTypeRecording))
|
|
evalFailures.WithLabelValues(string(ruleTypeAlert))
|
|
evalFailures.WithLabelValues(string(ruleTypeRecording))
|
|
|
|
prometheus.MustRegister(iterationDuration)
|
|
prometheus.MustRegister(iterationsSkipped)
|
|
prometheus.MustRegister(evalFailures)
|
|
prometheus.MustRegister(evalDuration)
|
|
}
|
|
|
|
type ruleType string
|
|
|
|
const (
|
|
ruleTypeAlert = "alerting"
|
|
ruleTypeRecording = "recording"
|
|
)
|
|
|
|
// A Rule encapsulates a vector expression which is evaluated at a specified
|
|
// interval and acted upon (currently either recorded or used for alerting).
|
|
type Rule interface {
|
|
Name() string
|
|
// eval evaluates the rule, including any associated recording or alerting actions.
|
|
eval(context.Context, model.Time, *promql.Engine, string) (model.Vector, error)
|
|
// String returns a human-readable string representation of the rule.
|
|
String() string
|
|
// HTMLSnippet returns a human-readable string representation of the rule,
|
|
// decorated with HTML elements for use the web frontend.
|
|
HTMLSnippet(pathPrefix string) html_template.HTML
|
|
}
|
|
|
|
// Group is a set of rules that have a logical relation.
|
|
type Group struct {
|
|
name string
|
|
interval time.Duration
|
|
rules []Rule
|
|
opts *ManagerOptions
|
|
|
|
done chan struct{}
|
|
terminated chan struct{}
|
|
}
|
|
|
|
func newGroup(name string, opts *ManagerOptions) *Group {
|
|
return &Group{
|
|
name: name,
|
|
opts: opts,
|
|
done: make(chan struct{}),
|
|
terminated: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
func (g *Group) run() {
|
|
defer close(g.terminated)
|
|
|
|
// Wait an initial amount to have consistently slotted intervals.
|
|
select {
|
|
case <-time.After(g.offset()):
|
|
case <-g.done:
|
|
return
|
|
}
|
|
|
|
iter := func() {
|
|
iterationsScheduled.Inc()
|
|
if g.opts.SampleAppender.NeedsThrottling() {
|
|
iterationsSkipped.Inc()
|
|
return
|
|
}
|
|
start := time.Now()
|
|
g.eval()
|
|
|
|
iterationDuration.Observe(time.Since(start).Seconds())
|
|
}
|
|
iter()
|
|
|
|
tick := time.NewTicker(g.interval)
|
|
defer tick.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-g.done:
|
|
return
|
|
default:
|
|
select {
|
|
case <-g.done:
|
|
return
|
|
case <-tick.C:
|
|
iter()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (g *Group) stop() {
|
|
close(g.done)
|
|
<-g.terminated
|
|
}
|
|
|
|
func (g *Group) fingerprint() model.Fingerprint {
|
|
l := model.LabelSet{"name": model.LabelValue(g.name)}
|
|
return l.Fingerprint()
|
|
}
|
|
|
|
// offset returns until the next consistently slotted evaluation interval.
|
|
func (g *Group) offset() time.Duration {
|
|
now := time.Now().UnixNano()
|
|
|
|
var (
|
|
base = now - (now % int64(g.interval))
|
|
offset = uint64(g.fingerprint()) % uint64(g.interval)
|
|
next = base + int64(offset)
|
|
)
|
|
|
|
if next < now {
|
|
next += int64(g.interval)
|
|
}
|
|
return time.Duration(next - now)
|
|
}
|
|
|
|
// copyState copies the alerting rule state from the given group.
|
|
func (g *Group) copyState(from *Group) {
|
|
for _, fromRule := range from.rules {
|
|
far, ok := fromRule.(*AlertingRule)
|
|
if !ok {
|
|
continue
|
|
}
|
|
for _, rule := range g.rules {
|
|
ar, ok := rule.(*AlertingRule)
|
|
if !ok {
|
|
continue
|
|
}
|
|
// TODO(fabxc): forbid same alert definitions that are not unique by
|
|
// at least on static label or alertname?
|
|
if far.equal(ar) {
|
|
for fp, a := range far.active {
|
|
ar.active[fp] = a
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func typeForRule(r Rule) ruleType {
|
|
switch r.(type) {
|
|
case *AlertingRule:
|
|
return ruleTypeAlert
|
|
case *RecordingRule:
|
|
return ruleTypeRecording
|
|
}
|
|
panic(fmt.Errorf("unknown rule type: %T", r))
|
|
}
|
|
|
|
// eval runs a single evaluation cycle in which all rules are evaluated in parallel.
|
|
// In the future a single group will be evaluated sequentially to properly handle
|
|
// rule dependency.
|
|
func (g *Group) eval() {
|
|
var (
|
|
now = model.Now()
|
|
wg sync.WaitGroup
|
|
)
|
|
|
|
for _, rule := range g.rules {
|
|
rtyp := string(typeForRule(rule))
|
|
|
|
wg.Add(1)
|
|
// BUG(julius): Look at fixing thundering herd.
|
|
go func(rule Rule) {
|
|
defer wg.Done()
|
|
|
|
defer func(t time.Time) {
|
|
evalDuration.WithLabelValues(rtyp).Observe(time.Since(t).Seconds())
|
|
}(time.Now())
|
|
|
|
evalTotal.WithLabelValues(rtyp).Inc()
|
|
|
|
vector, err := rule.eval(g.opts.Context, now, g.opts.QueryEngine, g.opts.ExternalURL.Path)
|
|
if err != nil {
|
|
// Canceled queries are intentional termination of queries. This normally
|
|
// happens on shutdown and thus we skip logging of any errors here.
|
|
if _, ok := err.(promql.ErrQueryCanceled); !ok {
|
|
log.Warnf("Error while evaluating rule %q: %s", rule, err)
|
|
}
|
|
evalFailures.WithLabelValues(rtyp).Inc()
|
|
return
|
|
}
|
|
|
|
if ar, ok := rule.(*AlertingRule); ok {
|
|
g.sendAlerts(ar, now)
|
|
}
|
|
var (
|
|
numOutOfOrder = 0
|
|
numDuplicates = 0
|
|
)
|
|
for _, s := range vector {
|
|
if err := g.opts.SampleAppender.Append(s); err != nil {
|
|
switch err {
|
|
case local.ErrOutOfOrderSample:
|
|
numOutOfOrder++
|
|
log.With("sample", s).With("error", err).Debug("Rule evaluation result discarded")
|
|
case local.ErrDuplicateSampleForTimestamp:
|
|
numDuplicates++
|
|
log.With("sample", s).With("error", err).Debug("Rule evaluation result discarded")
|
|
default:
|
|
log.With("sample", s).With("error", err).Warn("Rule evaluation result discarded")
|
|
}
|
|
}
|
|
}
|
|
if numOutOfOrder > 0 {
|
|
log.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order result from rule evaluation")
|
|
}
|
|
if numDuplicates > 0 {
|
|
log.With("numDropped", numDuplicates).Warn("Error on ingesting results from rule evaluation with different value but same timestamp")
|
|
}
|
|
}(rule)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
// sendAlerts sends alert notifications for the given rule.
|
|
func (g *Group) sendAlerts(rule *AlertingRule, timestamp model.Time) error {
|
|
var alerts model.Alerts
|
|
|
|
for _, alert := range rule.currentAlerts() {
|
|
// Only send actually firing alerts.
|
|
if alert.State == StatePending {
|
|
continue
|
|
}
|
|
|
|
a := &model.Alert{
|
|
StartsAt: alert.ActiveAt.Add(rule.holdDuration).Time(),
|
|
Labels: alert.Labels,
|
|
Annotations: alert.Annotations,
|
|
GeneratorURL: g.opts.ExternalURL.String() + strutil.GraphLinkForExpression(rule.vector.String()),
|
|
}
|
|
if alert.ResolvedAt != 0 {
|
|
a.EndsAt = alert.ResolvedAt.Time()
|
|
}
|
|
|
|
alerts = append(alerts, a)
|
|
}
|
|
|
|
if len(alerts) > 0 {
|
|
g.opts.Notifier.Send(alerts...)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// The Manager manages recording and alerting rules.
|
|
type Manager struct {
|
|
opts *ManagerOptions
|
|
groups map[string]*Group
|
|
mtx sync.RWMutex
|
|
block chan struct{}
|
|
}
|
|
|
|
// ManagerOptions bundles options for the Manager.
|
|
type ManagerOptions struct {
|
|
ExternalURL *url.URL
|
|
QueryEngine *promql.Engine
|
|
Context context.Context
|
|
Notifier *notifier.Notifier
|
|
SampleAppender storage.SampleAppender
|
|
}
|
|
|
|
// NewManager returns an implementation of Manager, ready to be started
|
|
// by calling the Run method.
|
|
func NewManager(o *ManagerOptions) *Manager {
|
|
manager := &Manager{
|
|
groups: map[string]*Group{},
|
|
opts: o,
|
|
block: make(chan struct{}),
|
|
}
|
|
return manager
|
|
}
|
|
|
|
// Run starts processing of the rule manager.
|
|
func (m *Manager) Run() {
|
|
close(m.block)
|
|
}
|
|
|
|
// Stop the rule manager's rule evaluation cycles.
|
|
func (m *Manager) Stop() {
|
|
log.Info("Stopping rule manager...")
|
|
|
|
for _, eg := range m.groups {
|
|
eg.stop()
|
|
}
|
|
|
|
log.Info("Rule manager stopped.")
|
|
}
|
|
|
|
// ApplyConfig updates the rule manager's state as the config requires. If
|
|
// loading the new rules failed the old rule set is restored.
|
|
func (m *Manager) ApplyConfig(conf *config.Config) error {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
|
|
// Get all rule files and load the groups they define.
|
|
var files []string
|
|
for _, pat := range conf.RuleFiles {
|
|
fs, err := filepath.Glob(pat)
|
|
if err != nil {
|
|
// The only error can be a bad pattern.
|
|
return fmt.Errorf("error retrieving rule files for %s: %s", pat, err)
|
|
}
|
|
files = append(files, fs...)
|
|
}
|
|
|
|
groups, err := m.loadGroups(files...)
|
|
if err != nil {
|
|
return fmt.Errorf("error loading rules, previous rule set restored: %s", err)
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
for _, newg := range groups {
|
|
// To be replaced with a configurable per-group interval.
|
|
newg.interval = time.Duration(conf.GlobalConfig.EvaluationInterval)
|
|
|
|
wg.Add(1)
|
|
|
|
// If there is an old group with the same identifier, stop it and wait for
|
|
// it to finish the current iteration. Then copy its into the new group.
|
|
oldg, ok := m.groups[newg.name]
|
|
delete(m.groups, newg.name)
|
|
|
|
go func(newg *Group) {
|
|
if ok {
|
|
oldg.stop()
|
|
newg.copyState(oldg)
|
|
}
|
|
go func() {
|
|
// Wait with starting evaluation until the rule manager
|
|
// is told to run. This is necessary to avoid running
|
|
// queries against a bootstrapping storage.
|
|
<-m.block
|
|
newg.run()
|
|
}()
|
|
wg.Done()
|
|
}(newg)
|
|
}
|
|
|
|
// Stop remaining old groups.
|
|
for _, oldg := range m.groups {
|
|
oldg.stop()
|
|
}
|
|
|
|
wg.Wait()
|
|
m.groups = groups
|
|
|
|
return nil
|
|
}
|
|
|
|
// loadGroups reads groups from a list of files.
|
|
// As there's currently no group syntax a single group named "default" containing
|
|
// all rules will be returned.
|
|
func (m *Manager) loadGroups(filenames ...string) (map[string]*Group, error) {
|
|
groups := map[string]*Group{}
|
|
|
|
// Currently there is no group syntax implemented. Thus all rules
|
|
// are read into a single default group.
|
|
g := newGroup("default", m.opts)
|
|
groups[g.name] = g
|
|
|
|
for _, fn := range filenames {
|
|
content, err := ioutil.ReadFile(fn)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
stmts, err := promql.ParseStmts(string(content))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error parsing %s: %s", fn, err)
|
|
}
|
|
|
|
for _, stmt := range stmts {
|
|
var rule Rule
|
|
|
|
switch r := stmt.(type) {
|
|
case *promql.AlertStmt:
|
|
rule = NewAlertingRule(r.Name, r.Expr, r.Duration, r.Labels, r.Annotations)
|
|
|
|
case *promql.RecordStmt:
|
|
rule = NewRecordingRule(r.Name, r.Expr, r.Labels)
|
|
|
|
default:
|
|
panic("retrieval.Manager.LoadRuleFiles: unknown statement type")
|
|
}
|
|
g.rules = append(g.rules, rule)
|
|
}
|
|
}
|
|
|
|
return groups, nil
|
|
}
|
|
|
|
// Rules returns the list of the manager's rules.
|
|
func (m *Manager) Rules() []Rule {
|
|
m.mtx.RLock()
|
|
defer m.mtx.RUnlock()
|
|
|
|
var rules []Rule
|
|
for _, g := range m.groups {
|
|
rules = append(rules, g.rules...)
|
|
}
|
|
|
|
return rules
|
|
}
|
|
|
|
// AlertingRules returns the list of the manager's alerting rules.
|
|
func (m *Manager) AlertingRules() []*AlertingRule {
|
|
m.mtx.RLock()
|
|
defer m.mtx.RUnlock()
|
|
|
|
alerts := []*AlertingRule{}
|
|
for _, rule := range m.Rules() {
|
|
if alertingRule, ok := rule.(*AlertingRule); ok {
|
|
alerts = append(alerts, alertingRule)
|
|
}
|
|
}
|
|
return alerts
|
|
}
|