Refactored Notifier to use Registerer

* Brought metrics back into Notifier

Notifier still implements a Collector. Check if that is needed.
Goutham Veeramachaneni 2017-03-03 02:53:16 +05:30
parent 41da5c4ef2
commit f35816613e
3 changed files with 102 additions and 90 deletions
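
The shape of the change, reduced to a standalone sketch (the names queueMetrics and newQueueMetrics below are illustrative, not taken from the Prometheus source): metrics live on a struct and are registered against an injected prometheus.Registerer instead of being package-level variables. Production code passes prometheus.DefaultRegisterer, as the Main() hunk below does; tests can pass an isolated registry, or nil to skip registration entirely.

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

// queueMetrics groups the metrics of one component, mirroring the
// alertMetrics struct introduced in the notifier diff below.
type queueMetrics struct {
    dropped prometheus.Counter
    length  prometheus.Gauge
}

// newQueueMetrics builds the metrics and registers them on r.
// A nil Registerer means "construct but do not register", which keeps
// tests free of global state.
func newQueueMetrics(r prometheus.Registerer) *queueMetrics {
    m := &queueMetrics{
        dropped: prometheus.NewCounter(prometheus.CounterOpts{
            Name: "example_dropped_total",
            Help: "Total number of dropped items.",
        }),
        length: prometheus.NewGauge(prometheus.GaugeOpts{
            Name: "example_queue_length",
            Help: "Current number of queued items.",
        }),
    }
    if r != nil {
        r.MustRegister(m.dropped, m.length)
    }
    return m
}

func main() {
    // Real callers would pass prometheus.DefaultRegisterer; an isolated
    // registry works the same way and is handy in tests.
    m := newQueueMetrics(prometheus.NewRegistry())
    m.dropped.Inc()
    fmt.Println("metrics constructed and registered")
}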


@@ -97,7 +97,7 @@ func Main() int {
 	reloadables = append(reloadables, remoteStorage)

 	var (
-		notifier       = notifier.New(&cfg.notifier)
+		notifier       = notifier.New(&cfg.notifier, prometheus.DefaultRegisterer)
 		targetManager  = retrieval.NewTargetManager(sampleAppender)
 		queryEngine    = promql.NewEngine(localStorage, &cfg.queryEngine)
 		ctx, cancelCtx = context.WithCancel(context.Background())


@@ -50,62 +50,19 @@ const (
 	alertmanagerLabel = "alertmanager"
 )

-var (
-	alertLatency = prometheus.NewSummaryVec(prometheus.SummaryOpts{
-		Namespace: namespace,
-		Subsystem: subsystem,
-		Name:      "latency_seconds",
-		Help:      "Latency quantiles for sending alert notifications (not including dropped notifications).",
-	},
-		[]string{alertmanagerLabel},
-	)
-	alertErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Namespace: namespace,
-		Subsystem: subsystem,
-		Name:      "errors_total",
-		Help:      "Total number of errors sending alert notifications.",
-	},
-		[]string{alertmanagerLabel},
-	)
-	alertSent = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Namespace: namespace,
-		Subsystem: subsystem,
-		Name:      "sent_total",
-		Help:      "Total number of alerts sent.",
-	},
-		[]string{alertmanagerLabel},
-	)
-	alertDropped = prometheus.NewCounter(prometheus.CounterOpts{
-		Namespace: namespace,
-		Subsystem: subsystem,
-		Name:      "dropped_total",
-		Help:      "Total number of alerts dropped due to errors when sending to Alertmanager.",
-	})
-	alertQueueLength = prometheus.NewGauge(prometheus.GaugeOpts{
-		Namespace: namespace,
-		Subsystem: subsystem,
-		Name:      "queue_length",
-		Help:      "The number of alert notifications in the queue.",
-	})
-)
-
 // Notifier is responsible for dispatching alert notifications to an
 // alert manager service.
 type Notifier struct {
 	queue model.Alerts
 	opts  *Options

+	metrics *alertMetrics
+
 	more   chan struct{}
 	mtx    sync.RWMutex
 	ctx    context.Context
 	cancel func()

-	queueCapacity prometheus.Metric
-
 	alertmanagers   []*alertmanagerSet
 	cancelDiscovery func()
 }
@@ -119,21 +76,53 @@ type Options struct {
 	Do func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error)
 }

-// New constructs a new Notifier.
-func New(o *Options) *Notifier {
-	ctx, cancel := context.WithCancel(context.Background())
-
-	if o.Do == nil {
-		o.Do = ctxhttp.Do
-	}
-
-	return &Notifier{
-		queue:  make(model.Alerts, 0, o.QueueCapacity),
-		ctx:    ctx,
-		cancel: cancel,
-		more:   make(chan struct{}, 1),
-		opts:   o,
+type alertMetrics struct {
+	latency       *prometheus.SummaryVec
+	errors        *prometheus.CounterVec
+	sent          *prometheus.CounterVec
+	dropped       prometheus.Counter
+	queueLength   prometheus.Gauge
+	queueCapacity prometheus.Metric
+}
+
+func newAlertMetrics(r prometheus.Registerer, o *Options) *alertMetrics {
+	m := &alertMetrics{
+		latency: prometheus.NewSummaryVec(prometheus.SummaryOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "latency_seconds",
+			Help:      "Latency quantiles for sending alert notifications (not including dropped notifications).",
+		},
+			[]string{alertmanagerLabel},
+		),
+		errors: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "errors_total",
+			Help:      "Total number of errors sending alert notifications.",
+		},
+			[]string{alertmanagerLabel},
+		),
+		sent: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "sent_total",
+			Help:      "Total number of alerts successfully sent.",
+		},
+			[]string{alertmanagerLabel},
+		),
+		dropped: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "dropped_total",
+			Help:      "Total number of alerts dropped due to errors when sending to Alertmanager.",
+		}),
+		queueLength: prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "queue_length",
+			Help:      "The number of alert notifications in the queue.",
+		}),
 		queueCapacity: prometheus.MustNewConstMetric(
 			prometheus.NewDesc(
 				prometheus.BuildFQName(namespace, subsystem, "queue_capacity"),
@@ -144,6 +133,35 @@ func New(o *Options) *Notifier {
 			float64(o.QueueCapacity),
 		),
 	}
+
+	if r != nil {
+		r.MustRegister(
+			m.latency,
+			m.errors,
+			m.sent,
+			m.dropped,
+		)
+	}
+
+	return m
+}
+
+// New constructs a new Notifier.
+func New(o *Options, r prometheus.Registerer) *Notifier {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	if o.Do == nil {
+		o.Do = ctxhttp.Do
+	}
+
+	return &Notifier{
+		queue:   make(model.Alerts, 0, o.QueueCapacity),
+		ctx:     ctx,
+		cancel:  cancel,
+		more:    make(chan struct{}, 1),
+		opts:    o,
+		metrics: newAlertMetrics(r, o),
+	}
 }

 // ApplyConfig updates the status state as the new config requires.
@@ -164,9 +182,10 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
 		}

 		for _, am := range ams.ams {
-			alertErrors.WithLabelValues(am.url())
-			alertSent.WithLabelValues(am.url())
+			n.metrics.errors.WithLabelValues(am.url())
+			n.metrics.sent.WithLabelValues(am.url())
 		}

+		ams.metrics = n.metrics
 		amSets = append(amSets, ams)
 	}
@@ -224,7 +243,7 @@ func (n *Notifier) Run() {
 		alerts := n.nextBatch()

 		if !n.sendAll(alerts...) {
-			alertDropped.Add(float64(len(alerts)))
+			n.metrics.dropped.Add(float64(len(alerts)))
 		}
 		// If the queue still has items left, kick off the next iteration.
 		if n.queueLen() > 0 {
@@ -256,7 +275,7 @@ func (n *Notifier) Send(alerts ...*model.Alert) {
 		alerts = alerts[d:]

 		log.Warnf("Alert batch larger than queue capacity, dropping %d alerts", d)
-		alertDropped.Add(float64(d))
+		n.metrics.dropped.Add(float64(d))
 	}

 	// If the queue is full, remove the oldest alerts in favor
@@ -265,7 +284,7 @@ func (n *Notifier) Send(alerts ...*model.Alert) {
 		n.queue = n.queue[d:]

 		log.Warnf("Alert notification queue full, dropping %d alerts", d)
-		alertDropped.Add(float64(d))
+		n.metrics.dropped.Add(float64(d))
 	}

 	n.queue = append(n.queue, alerts...)
@@ -347,12 +366,12 @@ func (n *Notifier) sendAll(alerts ...*model.Alert) bool {
 			if err := n.sendOne(ctx, ams.client, u, b); err != nil {
 				log.With("alertmanager", u).With("count", len(alerts)).Errorf("Error sending alerts: %s", err)
-				alertErrors.WithLabelValues(u).Inc()
+				n.metrics.errors.WithLabelValues(u).Inc()
 			} else {
 				atomic.AddUint64(&numSuccess, 1)
 			}
-			alertLatency.WithLabelValues(u).Observe(time.Since(begin).Seconds())
-			alertSent.WithLabelValues(u).Add(float64(len(alerts)))
+			n.metrics.latency.WithLabelValues(u).Observe(time.Since(begin).Seconds())
+			n.metrics.sent.WithLabelValues(u).Add(float64(len(alerts)))

 			wg.Done()
 		}(am)
@@ -391,26 +410,16 @@ func (n *Notifier) Stop() {
 // Describe implements prometheus.Collector.
 func (n *Notifier) Describe(ch chan<- *prometheus.Desc) {
-	alertLatency.Describe(ch)
-	alertErrors.Describe(ch)
-	alertSent.Describe(ch)
-
-	ch <- alertDropped.Desc()
-	ch <- alertQueueLength.Desc()
-	ch <- n.queueCapacity.Desc()
+	ch <- n.metrics.queueCapacity.Desc()
+	ch <- n.metrics.queueLength.Desc()
 }

 // Collect implements prometheus.Collector.
 func (n *Notifier) Collect(ch chan<- prometheus.Metric) {
-	alertQueueLength.Set(float64(n.queueLen()))
-
-	alertLatency.Collect(ch)
-	alertErrors.Collect(ch)
-	alertSent.Collect(ch)
-
-	ch <- alertDropped
-	ch <- alertQueueLength
-	ch <- n.queueCapacity
+	n.metrics.queueLength.Set(float64(n.queueLen()))
+
+	ch <- n.metrics.queueLength
+	ch <- n.metrics.queueCapacity
 }

 // alertmanager holds Alertmanager endpoint information.
@@ -438,6 +447,8 @@ type alertmanagerSet struct {
 	cfg    *config.AlertmanagerConfig
 	client *http.Client

+	metrics *alertMetrics
+
 	mtx sync.RWMutex
 	ams []alertmanager
 }
@@ -482,8 +493,8 @@ func (s *alertmanagerSet) Sync(tgs []*config.TargetGroup) {
 			continue
 		}

-		alertSent.WithLabelValues(us)
-		alertErrors.WithLabelValues(us)
+		s.metrics.sent.WithLabelValues(us)
+		s.metrics.errors.WithLabelValues(us)

 		seen[us] = struct{}{}
 		s.ams = append(s.ams, am)
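
On the open question in the commit message (whether Notifier still needs to implement prometheus.Collector): after this change the Collector only exports queue_length, set from queueLen(), and the constant queue_capacity. One way it could eventually be dropped, sketched here under assumptions and not part of this commit, is to register a GaugeFunc for the length and a Gauge set once for the capacity alongside the other metrics. The helper name newQueueGauges, the queueLen callback, and the queue_capacity help text are assumptions; prometheus.NewGaugeFunc is standard client_golang.

// Sketch only: exposing the queue gauges without a custom Collector.
func newQueueGauges(r prometheus.Registerer, capacity int, queueLen func() float64) {
    length := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
        Namespace: namespace,
        Subsystem: subsystem,
        Name:      "queue_length",
        Help:      "The number of alert notifications in the queue.",
    }, queueLen) // evaluated on every scrape, so no Collect override is needed

    capacityGauge := prometheus.NewGauge(prometheus.GaugeOpts{
        Namespace: namespace,
        Subsystem: subsystem,
        Name:      "queue_capacity",
        Help:      "The capacity of the alert notifications queue.", // help text assumed
    })
    capacityGauge.Set(float64(capacity))

    if r != nil {
        r.MustRegister(length, capacityGauge)
    }
}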


@@ -25,6 +25,7 @@ import (
 	"golang.org/x/net/context"

+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"

 	"github.com/prometheus/prometheus/config"
 )
@@ -62,7 +63,7 @@ func TestPostPath(t *testing.T) {
 }

 func TestHandlerNextBatch(t *testing.T) {
-	h := New(&Options{})
+	h := New(&Options{}, prometheus.DefaultRegisterer)

 	for i := range make([]struct{}, 2*maxBatchSize+1) {
 		h.queue = append(h.queue, &model.Alert{
@@ -149,7 +150,7 @@ func TestHandlerSendAll(t *testing.T) {
 	defer server1.Close()
 	defer server2.Close()

-	h := New(&Options{})
+	h := New(&Options{}, prometheus.DefaultRegisterer)
 	h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
 		ams: []alertmanager{
 			alertmanagerMock{
@@ -216,7 +217,7 @@ func TestCustomDo(t *testing.T) {
 				Body: ioutil.NopCloser(nil),
 			}, nil
 		},
-	})
+	}, prometheus.DefaultRegisterer)

 	h.sendOne(context.Background(), nil, testURL, []byte(testBody))
@@ -238,7 +239,7 @@ func TestExternalLabels(t *testing.T) {
 				Replacement: "c",
 			},
 		},
-	})
+	}, prometheus.DefaultRegisterer)

 	// This alert should get the external label attached.
 	h.Send(&model.Alert{
@@ -356,7 +357,7 @@ func TestHandlerQueueing(t *testing.T) {
 		cfg: &config.AlertmanagerConfig{
 			Timeout: time.Second,
 		},
-	})
+	}, prometheus.DefaultRegisterer)

 	var alerts model.Alerts
 	for i := range make([]struct{}, 20*maxBatchSize) {
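
A note on the test changes: each test now constructs its Notifier against prometheus.DefaultRegisterer, and newAlertMetrics calls MustRegister, so building more than one Notifier in the same test binary can panic with a duplicate-collector AlreadyRegisteredError on the shared default registry. A possible alternative, sketched only (newTestNotifier is a hypothetical helper, not in this commit), is to hand each construction its own registry:

// Sketch only: isolate metric registration per test so repeated New calls
// cannot collide on metric names the way a shared DefaultRegisterer can.
func newTestNotifier(o *Options) *Notifier {
    return New(o, prometheus.NewRegistry())
}

Call sites above would then read h := newTestNotifier(&Options{}) instead of New(&Options{}, prometheus.DefaultRegisterer).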