Track alerts over time and write out alert timeseries.

This commit is contained in:
Julius Volz 2013-04-24 11:51:40 +02:00
parent d46f2fd549
commit 2202cd71c9
11 changed files with 606 additions and 380 deletions

View file

@ -22,6 +22,12 @@ const (
InstanceLabel = LabelName("instance")
// The metric name for the synthetic health variable.
ScrapeHealthMetricName = LabelValue("up")
// The metric name for synthetic alert timeseries.
AlertMetricName = LabelValue("ALERTS")
// The label name indicating the name of an alert.
AlertNameLabel = LabelName("alertname")
// The label name indicating the state of an alert.
AlertStateLabel = LabelName("alertstate")
)
// A LabelName is a key for a LabelSet or Metric. It has a value associated

View file

@ -67,6 +67,14 @@ func (l LabelSet) String() string {
return buffer.String()
}
func (l LabelSet) ToMetric() (metric Metric) {
metric = Metric{}
for label, value := range l {
metric[label] = value
}
return
}
// A Metric is similar to a LabelSet, but the key difference is that a Metric is
// a singleton and refers to one and only one stream of samples.
type Metric map[LabelName]LabelValue

143
rules/alerting.go Normal file
View file

@ -0,0 +1,143 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rules
import (
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/rules/ast"
"github.com/prometheus/prometheus/utility"
"time"
)
// States that active alerts can be in.
type alertState int
func (s alertState) String() (state string) {
switch s {
case PENDING:
state = "pending"
case FIRING:
state = "firing"
}
return
}
const (
PENDING alertState = iota
FIRING
)
// alert is used to track active (pending/firing) alerts over time.
type alert struct {
// The name of the alert.
name string
// The vector element labelset triggering this alert.
metric model.Metric
// The state of the alert (PENDING or FIRING).
state alertState
// The time when the alert first transitioned into PENDING state.
activeSince time.Time
}
// sample returns a Sample suitable for recording the alert.
func (a alert) sample(timestamp time.Time, value model.SampleValue) model.Sample {
recordedMetric := model.Metric{}
for label, value := range a.metric {
recordedMetric[label] = value
}
recordedMetric[model.MetricNameLabel] = model.AlertMetricName
recordedMetric[model.AlertNameLabel] = model.LabelValue(a.name)
recordedMetric[model.AlertStateLabel] = model.LabelValue(a.state.String())
return model.Sample{
Metric: recordedMetric,
Value: value,
Timestamp: timestamp,
}
}
// An alerting rule generates alerts from its vector expression.
type AlertingRule struct {
// The name of the alert.
name string
// The vector expression from which to generate alerts.
vector ast.VectorNode
// The duration for which a labelset needs to persist in the expression
// output vector before an alert transitions from PENDING to FIRING state.
holdDuration time.Duration
// Extra labels to attach to the resulting alert sample vectors.
labels model.LabelSet
// A map of alerts which are currently active (PENDING or FIRING), keyed by
// the fingerprint of the labelset they correspond to.
activeAlerts map[model.Fingerprint]*alert
}
func (rule AlertingRule) Name() string { return rule.name }
func (rule AlertingRule) EvalRaw(timestamp time.Time) (vector ast.Vector, err error) {
return ast.EvalVectorInstant(rule.vector, timestamp)
}
func (rule AlertingRule) Eval(timestamp time.Time) (vector ast.Vector, err error) {
// Get the raw value of the rule expression.
exprResult, err := rule.EvalRaw(timestamp)
if err != nil {
return
}
// Create pending alerts for any new vector elements in the alert expression.
resultFingerprints := utility.Set{}
for _, sample := range exprResult {
fp := model.NewFingerprintFromMetric(sample.Metric)
resultFingerprints.Add(fp)
if _, ok := rule.activeAlerts[fp]; !ok {
rule.activeAlerts[fp] = &alert{
name: rule.name,
metric: sample.Metric,
state: PENDING,
activeSince: timestamp,
}
}
}
// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
for fp, activeAlert := range rule.activeAlerts {
if !resultFingerprints.Has(fp) {
vector = append(vector, activeAlert.sample(timestamp, 0))
delete(rule.activeAlerts, fp)
continue
}
if activeAlert.state == PENDING && timestamp.Sub(activeAlert.activeSince) >= rule.holdDuration {
vector = append(vector, activeAlert.sample(timestamp, 0))
activeAlert.state = FIRING
}
vector = append(vector, activeAlert.sample(timestamp, 1))
}
return
}
// Construct a new AlertingRule.
func NewAlertingRule(name string, vector ast.VectorNode, holdDuration time.Duration, labels model.LabelSet) *AlertingRule {
return &AlertingRule{
name: name,
vector: vector,
holdDuration: holdDuration,
labels: labels,
activeAlerts: map[model.Fingerprint]*alert{},
}
}

