diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 2939a74033..c090313b97 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -238,7 +238,7 @@ func main() { ctxWeb, cancelWeb = context.WithCancel(context.Background()) ctxRule = context.Background() - notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier")) + notifier = notifier.NewManager(&cfg.notifier, log.With(logger, "component", "notifier")) ctxScrape, cancelScrape = context.WithCancel(context.Background()) discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape")) @@ -246,7 +246,7 @@ func main() { ctxNotify, cancelNotify = context.WithCancel(context.Background()) discoveryManagerNotify = discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify")) - scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage) + scrapeManager = retrieval.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage) queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) ruleManager = rules.NewManager(&rules.ManagerOptions{ Appendable: fanoutStorage, @@ -654,7 +654,7 @@ func computeExternalURL(u, listenAddr string) (*url.URL, error) { // sendAlerts implements a the rules.NotifyFunc for a Notifier. // It filters any non-firing alerts from the input. -func sendAlerts(n *notifier.Notifier, externalURL string) rules.NotifyFunc { +func sendAlerts(n *notifier.Manager, externalURL string) rules.NotifyFunc { return func(ctx context.Context, expr string, alerts ...*rules.Alert) error { var res []*notifier.Alert diff --git a/notifier/notifier.go b/notifier/notifier.go index 9a857a2876..94e4c5e7ea 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -101,9 +101,9 @@ func (a *Alert) ResolvedAt(ts time.Time) bool { return !a.EndsAt.After(ts) } -// Notifier is responsible for dispatching alert notifications to an +// Manager is responsible for dispatching alert notifications to an // alert manager service. -type Notifier struct { +type Manager struct { queue []*Alert opts *Options @@ -206,8 +206,8 @@ func newAlertMetrics(r prometheus.Registerer, queueCap int, queueLen, alertmanag return m } -// New constructs a new Notifier. -func New(o *Options, logger log.Logger) *Notifier { +// NewManager is the manager constructor. +func NewManager(o *Options, logger log.Logger) *Manager { ctx, cancel := context.WithCancel(context.Background()) if o.Do == nil { @@ -217,7 +217,7 @@ func New(o *Options, logger log.Logger) *Notifier { logger = log.NewNopLogger() } - n := &Notifier{ + n := &Manager{ queue: make([]*Alert, 0, o.QueueCapacity), ctx: ctx, cancel: cancel, @@ -240,7 +240,7 @@ func New(o *Options, logger log.Logger) *Notifier { } // ApplyConfig updates the status state as the new config requires. -func (n *Notifier) ApplyConfig(conf *config.Config) error { +func (n *Manager) ApplyConfig(conf *config.Config) error { n.mtx.Lock() defer n.mtx.Unlock() @@ -272,14 +272,14 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error { const maxBatchSize = 64 -func (n *Notifier) queueLen() int { +func (n *Manager) queueLen() int { n.mtx.RLock() defer n.mtx.RUnlock() return len(n.queue) } -func (n *Notifier) nextBatch() []*Alert { +func (n *Manager) nextBatch() []*Alert { n.mtx.Lock() defer n.mtx.Unlock() @@ -297,7 +297,7 @@ func (n *Notifier) nextBatch() []*Alert { } // Run dispatches notifications continuously. -func (n *Notifier) Run(tsets <-chan map[string][]*targetgroup.Group) { +func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) { for { select { @@ -319,7 +319,7 @@ func (n *Notifier) Run(tsets <-chan map[string][]*targetgroup.Group) { } } -func (n *Notifier) reload(tgs map[string][]*targetgroup.Group) { +func (n *Manager) reload(tgs map[string][]*targetgroup.Group) { n.mtx.Lock() defer n.mtx.Unlock() @@ -335,7 +335,7 @@ func (n *Notifier) reload(tgs map[string][]*targetgroup.Group) { // Send queues the given notification requests for processing. // Panics if called on a handler that is not running. -func (n *Notifier) Send(alerts ...*Alert) { +func (n *Manager) Send(alerts ...*Alert) { n.mtx.Lock() defer n.mtx.Unlock() @@ -377,7 +377,7 @@ func (n *Notifier) Send(alerts ...*Alert) { n.setMore() } -func (n *Notifier) relabelAlerts(alerts []*Alert) []*Alert { +func (n *Manager) relabelAlerts(alerts []*Alert) []*Alert { var relabeledAlerts []*Alert for _, alert := range alerts { @@ -391,7 +391,7 @@ func (n *Notifier) relabelAlerts(alerts []*Alert) []*Alert { } // setMore signals that the alert queue has items. -func (n *Notifier) setMore() { +func (n *Manager) setMore() { // If we cannot send on the channel, it means the signal already exists // and has not been consumed yet. select { @@ -401,7 +401,7 @@ func (n *Notifier) setMore() { } // Alertmanagers returns a slice of Alertmanager URLs. -func (n *Notifier) Alertmanagers() []*url.URL { +func (n *Manager) Alertmanagers() []*url.URL { n.mtx.RLock() amSets := n.alertmanagers n.mtx.RUnlock() @@ -421,7 +421,7 @@ func (n *Notifier) Alertmanagers() []*url.URL { // sendAll sends the alerts to all configured Alertmanagers concurrently. // It returns true if the alerts could be sent successfully to at least one Alertmanager. -func (n *Notifier) sendAll(alerts ...*Alert) bool { +func (n *Manager) sendAll(alerts ...*Alert) bool { begin := time.Now() b, err := json.Marshal(alerts) @@ -469,7 +469,7 @@ func (n *Notifier) sendAll(alerts ...*Alert) bool { return numSuccess > 0 } -func (n *Notifier) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error { +func (n *Manager) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error { req, err := http.NewRequest("POST", url, bytes.NewReader(b)) if err != nil { return err @@ -489,7 +489,7 @@ func (n *Notifier) sendOne(ctx context.Context, c *http.Client, url string, b [] } // Stop shuts down the notification handler. -func (n *Notifier) Stop() { +func (n *Manager) Stop() { level.Info(n.logger).Log("msg", "Stopping notification manager...") n.cancel() } diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go index d994cfccbd..80bffaf225 100644 --- a/notifier/notifier_test.go +++ b/notifier/notifier_test.go @@ -71,7 +71,7 @@ func TestPostPath(t *testing.T) { } func TestHandlerNextBatch(t *testing.T) { - h := New(&Options{}, nil) + h := NewManager(&Options{}, nil) for i := range make([]struct{}, 2*maxBatchSize+1) { h.queue = append(h.queue, &Alert{ @@ -168,7 +168,7 @@ func TestHandlerSendAll(t *testing.T) { defer server1.Close() defer server2.Close() - h := New(&Options{}, nil) + h := NewManager(&Options{}, nil) authClient, _ := httputil.NewClientFromConfig(config_util.HTTPClientConfig{ BasicAuth: &config_util.BasicAuth{ @@ -233,7 +233,7 @@ func TestCustomDo(t *testing.T) { const testBody = "testbody" var received bool - h := New(&Options{ + h := NewManager(&Options{ Do: func(ctx old_ctx.Context, client *http.Client, req *http.Request) (*http.Response, error) { received = true body, err := ioutil.ReadAll(req.Body) @@ -260,7 +260,7 @@ func TestCustomDo(t *testing.T) { } func TestExternalLabels(t *testing.T) { - h := New(&Options{ + h := NewManager(&Options{ QueueCapacity: 3 * maxBatchSize, ExternalLabels: model.LabelSet{"a": "b"}, RelabelConfigs: []*config.RelabelConfig{ @@ -296,7 +296,7 @@ func TestExternalLabels(t *testing.T) { } func TestHandlerRelabel(t *testing.T) { - h := New(&Options{ + h := NewManager(&Options{ QueueCapacity: 3 * maxBatchSize, RelabelConfigs: []*config.RelabelConfig{ { @@ -356,7 +356,7 @@ func TestHandlerQueueing(t *testing.T) { } })) - h := New(&Options{ + h := NewManager(&Options{ QueueCapacity: 3 * maxBatchSize, }, nil, @@ -469,7 +469,7 @@ func TestReload(t *testing.T) { }, } - n := New(&Options{}, nil) + n := NewManager(&Options{}, nil) cfg := &config.Config{} s := ` diff --git a/retrieval/manager.go b/retrieval/manager.go index b6e3d780c5..3fb88cb7bc 100644 --- a/retrieval/manager.go +++ b/retrieval/manager.go @@ -31,8 +31,8 @@ type Appendable interface { Appender() (storage.Appender, error) } -// NewScrapeManager is the ScrapeManager constructor -func NewScrapeManager(logger log.Logger, app Appendable) *ScrapeManager { +// NewManager is the ScrapeManager constructor +func NewManager(logger log.Logger, app Appendable) *ScrapeManager { return &ScrapeManager{ append: app, diff --git a/retrieval/manager_test.go b/retrieval/manager_test.go index 3e54cf31a7..a4f2df96ea 100644 --- a/retrieval/manager_test.go +++ b/retrieval/manager_test.go @@ -246,7 +246,7 @@ func TestManagerReloadNoChange(t *testing.T) { }, } - scrapeManager := NewScrapeManager(nil, nil) + scrapeManager := NewManager(nil, nil) scrapeManager.scrapeConfigs[tsetName] = reloadCfg.ScrapeConfigs[0] // As reload never happens, new loop should never be called. newLoop := func(_ *Target, s scraper) loop { diff --git a/web/web.go b/web/web.go index 74752f72ef..5271db67e5 100644 --- a/web/web.go +++ b/web/web.go @@ -77,7 +77,7 @@ type Handler struct { context context.Context tsdb func() *tsdb.DB storage storage.Storage - notifier *notifier.Notifier + notifier *notifier.Manager apiV1 *api_v1.API @@ -127,7 +127,7 @@ type Options struct { QueryEngine *promql.Engine ScrapeManager *retrieval.ScrapeManager RuleManager *rules.Manager - Notifier *notifier.Notifier + Notifier *notifier.Manager Version *PrometheusVersion Flags map[string]string