Introduce telemetry for rule evaluator durations.

This commit adds telemetry for the Prometheus expression rule
evaluator, which will enable meta-Prometheus monitoring of customers
to ensure that no instance is falling behind in answering routine
queries.

A few other sundry simplifications are introduced, too.
This commit is contained in:
Matt T. Proud 2013-05-23 21:29:27 +02:00
parent 8507c58bf2
commit c10780c966
3 changed files with 50 additions and 13 deletions

View file

@ -37,11 +37,6 @@ type groupedAggregation struct {
groupCount int groupCount int
} }
type labelValuePair struct {
label model.LabelName
value model.LabelValue
}
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// Enums. // Enums.

View file

@ -47,17 +47,21 @@ func NewRuleManager(results chan *Result, interval time.Duration, storage *metri
interval: interval, interval: interval,
storage: storage, storage: storage,
} }
// BUG(julius): Extract this so that the caller manages concurrency.
go manager.run(results) go manager.run(results)
return manager return manager
} }
func (m *ruleManager) run(results chan *Result) { func (m *ruleManager) run(results chan *Result) {
ticker := time.Tick(m.interval) ticker := time.NewTicker(m.interval)
defer ticker.Stop()
for { for {
select { select {
case <-ticker: case <-ticker.C:
start := time.Now()
m.runIteration(results) m.runIteration(results)
evalDurations.Add(map[string]string{intervalKey: m.interval.String()}, float64(time.Since(start)/time.Millisecond))
case <-m.done: case <-m.done:
log.Printf("RuleManager exiting...") log.Printf("RuleManager exiting...")
break break
@ -66,27 +70,31 @@ func (m *ruleManager) run(results chan *Result) {
} }
func (m *ruleManager) Stop() { func (m *ruleManager) Stop() {
m.done <- true select {
case m.done <- true:
default:
}
} }
func (m *ruleManager) runIteration(results chan *Result) { func (m *ruleManager) runIteration(results chan *Result) {
now := time.Now() now := time.Now()
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for _, rule := range m.rules { for _, rule := range m.rules {
wg.Add(1) wg.Add(1)
// BUG(julius): Look at fixing thundering herd.
go func(rule Rule) { go func(rule Rule) {
defer wg.Done()
vector, err := rule.Eval(now, m.storage) vector, err := rule.Eval(now, m.storage)
samples := model.Samples{} samples := make(model.Samples, len(vector))
for _, sample := range vector { copy(samples, vector)
samples = append(samples, sample)
}
m.results <- &Result{ m.results <- &Result{
Samples: samples, Samples: samples,
Err: err, Err: err,
} }
wg.Done()
}(rule) }(rule)
} }
wg.Wait() wg.Wait()
} }

34
rules/telemetry.go Normal file
View file

@ -0,0 +1,34 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rules
import (
"github.com/prometheus/client_golang/prometheus"
)
const (
intervalKey = "interval"
)
var (
evalDurations = prometheus.NewHistogram(&prometheus.HistogramSpecification{
Starts: prometheus.LogarithmicSizedBucketsFor(0, 10000),
BucketBuilder: prometheus.AccumulatingBucketBuilder(prometheus.EvictAndReplaceWith(10, prometheus.AverageReducer), 100),
ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99}})
evalDuration = prometheus.NewCounter()
)
func init() {
prometheus.Register("prometheus_evaluator_duration_ms", "The duration for each evaluation pool to execute.", prometheus.NilLabels, evalDurations)
}