View file

@ -72,7 +72,7 @@ func (m *ruleManager) runIteration(results chan *Result) {
for _, rule := range m.rules {
wg.Add(1)
go func(rule Rule) {
vector, err := rule.Eval(&now)
vector, err := rule.Eval(now)
m.results <- &Result{
Samples: vector,
Err: err,

76
rules/recording.go Normal file
View file

@ -0,0 +1,76 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rules
import (
"fmt"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/rules/ast"
"time"
)
// A RecordingRule records its vector expression into new timeseries.
type RecordingRule struct {
name string
vector ast.VectorNode
labels model.LabelSet
permanent bool
}
func (rule RecordingRule) Name() string { return rule.name }
func (rule RecordingRule) EvalRaw(timestamp time.Time) (vector ast.Vector, err error) {
return ast.EvalVectorInstant(rule.vector, timestamp)
}
func (rule RecordingRule) Eval(timestamp time.Time) (vector ast.Vector, err error) {
// Get the raw value of the rule expression.
vector, err = rule.EvalRaw(timestamp)
if err != nil {
return
}
// Override the metric name and labels.
for _, sample := range vector {
sample.Metric[model.MetricNameLabel] = model.LabelValue(rule.name)
for label, value := range rule.labels {
if value == "" {
delete(sample.Metric, label)
} else {
sample.Metric[label] = value
}
}
}
return
}
// RuleToDotGraph returns a Graphviz dot graph of the recording rule.
func (rule RecordingRule) RuleToDotGraph() string {
graph := "digraph \"Rules\" {\n"
graph += fmt.Sprintf("%#p[shape=\"box\",label=\"%v = \"];\n", rule, rule.name)
graph += fmt.Sprintf("%#p -> %#p;\n", &rule, rule.vector)
graph += rule.vector.NodeTreeToDotGraph()
graph += "}\n"
return graph
}
// Construct a new RecordingRule.
func NewRecordingRule(name string, labels model.LabelSet, vector ast.VectorNode, permanent bool) *RecordingRule {
return &RecordingRule{
name: name,
labels: labels,
vector: vector,
permanent: permanent,
}
}

View file

@ -14,8 +14,6 @@
package rules
import (
"fmt"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/rules/ast"
"time"
)
@ -27,94 +25,7 @@ type Rule interface {
Name() string
// EvalRaw evaluates the rule's vector expression without triggering any
// other actions, like recording or alerting.
EvalRaw(timestamp *time.Time) (vector ast.Vector, err error)
EvalRaw(timestamp time.Time) (vector ast.Vector, err error)
// Eval evaluates the rule, including any associated recording or alerting actions.
Eval(timestamp *time.Time) (vector ast.Vector, err error)
}
// A RecordingRule records its vector expression into new timeseries.
type RecordingRule struct {
name string
vector ast.VectorNode
labels model.LabelSet
permanent bool
}
// An alerting rule generates alerts from its vector expression.
type AlertingRule struct {
name string
vector ast.VectorNode
holdDuration time.Duration
labels model.LabelSet
}
func (rule RecordingRule) Name() string { return rule.name }
func (rule RecordingRule) EvalRaw(timestamp *time.Time) (vector ast.Vector, err error) {
return ast.EvalVectorInstant(rule.vector, *timestamp)
}
func (rule RecordingRule) Eval(timestamp *time.Time) (vector ast.Vector, err error) {
// Get the raw value of the rule expression.
vector, err = rule.EvalRaw(timestamp)
if err != nil {
return
}
// Override the metric name and labels.
for _, sample := range vector {
sample.Metric[model.MetricNameLabel] = model.LabelValue(rule.name)
for label, value := range rule.labels {
if value == "" {
delete(sample.Metric, label)
} else {
sample.Metric[label] = value
}
}
}
return
}
func (rule RecordingRule) RuleToDotGraph() string {
graph := "digraph \"Rules\" {\n"
graph += fmt.Sprintf("%#p[shape=\"box\",label=\"%v = \"];\n", rule, rule.name)
graph += fmt.Sprintf("%#p -> %#p;\n", &rule, rule.vector)
graph += rule.vector.NodeTreeToDotGraph()
graph += "}\n"
return graph
}
func (rule AlertingRule) Name() string { return rule.name }
func (rule AlertingRule) EvalRaw(timestamp *time.Time) (vector ast.Vector, err error) {
return ast.EvalVectorInstant(rule.vector, *timestamp)
}
func (rule AlertingRule) Eval(timestamp *time.Time) (vector ast.Vector, err error) {
// Get the raw value of the rule expression.
vector, err = rule.EvalRaw(timestamp)
if err != nil {
return
}
// TODO(julius): handle alerting.
return
}
func NewRecordingRule(name string, labels model.LabelSet, vector ast.VectorNode, permanent bool) *RecordingRule {
return &RecordingRule{
name: name,
labels: labels,
vector: vector,
permanent: permanent,
}
}
func NewAlertingRule(name string, vector ast.VectorNode, holdDuration time.Duration, labels model.LabelSet) *AlertingRule {
return &AlertingRule{
name: name,
vector: vector,
holdDuration: holdDuration,
labels: labels,
}
Eval(timestamp time.Time) (vector ast.Vector, err error)
}

View file

@ -15,6 +15,7 @@ package rules
import (
"fmt"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/rules/ast"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility/test"
@ -25,19 +26,46 @@ import (
)
var (
testEvalTime = testStartTime.Add(testDuration5m * 10)
testEvalTime = testStartTime.Add(testSampleInterval * 10)
fixturesPath = "fixtures"
)
// Labels in expected output need to be alphabetically sorted.
var expressionTests = []struct {
func annotateWithTime(lines []string, timestamp time.Time) []string {
annotatedLines := []string{}
for _, line := range lines {
annotatedLines = append(annotatedLines, fmt.Sprintf(line, timestamp))
}
return annotatedLines
}
func vectorComparisonString(expected []string, actual []string) string {
separator := "\n--------------\n"
return fmt.Sprintf("Expected:%v%v%v\nActual:%v%v%v ",
separator,
strings.Join(expected, "\n"),
separator,
separator,
strings.Join(actual, "\n"),
separator)
}
func newTestStorage(t test.Tester) (storage metric.Storage, closer test.Closer) {
storage, closer = metric.NewTestTieredStorage(t)
ast.SetStorage(storage)
storeMatrix(storage, testMatrix)
return
}
func ExpressionTests(t *testing.T) {
// Labels in expected output need to be alphabetically sorted.
var expressionTests = []struct {
expr string
output []string
shouldFail bool
checkOrder bool
fullRanges int
intervalRanges int
}{
}{
{
expr: "SUM(http_requests)",
output: []string{"http_requests{} => 3600 @[%v]"},
@ -255,43 +283,14 @@ var expressionTests = []struct {
expr: "http_requests['1m']",
shouldFail: true,
},
}
func annotateWithTime(lines []string) []string {
annotatedLines := []string{}
for _, line := range lines {
annotatedLines = append(annotatedLines, fmt.Sprintf(line, testEvalTime))
}
return annotatedLines
}
func vectorComparisonString(expected []string, actual []string) string {
separator := "\n--------------\n"
return fmt.Sprintf("Expected:%v%v%v\nActual:%v%v%v ",
separator,
strings.Join(expected, "\n"),
separator,
separator,
strings.Join(actual, "\n"),
separator)
}
func TestExpressions(t *testing.T) {
temporaryDirectory := test.NewTemporaryDirectory("rule_expression_tests", t)
defer temporaryDirectory.Close()
tieredStorage, err := metric.NewTieredStorage(5000, 5000, 100, time.Second*30, time.Second*1, time.Second*20, temporaryDirectory.Path())
if err != nil {
t.Fatalf("Error opening storage: %s", err)
}
go tieredStorage.Serve()
ast.SetStorage(tieredStorage)
storeMatrix(tieredStorage, testMatrix)
tieredStorage, closer := newTestStorage(t)
defer closer.Close()
tieredStorage.Flush()
for i, exprTest := range expressionTests {
expectedLines := annotateWithTime(exprTest.output)
expectedLines := annotateWithTime(exprTest.output, testEvalTime)
testExpr, err := LoadExprFromString(exprTest.expr)
@ -299,25 +298,25 @@ func TestExpressions(t *testing.T) {
if exprTest.shouldFail {
continue
}
t.Errorf("%d Error during parsing: %v", i, err)
t.Errorf("%d Expression: %v", i, exprTest.expr)
t.Errorf("%d. Error during parsing: %v", i, err)
t.Errorf("%d. Expression: %v", i, exprTest.expr)
} else {
if exprTest.shouldFail {
t.Errorf("%d Test should fail, but didn't", i)
t.Errorf("%d. Test should fail, but didn't", i)
}
failed := false
resultStr := ast.EvalToString(testExpr, testEvalTime, ast.TEXT)
resultLines := strings.Split(resultStr, "\n")
if len(exprTest.output) != len(resultLines) {
t.Errorf("%d Number of samples in expected and actual output don't match", i)
t.Errorf("%d. Number of samples in expected and actual output don't match", i)
failed = true
}
if exprTest.checkOrder {
for j, expectedSample := range expectedLines {
if resultLines[j] != expectedSample {
t.Errorf("%d.%d Expected sample '%v', got '%v'", i, j, resultLines[j], expectedSample)
t.Errorf("%d.%d. Expected sample '%v', got '%v'", i, j, resultLines[j], expectedSample)
failed = true
}
}
@ -330,7 +329,7 @@ func TestExpressions(t *testing.T) {
}
}
if !found {
t.Errorf("%d.%d Couldn't find expected sample in output: '%v'", i, j, expectedSample)
t.Errorf("%d.%d. Couldn't find expected sample in output: '%v'", i, j, expectedSample)
failed = true
}
}
@ -339,16 +338,16 @@ func TestExpressions(t *testing.T) {
analyzer := ast.NewQueryAnalyzer()
analyzer.AnalyzeQueries(testExpr)
if exprTest.fullRanges != len(analyzer.FullRanges) {
t.Errorf("%d Count of full ranges didn't match: %v vs %v", i, exprTest.fullRanges, len(analyzer.FullRanges))
t.Errorf("%d. Count of full ranges didn't match: %v vs %v", i, exprTest.fullRanges, len(analyzer.FullRanges))
failed = true
}
if exprTest.intervalRanges != len(analyzer.IntervalRanges) {
t.Errorf("%d Count of interval ranges didn't match: %v vs %v", i, exprTest.intervalRanges, len(analyzer.IntervalRanges))
t.Errorf("%d. Count of interval ranges didn't match: %v vs %v", i, exprTest.intervalRanges, len(analyzer.IntervalRanges))
failed = true
}
if failed {
t.Errorf("%d Expression: %v\n%v", i, exprTest.expr, vectorComparisonString(expectedLines, resultLines))
t.Errorf("%d. Expression: %v\n%v", i, exprTest.expr, vectorComparisonString(expectedLines, resultLines))
}
}
}
@ -420,3 +419,79 @@ func TestRules(t *testing.T) {
}
}
}
func TestAlertingRule(t *testing.T) {
// Labels in expected output need to be alphabetically sorted.
var evalOutputs = [][]string{
{
"ALERTS{alertname='HttpRequestRateLow',alertstate='pending',group='canary',instance='0',job='app-server'} => 1 @[%v]",
"ALERTS{alertname='HttpRequestRateLow',alertstate='pending',group='canary',instance='1',job='app-server'} => 1 @[%v]",
},
{
"ALERTS{alertname='HttpRequestRateLow',alertstate='pending',group='canary',instance='0',job='app-server'} => 0 @[%v]",
"ALERTS{alertname='HttpRequestRateLow',alertstate='firing',group='canary',instance='0',job='app-server'} => 1 @[%v]",
"ALERTS{alertname='HttpRequestRateLow',alertstate='pending',group='canary',instance='1',job='app-server'} => 0 @[%v]",
"ALERTS{alertname='HttpRequestRateLow',alertstate='firing',group='canary',instance='1',job='app-server'} => 1 @[%v]",
},
{
"ALERTS{alertname='HttpRequestRateLow',alertstate='firing',group='canary',instance='1',job='app-server'} => 0 @[%v]",
"ALERTS{alertname='HttpRequestRateLow',alertstate='firing',group='canary',instance='0',job='app-server'} => 0 @[%v]",
},
{
/* empty */
},
{
/* empty */
},
}
tieredStorage, closer := newTestStorage(t)
defer closer.Close()
tieredStorage.Flush()
alertExpr, err := LoadExprFromString("http_requests{group='canary',job='app-server'} < 100")
if err != nil {
t.Fatalf("Unable to parse alert expression: %s", err)
}
alertName := "HttpRequestRateLow"
alertLabels := model.LabelSet{
"summary": "HTTP request rate is low",
}
rule := NewAlertingRule(alertName, alertExpr.(ast.VectorNode), time.Minute, alertLabels)
for i, expected := range evalOutputs {
evalTime := testStartTime.Add(testSampleInterval * time.Duration(i))
actual, err := rule.Eval(evalTime)
if err != nil {
t.Fatalf("Error during alerting rule evaluation: %s", err)
}
actualLines := strings.Split(actual.String(), "\n")
expectedLines := annotateWithTime(expected, evalTime)
if actualLines[0] == "" {
actualLines = []string{}
}
failed := false
if len(actualLines) != len(expectedLines) {
t.Errorf("%d. Number of samples in expected and actual output don't match (%d vs. %d)", i, len(expectedLines), len(actualLines))
failed = true
}
for j, expectedSample := range expectedLines {
found := false
for _, actualSample := range actualLines {
if actualSample == expectedSample {
found = true
}
}
if !found {
t.Errorf("%d.%d. Couldn't find expected sample in output: '%v'", i, j, expectedSample)
failed = true
}
}
if failed {
t.Fatalf("%d. Expected and actual outputs don't match:\n%v", i, vectorComparisonString(expectedLines, actualLines))
}
}
}

View file

@ -20,7 +20,7 @@ import (
"time"
)
var testDuration5m = time.Duration(5) * time.Minute
var testSampleInterval = time.Duration(5) * time.Minute
var testStartTime = time.Time{}
func getTestValueStream(startVal model.SampleValue,
@ -33,7 +33,7 @@ func getTestValueStream(startVal model.SampleValue,
Timestamp: currentTime,
}
resultValues = append(resultValues, sample)
currentTime = currentTime.Add(testDuration5m)
currentTime = currentTime.Add(testSampleInterval)
}
return resultValues
}

View file

@ -75,7 +75,9 @@ func (l *LevelDBMetricPersistence) Close() {
for _, closer := range persistences {
closerGroup.Add(1)
go func(closer leveldbCloser) {
if closer != nil {
closer.Close()
}
closerGroup.Done()
}(closer)
}

View file

@ -72,3 +72,39 @@ func buildMemoryTestPersistence(f func(p MetricPersistence, t test.Tester)) func
f(p, t)
}
}
type testTieredStorageCloser struct {
storage Storage
directory test.Closer
}
func (t testTieredStorageCloser) Close() {
t.storage.Close()
t.directory.Close()
}
func NewTestTieredStorage(t test.Tester) (storage Storage, closer test.Closer) {
var directory test.TemporaryDirectory
directory = test.NewTemporaryDirectory("test_tiered_storage", t)
storage, err := NewTieredStorage(5000000, 2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, directory.Path())
if err != nil {
if storage != nil {
storage.Close()
}
directory.Close()
t.Fatalf("Error creating storage: %s", err)
}
if storage == nil {
directory.Close()
t.Fatalf("storage == nil")
}
go storage.Serve()
closer = &testTieredStorageCloser{
storage: storage,
directory: directory,
}
return
}

View file

@ -21,37 +21,6 @@ import (
"time"
)
type testTieredStorageCloser struct {
storage Storage
directory test.Closer
}
func (t testTieredStorageCloser) Close() {
t.storage.Close()
t.directory.Close()
}
func newTestTieredStorage(t test.Tester) (storage Storage, closer test.Closer) {
var directory test.TemporaryDirectory
directory = test.NewTemporaryDirectory("test_tiered_storage", t)
storage, err := NewTieredStorage(5000000, 2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, directory.Path())
if err != nil {
t.Fatalf("Error creating storage: %s", err)
}
if storage == nil {
t.Fatalf("storage == nil")
}
go storage.Serve()
closer = &testTieredStorageCloser{
storage: storage,
directory: directory,
}
return
}
func buildSamples(from, to time.Time, interval time.Duration, m model.Metric) (v []model.Sample) {
i := model.SampleValue(0)
@ -369,7 +338,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
)
for i, scenario := range scenarios {
tiered, closer := newTestTieredStorage(t)
tiered, closer := NewTestTieredStorage(t)
for j, datum := range scenario.data {
err := tiered.AppendSample(datum)
@ -507,7 +476,7 @@ func TestGetAllValuesForLabel(t *testing.T) {
}
for i, scenario := range scenarios {
tiered, closer := newTestTieredStorage(t)
tiered, closer := NewTestTieredStorage(t)
for j, metric := range scenario.in {
sample := model.Sample{
Metric: model.Metric{model.MetricNameLabel: model.LabelValue(metric.metricName)},
@ -542,7 +511,7 @@ func TestGetAllValuesForLabel(t *testing.T) {
}
func TestGetFingerprintsForLabelSet(t *testing.T) {
tiered, closer := newTestTieredStorage(t)
tiered, closer := NewTestTieredStorage(t)
defer closer.Close()
memorySample := model.Sample{
Metric: model.Metric{model.MetricNameLabel: "http_requests", "method": "/foo"},