diff --git a/cmd/prometheus/config.go b/cmd/prometheus/config.go index a8551eabc..bbe444dea 100644 --- a/cmd/prometheus/config.go +++ b/cmd/prometheus/config.go @@ -42,7 +42,7 @@ var cfg = struct { configFile string storage local.MemorySeriesStorageOptions - notification notification.NotificationHandlerOptions + notification notification.HandlerOptions queryEngine promql.EngineOptions web web.Options remote remote.Options @@ -202,11 +202,11 @@ func init() { "The URL of the alert manager to send notifications to.", ) cfg.fs.IntVar( - &cfg.notification.QueueCapacity, "alertmanager.notification-queue-capacity", 100, + &cfg.notification.QueueCapacity, "alertmanager.notification-queue-capacity", 10000, "The capacity of the queue for pending alert manager notifications.", ) cfg.fs.DurationVar( - &cfg.notification.Deadline, "alertmanager.http-deadline", 10*time.Second, + &cfg.notification.Timeout, "alertmanager.timeout", 10*time.Second, "Alert manager HTTP API timeout.", ) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 07031ec3f..deb5e1030 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -83,7 +83,7 @@ func Main() int { } var ( - notificationHandler = notification.NewNotificationHandler(&cfg.notification) + notificationHandler = notification.New(&cfg.notification) targetManager = retrieval.NewTargetManager(sampleAppender) queryEngine = promql.NewEngine(memStorage, &cfg.queryEngine) ) diff --git a/notification/notification.go b/notification/notification.go index 82d0e2b8a..b446c4fe7 100644 --- a/notification/notification.go +++ b/notification/notification.go @@ -16,24 +16,23 @@ package notification import ( "bytes" "encoding/json" - "io" - "io/ioutil" + "fmt" "net/http" - "strings" "sync" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "golang.org/x/net/context" + "golang.org/x/net/context/ctxhttp" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/util/httputil" ) const ( - alertmanagerAPIEventsPath = "/api/alerts" - contentTypeJSON = "application/json" + alertPushEndpoint = "/api/v1/alerts" + contentTypeJSON = "application/json" ) // String constants for instrumentation. @@ -42,97 +41,75 @@ const ( subsystem = "notifications" ) -// NotificationReq is a request for sending a notification to the alert manager -// for a single alert vector element. -type NotificationReq struct { - // Short-form alert summary. May contain text/template-style interpolations. - Summary string - // Longer alert description. May contain text/template-style interpolations. - Description string - // A reference to the runbook for the alert. - Runbook string - // Labels associated with this alert notification, including alert name. - Labels model.LabelSet - // Current value of alert - Value model.SampleValue - // Since when this alert has been active (pending or firing). - ActiveSince time.Time - // A textual representation of the rule that triggered the alert. - RuleString string - // Prometheus console link to alert expression. - GeneratorURL string -} - -// NotificationReqs is just a short-hand for []*NotificationReq. No methods -// attached. Arguably, it's more confusing than helpful. Perhaps we should -// remove it... -type NotificationReqs []*NotificationReq - -type httpPoster interface { - Post(url string, bodyType string, body io.Reader) (*http.Response, error) -} - -// NotificationHandler is responsible for dispatching alert notifications to an +// Handler is responsible for dispatching alert notifications to an // alert manager service. -type NotificationHandler struct { - // The URL of the alert manager to send notifications to. - alertmanagerURL string - // Buffer of notifications that have not yet been sent. - pendingNotifications chan NotificationReqs - // HTTP client with custom timeout settings. - httpClient httpPoster +type Handler struct { + queue model.Alerts + opts *HandlerOptions - notificationLatency prometheus.Summary - notificationErrors prometheus.Counter - notificationDropped prometheus.Counter - notificationsQueueLength prometheus.Gauge - notificationsQueueCapacity prometheus.Metric + more chan struct{} + mtx sync.RWMutex + ctx context.Context + cancel func() - externalLabels model.LabelSet - mtx sync.RWMutex - stopped chan struct{} + latency prometheus.Summary + errors prometheus.Counter + dropped prometheus.Counter + sent prometheus.Counter + queueLength prometheus.Gauge + queueCapacity prometheus.Metric } -// NotificationHandlerOptions are the configurable parameters of a NotificationHandler. -type NotificationHandlerOptions struct { +// HandlerOptions are the configurable parameters of a Handler. +type HandlerOptions struct { AlertmanagerURL string QueueCapacity int - Deadline time.Duration + Timeout time.Duration + ExternalLabels model.LabelSet } -// NewNotificationHandler constructs a new NotificationHandler. -func NewNotificationHandler(o *NotificationHandlerOptions) *NotificationHandler { - return &NotificationHandler{ - alertmanagerURL: strings.TrimRight(o.AlertmanagerURL, "/"), - pendingNotifications: make(chan NotificationReqs, o.QueueCapacity), +// NewHandler constructs a new Handler. +func New(o *HandlerOptions) *Handler { + ctx, cancel := context.WithCancel(context.Background()) - httpClient: httputil.NewDeadlineClient(o.Deadline, nil), + return &Handler{ + queue: make(model.Alerts, 0, o.QueueCapacity), + ctx: ctx, + cancel: cancel, + more: make(chan struct{}, 1), + opts: o, - notificationLatency: prometheus.NewSummary(prometheus.SummaryOpts{ + latency: prometheus.NewSummary(prometheus.SummaryOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "latency_milliseconds", + Name: "latency_seconds", Help: "Latency quantiles for sending alert notifications (not including dropped notifications).", }), - notificationErrors: prometheus.NewCounter(prometheus.CounterOpts{ + errors: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, Name: "errors_total", Help: "Total number of errors sending alert notifications.", }), - notificationDropped: prometheus.NewCounter(prometheus.CounterOpts{ + sent: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "sent_total", + Help: "Total number of alerts successfully sent.", + }), + dropped: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, Name: "dropped_total", - Help: "Total number of alert notifications dropped due to alert manager missing in configuration.", + Help: "Total number of alerts dropped due to alert manager missing in configuration.", }), - notificationsQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{ + queueLength: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "queue_length", Help: "The number of alert notifications in the queue.", }), - notificationsQueueCapacity: prometheus.MustNewConstMetric( + queueCapacity: prometheus.MustNewConstMetric( prometheus.NewDesc( prometheus.BuildFQName(namespace, subsystem, "queue_capacity"), "The capacity of the alert notifications queue.", @@ -141,114 +118,176 @@ func NewNotificationHandler(o *NotificationHandlerOptions) *NotificationHandler prometheus.GaugeValue, float64(o.QueueCapacity), ), - stopped: make(chan struct{}), } } // ApplyConfig updates the status state as the new config requires. // Returns true on success. -func (n *NotificationHandler) ApplyConfig(conf *config.Config) bool { +func (n *Handler) ApplyConfig(conf *config.Config) bool { n.mtx.Lock() defer n.mtx.Unlock() - n.externalLabels = conf.GlobalConfig.ExternalLabels + n.opts.ExternalLabels = conf.GlobalConfig.ExternalLabels return true } -// Send a list of notifications to the configured alert manager. -func (n *NotificationHandler) sendNotifications(reqs NotificationReqs) error { +const maxBatchSize = 64 + +func (n *Handler) queueLen() int { n.mtx.RLock() defer n.mtx.RUnlock() - alerts := make([]map[string]interface{}, 0, len(reqs)) - for _, req := range reqs { - for ln, lv := range n.externalLabels { - if _, ok := req.Labels[ln]; !ok { - req.Labels[ln] = lv + return len(n.queue) +} + +func (n *Handler) nextBatch() []*model.Alert { + n.mtx.Lock() + defer n.mtx.Unlock() + + var alerts model.Alerts + + if len(n.queue) > maxBatchSize { + alerts = append(make(model.Alerts, 0, maxBatchSize), n.queue[:maxBatchSize]...) + n.queue = n.queue[maxBatchSize:] + } else { + alerts = append(make(model.Alerts, 0, len(n.queue)), n.queue...) + n.queue = n.queue[:0] + } + + return alerts +} + +// Run dispatches notifications continuously. +func (n *Handler) Run() { + for { + select { + case <-n.ctx.Done(): + return + case <-n.more: + } + + alerts := n.nextBatch() + + if n.opts.AlertmanagerURL == "" { + log.Warn("No AlertManager configured, not dispatching %d alerts", len(alerts)) + n.dropped.Add(float64(len(alerts))) + continue + } + + begin := time.Now() + + if err := n.send(alerts...); err != nil { + log.Errorf("Error sending %d alerts: %s", len(alerts), err) + n.errors.Inc() + n.dropped.Add(float64(len(alerts))) + } + + n.latency.Observe(float64(time.Since(begin)) / float64(time.Second)) + n.sent.Add(float64(len(alerts))) + + // If the queue still has items left, kick off the next iteration. + if n.queueLen() > 0 { + n.setMore() + } + } +} + +// SubmitReqs queues the given notification requests for processing. +func (n *Handler) Send(alerts ...*model.Alert) { + n.mtx.Lock() + defer n.mtx.Unlock() + + // Queue capacity should be significantly larger than a single alert + // batch could be. + if d := len(alerts) - n.opts.QueueCapacity; d > 0 { + alerts = alerts[d:] + + log.Warnf("Alert batch larger than queue capacity, dropping %d alerts", d) + n.dropped.Add(float64(d)) + } + + // If the queue is full, remove the oldest alerts in favor + // of newer ones. + if d := (len(n.queue) + len(alerts)) - n.opts.QueueCapacity; d > 0 { + n.queue = n.queue[d:] + + log.Warnf("Alert notification queue full, dropping %d alerts", d) + n.dropped.Add(float64(d)) + } + n.queue = append(n.queue, alerts...) + + // Notify sending goroutine that there are alerts to be processed. + n.setMore() +} + +// setMore signals that the alert queue has items. +func (n *Handler) setMore() { + // If we cannot send on the channel, it means the signal already exists + // and has not been consumed yet. + select { + case n.more <- struct{}{}: + default: + } +} + +func (n *Handler) postURL() string { + return n.opts.AlertmanagerURL + alertPushEndpoint +} + +func (n *Handler) send(alerts ...*model.Alert) error { + // Attach external labels before sending alerts. + for _, a := range alerts { + for ln, lv := range n.opts.ExternalLabels { + if _, ok := a.Labels[ln]; !ok { + a.Labels[ln] = lv } } - alerts = append(alerts, map[string]interface{}{ - "summary": req.Summary, - "description": req.Description, - "runbook": req.Runbook, - "labels": req.Labels, - "payload": map[string]interface{}{ - "value": req.Value, - "activeSince": req.ActiveSince, - "generatorURL": req.GeneratorURL, - "alertingRule": req.RuleString, - }, - }) } - buf, err := json.Marshal(alerts) - if err != nil { + + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(alerts); err != nil { return err } - log.Debugln("Sending notifications to alertmanager:", string(buf)) - resp, err := n.httpClient.Post( - n.alertmanagerURL+alertmanagerAPIEventsPath, - contentTypeJSON, - bytes.NewBuffer(buf), - ) + ctx, _ := context.WithTimeout(context.Background(), n.opts.Timeout) + + resp, err := ctxhttp.Post(ctx, http.DefaultClient, n.postURL(), contentTypeJSON, &buf) if err != nil { return err } defer resp.Body.Close() - _, err = ioutil.ReadAll(resp.Body) - if err != nil { - return err + if resp.StatusCode/100 != 2 { + return fmt.Errorf("bad response status %v", resp.Status) } - // BUG: Do we need to check the response code? return nil } -// Run dispatches notifications continuously. -func (n *NotificationHandler) Run() { - for reqs := range n.pendingNotifications { - if n.alertmanagerURL == "" { - log.Warn("No alert manager configured, not dispatching notification") - n.notificationDropped.Inc() - continue - } - - begin := time.Now() - err := n.sendNotifications(reqs) - - if err != nil { - log.Error("Error sending notification: ", err) - n.notificationErrors.Inc() - } - - n.notificationLatency.Observe(float64(time.Since(begin) / time.Millisecond)) - } - close(n.stopped) -} - -// SubmitReqs queues the given notification requests for processing. -func (n *NotificationHandler) SubmitReqs(reqs NotificationReqs) { - n.pendingNotifications <- reqs -} - // Stop shuts down the notification handler. -func (n *NotificationHandler) Stop() { +func (n *Handler) Stop() { log.Info("Stopping notification handler...") - close(n.pendingNotifications) - <-n.stopped - log.Info("Notification handler stopped.") + + close(n.more) + n.cancel() } // Describe implements prometheus.Collector. -func (n *NotificationHandler) Describe(ch chan<- *prometheus.Desc) { - n.notificationLatency.Describe(ch) - ch <- n.notificationsQueueLength.Desc() - ch <- n.notificationsQueueCapacity.Desc() +func (n *Handler) Describe(ch chan<- *prometheus.Desc) { + ch <- n.latency.Desc() + ch <- n.errors.Desc() + ch <- n.sent.Desc() + ch <- n.dropped.Desc() + ch <- n.queueLength.Desc() + ch <- n.queueCapacity.Desc() } // Collect implements prometheus.Collector. -func (n *NotificationHandler) Collect(ch chan<- prometheus.Metric) { - n.notificationLatency.Collect(ch) - n.notificationsQueueLength.Set(float64(len(n.pendingNotifications))) - ch <- n.notificationsQueueLength - ch <- n.notificationsQueueCapacity +func (n *Handler) Collect(ch chan<- prometheus.Metric) { + n.queueLength.Set(float64(n.queueLen())) + + ch <- n.latency + ch <- n.errors + ch <- n.sent + ch <- n.dropped + ch <- n.queueLength + ch <- n.queueCapacity } diff --git a/notification/notification_test.go b/notification/notification_test.go index 5049f4a67..9e6ab6430 100644 --- a/notification/notification_test.go +++ b/notification/notification_test.go @@ -14,85 +14,212 @@ package notification import ( - "bytes" - "io" - "io/ioutil" + "encoding/json" + "fmt" "net/http" + "net/http/httptest" + "reflect" "testing" "time" "github.com/prometheus/common/model" ) -type testHTTPPoster struct { - message string - receivedPost chan<- bool +func TestHandlerNextBatch(t *testing.T) { + h := New(&HandlerOptions{}) + + for i := range make([]struct{}, 2*maxBatchSize+1) { + h.queue = append(h.queue, &model.Alert{ + Labels: model.LabelSet{ + "alertname": model.LabelValue(fmt.Sprintf("%d", i)), + }, + }) + } + + expected := append(model.Alerts{}, h.queue...) + + b := h.nextBatch() + + if len(b) != maxBatchSize { + t.Fatalf("Expected first batch of length %d, but got %d", maxBatchSize, len(b)) + } + if reflect.DeepEqual(expected[0:maxBatchSize], b) { + t.Fatalf("First batch did not match") + } + + b = h.nextBatch() + + if len(b) != maxBatchSize { + t.Fatalf("Expected second batch of length %d, but got %d", maxBatchSize, len(b)) + } + if reflect.DeepEqual(expected[maxBatchSize:2*maxBatchSize], b) { + t.Fatalf("Second batch did not match") + } + + b = h.nextBatch() + + if len(b) != 1 { + t.Fatalf("Expected third batch of length %d, but got %d", 1, len(b)) + } + if reflect.DeepEqual(expected[2*maxBatchSize:], b) { + t.Fatalf("Third batch did not match") + } + + if len(h.queue) != 0 { + t.Fatalf("Expected queue to be empty but got %d alerts", len(h.queue)) + } } -func (p *testHTTPPoster) Post(url string, bodyType string, body io.Reader) (*http.Response, error) { - var buf bytes.Buffer - buf.ReadFrom(body) - p.message = buf.String() - p.receivedPost <- true - return &http.Response{ - Body: ioutil.NopCloser(&bytes.Buffer{}), - }, nil +func alertsEqual(a, b model.Alerts) bool { + if len(a) != len(b) { + return false + } + for i, alert := range a { + if !alert.Labels.Equal(b[i].Labels) { + return false + } + } + return true } -type testNotificationScenario struct { - description string - summary string - message string - runbook string -} +func TestHandlerSend(t *testing.T) { + var ( + expected model.Alerts + status int + ) -func (s *testNotificationScenario) test(i int, t *testing.T) { - h := NewNotificationHandler(&NotificationHandlerOptions{ - AlertmanagerURL: "alertmanager_url", - QueueCapacity: 0, - Deadline: 10 * time.Second, + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != alertPushEndpoint { + t.Fatalf("Bad endpoint %q used, expected %q", r.URL.Path, alertPushEndpoint) + } + defer r.Body.Close() + + var alerts model.Alerts + if err := json.NewDecoder(r.Body).Decode(&alerts); err != nil { + t.Fatalf("Unexpected error on input decoding: %s", err) + } + + if !alertsEqual(alerts, expected) { + t.Errorf("%#v %#v", *alerts[0], *expected[0]) + t.Fatalf("Unexpected alerts received %v exp %v", alerts, expected) + } + + w.WriteHeader(status) + })) + + defer server.Close() + + h := New(&HandlerOptions{ + AlertmanagerURL: server.URL, + Timeout: time.Minute, + ExternalLabels: model.LabelSet{"a": "b"}, }) - defer h.Stop() - receivedPost := make(chan bool, 1) - poster := testHTTPPoster{receivedPost: receivedPost} - h.httpClient = &poster + for i := range make([]struct{}, maxBatchSize) { + h.queue = append(h.queue, &model.Alert{ + Labels: model.LabelSet{ + "alertname": model.LabelValue(fmt.Sprintf("%d", i)), + }, + }) + expected = append(expected, &model.Alert{ + Labels: model.LabelSet{ + "alertname": model.LabelValue(fmt.Sprintf("%d", i)), + "a": "b", + }, + }) + } + + status = http.StatusOK + + if err := h.send(h.queue...); err != nil { + t.Fatalf("Unexpected error: %s", err) + } + + status = 500 + + if err := h.send(h.queue...); err == nil { + t.Fatalf("Expected error but got none") + } +} + +func TestHandlerFull(t *testing.T) { + var ( + unblock = make(chan struct{}) + called = make(chan struct{}) + expected model.Alerts + ) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called <- struct{}{} + <-unblock + + defer r.Body.Close() + + var alerts model.Alerts + if err := json.NewDecoder(r.Body).Decode(&alerts); err != nil { + t.Fatalf("Unexpected error on input decoding: %s", err) + } + + if !alertsEqual(expected, alerts) { + t.Errorf("Expected alerts %v, got %v", expected, alerts) + } + })) + + h := New(&HandlerOptions{ + AlertmanagerURL: server.URL, + Timeout: time.Second, + QueueCapacity: 3 * maxBatchSize, + }) + + var alerts model.Alerts + for i := range make([]struct{}, 20*maxBatchSize) { + alerts = append(alerts, &model.Alert{ + Labels: model.LabelSet{ + "alertname": model.LabelValue(fmt.Sprintf("%d", i)), + }, + }) + } go h.Run() + defer h.Stop() - h.SubmitReqs(NotificationReqs{ - { - Summary: s.summary, - Description: s.description, - Runbook: s.runbook, - Labels: model.LabelSet{ - model.LabelName("instance"): model.LabelValue("testinstance"), - }, - Value: model.SampleValue(1.0 / 3.0), - ActiveSince: time.Time{}, - RuleString: "Test rule string", - GeneratorURL: "prometheus_url", - }, - }) + h.Send(alerts[:4*maxBatchSize]...) - <-receivedPost - if poster.message != s.message { - t.Fatalf("%d. Expected '%s', received '%s'", i, s.message, poster.message) - } -} - -func TestNotificationHandler(t *testing.T) { - scenarios := []testNotificationScenario{ - { - // Correct message. - summary: "Summary", - description: "Description", - runbook: "Runbook", - message: `[{"description":"Description","labels":{"instance":"testinstance"},"payload":{"activeSince":"0001-01-01T00:00:00Z","alertingRule":"Test rule string","generatorURL":"prometheus_url","value":"0.3333333333333333"},"runbook":"Runbook","summary":"Summary"}]`, - }, - } - - for i, s := range scenarios { - s.test(i, t) + // If the batch is larger than the queue size, the front should be truncated + // from the front. Thus, we start at i=1. + for i := 1; i < 4; i++ { + select { + case <-called: + expected = alerts[i*maxBatchSize : (i+1)*maxBatchSize] + unblock <- struct{}{} + case <-time.After(time.Second): + t.Fatalf("Alerts were not pushed") + } + } + + // Send one batch, wait for it to arrive and block so the queue fills up. + // Then check whether the queue is truncated in the front once its full. + h.Send(alerts[:maxBatchSize]...) + <-called + + // Fill the 3*maxBatchSize queue. + h.Send(alerts[1*maxBatchSize : 2*maxBatchSize]...) + h.Send(alerts[2*maxBatchSize : 3*maxBatchSize]...) + h.Send(alerts[3*maxBatchSize : 4*maxBatchSize]...) + + // Send the batch that drops the first one. + h.Send(alerts[4*maxBatchSize : 5*maxBatchSize]...) + + expected = alerts[:maxBatchSize] + unblock <- struct{}{} + + for i := 2; i < 4; i++ { + select { + case <-called: + expected = alerts[i*maxBatchSize : (i+1)*maxBatchSize] + unblock <- struct{}{} + case <-time.After(time.Second): + t.Fatalf("Alerts were not pushed") + } } } diff --git a/rules/manager.go b/rules/manager.go index 7fcd3dea0..18dc62fba 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -100,7 +100,7 @@ type Manager struct { queryEngine *promql.Engine sampleAppender storage.SampleAppender - notificationHandler *notification.NotificationHandler + notificationHandler *notification.Handler externalURL *url.URL } @@ -110,7 +110,7 @@ type ManagerOptions struct { EvaluationInterval time.Duration QueryEngine *promql.Engine - NotificationHandler *notification.NotificationHandler + NotificationHandler *notification.Handler SampleAppender storage.SampleAppender ExternalURL *url.URL @@ -178,13 +178,14 @@ func (m *Manager) Stop() { m.done <- true } -func (m *Manager) queueAlertNotifications(rule *AlertingRule, timestamp model.Time) { +func (m *Manager) sendAlertNotifications(rule *AlertingRule, timestamp model.Time) { activeAlerts := rule.ActiveAlerts() if len(activeAlerts) == 0 { return } - notifications := make(notification.NotificationReqs, 0, len(activeAlerts)) + alerts := make(model.Alerts, 0, len(activeAlerts)) + for _, aa := range activeAlerts { if aa.State != StateFiring { // BUG: In the future, make AlertManager support pending alerts? @@ -217,20 +218,23 @@ func (m *Manager) queueAlertNotifications(rule *AlertingRule, timestamp model.Ti return result } - notifications = append(notifications, ¬ification.NotificationReq{ - Summary: expand(rule.summary), - Description: expand(rule.description), - Runbook: rule.runbook, - Labels: aa.Labels.Merge(model.LabelSet{ - alertNameLabel: model.LabelValue(rule.Name()), - }), - Value: aa.Value, - ActiveSince: aa.ActiveSince.Time(), - RuleString: rule.String(), + labels := aa.Labels.Clone() + labels[model.AlertNameLabel] = model.LabelValue(rule.Name()) + + annotations := model.LabelSet{ + "summary": model.LabelValue(expand(rule.summary)), + "description": model.LabelValue(expand(rule.description)), + "runbook": model.LabelValue(expand(rule.runbook)), + } + + alerts = append(alerts, &model.Alert{ + StartsAt: aa.ActiveSince.Time(), + Labels: labels, + Annotations: annotations, GeneratorURL: m.externalURL.String() + strutil.GraphLinkForExpression(rule.vector.String()), }) } - m.notificationHandler.SubmitReqs(notifications) + m.notificationHandler.Send(alerts...) } func (m *Manager) runIteration() { @@ -260,7 +264,7 @@ func (m *Manager) runIteration() { switch r := rule.(type) { case *AlertingRule: - m.queueAlertNotifications(r, now) + m.sendAlertNotifications(r, now) evalDuration.WithLabelValues(ruleTypeAlerting).Observe( float64(duration / time.Millisecond), ) diff --git a/vendor/golang.org/x/net/context/ctxhttp/cancelreq.go b/vendor/golang.org/x/net/context/ctxhttp/cancelreq.go new file mode 100644 index 000000000..48610e362 --- /dev/null +++ b/vendor/golang.org/x/net/context/ctxhttp/cancelreq.go @@ -0,0 +1,18 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build go1.5 + +package ctxhttp + +import "net/http" + +func canceler(client *http.Client, req *http.Request) func() { + ch := make(chan struct{}) + req.Cancel = ch + + return func() { + close(ch) + } +} diff --git a/vendor/golang.org/x/net/context/ctxhttp/cancelreq_go14.go b/vendor/golang.org/x/net/context/ctxhttp/cancelreq_go14.go new file mode 100644 index 000000000..56bcbadb8 --- /dev/null +++ b/vendor/golang.org/x/net/context/ctxhttp/cancelreq_go14.go @@ -0,0 +1,23 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !go1.5 + +package ctxhttp + +import "net/http" + +type requestCanceler interface { + CancelRequest(*http.Request) +} + +func canceler(client *http.Client, req *http.Request) func() { + rc, ok := client.Transport.(requestCanceler) + if !ok { + return func() {} + } + return func() { + rc.CancelRequest(req) + } +} diff --git a/vendor/golang.org/x/net/context/ctxhttp/ctxhttp.go b/vendor/golang.org/x/net/context/ctxhttp/ctxhttp.go new file mode 100644 index 000000000..504dd63ed --- /dev/null +++ b/vendor/golang.org/x/net/context/ctxhttp/ctxhttp.go @@ -0,0 +1,79 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package ctxhttp provides helper functions for performing context-aware HTTP requests. +package ctxhttp // import "golang.org/x/net/context/ctxhttp" + +import ( + "io" + "net/http" + "net/url" + "strings" + + "golang.org/x/net/context" +) + +// Do sends an HTTP request with the provided http.Client and returns an HTTP response. +// If the client is nil, http.DefaultClient is used. +// If the context is canceled or times out, ctx.Err() will be returned. +func Do(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) { + if client == nil { + client = http.DefaultClient + } + + // Request cancelation changed in Go 1.5, see cancelreq.go and cancelreq_go14.go. + cancel := canceler(client, req) + + type responseAndError struct { + resp *http.Response + err error + } + result := make(chan responseAndError, 1) + + go func() { + resp, err := client.Do(req) + result <- responseAndError{resp, err} + }() + + select { + case <-ctx.Done(): + cancel() + return nil, ctx.Err() + case r := <-result: + return r.resp, r.err + } +} + +// Get issues a GET request via the Do function. +func Get(ctx context.Context, client *http.Client, url string) (*http.Response, error) { + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + return Do(ctx, client, req) +} + +// Head issues a HEAD request via the Do function. +func Head(ctx context.Context, client *http.Client, url string) (*http.Response, error) { + req, err := http.NewRequest("HEAD", url, nil) + if err != nil { + return nil, err + } + return Do(ctx, client, req) +} + +// Post issues a POST request via the Do function. +func Post(ctx context.Context, client *http.Client, url string, bodyType string, body io.Reader) (*http.Response, error) { + req, err := http.NewRequest("POST", url, body) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", bodyType) + return Do(ctx, client, req) +} + +// PostForm issues a POST request via the Do function. +func PostForm(ctx context.Context, client *http.Client, url string, data url.Values) (*http.Response, error) { + return Post(ctx, client, url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode())) +} diff --git a/vendor/vendor.json b/vendor/vendor.json index acf3dba67..29e913322 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -254,8 +254,13 @@ }, { "path": "golang.org/x/net/context", - "revision": "db8e4de5b2d6653f66aea53094624468caad15d2", - "revisionTime": "2015-08-24T18:07:02+02:00" + "revision": "2cba614e8ff920c60240d2677bc019af32ee04e5", + "revisionTime": "2015-10-21T11:20:37-06:00" + }, + { + "path": "golang.org/x/net/context/ctxhttp", + "revision": "2cba614e8ff920c60240d2677bc019af32ee04e5", + "revisionTime": "2015-10-21T11:20:37-06:00" }, { "path": "gopkg.in/fsnotify.v1",