diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index c60f5150b9..b8ff348c74 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -236,7 +236,7 @@ func main() { ctxRule = context.Background() notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier")) - discoveryManager = discovery.NewDiscoveryManager(ctxDiscovery, log.With(logger, "component", "discovery manager")) + discoveryManager = discovery.NewManager(ctxDiscovery, log.With(logger, "component", "discovery manager")) scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage) queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) ruleManager = rules.NewManager(&rules.ManagerOptions{Appendable: fanoutStorage, diff --git a/discovery/manager.go b/discovery/manager.go index 070c5b9003..ea24dc9e77 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -36,15 +36,15 @@ import ( "github.com/prometheus/prometheus/discovery/zookeeper" ) -// DiscoveryProvider provides information about target groups. It maintains a set +// Discoverer provides information about target groups. It maintains a set // of sources from which TargetGroups can originate. Whenever a discovery provider -// detects a potential change, it sends the TargetGroup through its provided channel. +// detects a potential change, it sends the TargetGroup through its channel. // -// The DiscoveryProvider does not have to guarantee that an actual change happened. +// Discoverer does not know if an actual change happened. // It does guarantee that it sends the new TargetGroup whenever a change happens. // -// DiscoveryProviders should initially send a full set of all discoverable TargetGroups. -type DiscoveryProvider interface { +// Discoverers should initially send a full set of all discoverable TargetGroups. +type Discoverer interface { // Run hands a channel to the discovery provider(consul,dns etc) through which it can send // updated target groups. // Must returns if the context gets canceled. It should not close the update @@ -52,33 +52,35 @@ type DiscoveryProvider interface { Run(ctx context.Context, up chan<- []*config.TargetGroup) } -type targetSetProvider struct { - cancel func() - tgroups []*config.TargetGroup -} +// type pool struct { +// cancel func() +// tgroups []*config.TargetGroup +// } -// NewDiscoveryManager is the DiscoveryManager constructor -func NewDiscoveryManager(ctx context.Context, logger log.Logger) *DiscoveryManager { - return &DiscoveryManager{ - ctx: ctx, - logger: logger, - actionCh: make(chan func()), - syncCh: make(chan map[string][]*config.TargetGroup), - targetSetProviders: make(map[string]map[string]*targetSetProvider), +// NewManager is the Discovery Manager constructor +func NewManager(ctx context.Context, logger log.Logger) *Manager { + return &Manager{ + ctx: ctx, + logger: logger, + actionCh: make(chan func()), + syncCh: make(chan map[string][]*config.TargetGroup), + endpoints: make(map[string]map[string][]*config.TargetGroup), + discoverCancel: []context.CancelFunc{}, } } -// DiscoveryManager maintains a set of discovery providers and sends each update to a channel used by other packages. -type DiscoveryManager struct { - ctx context.Context - logger log.Logger - syncCh chan map[string][]*config.TargetGroup // map[targetSetName] - actionCh chan func() - targetSetProviders map[string]map[string]*targetSetProvider // map[targetSetName]map[providerName] +// Manager maintains a set of discovery providers and sends each update to a channel used by other packages. +type Manager struct { + ctx context.Context + logger log.Logger + syncCh chan map[string][]*config.TargetGroup // map[targetSetName] + actionCh chan func() + discoverCancel []context.CancelFunc + endpoints map[string]map[string][]*config.TargetGroup // map[targetSetName]map[providerName] } // Run starts the background processing -func (m *DiscoveryManager) Run() error { +func (m *Manager) Run() error { for { select { case f := <-m.actionCh: @@ -90,27 +92,27 @@ func (m *DiscoveryManager) Run() error { } // SyncCh returns a read only channel used by all DiscoveryProviders targetSet updates -func (m *DiscoveryManager) SyncCh() <-chan map[string][]*config.TargetGroup { +func (m *Manager) SyncCh() <-chan map[string][]*config.TargetGroup { return m.syncCh } // ApplyConfig removes all running discovery providers and starts new ones using the provided config. -func (m *DiscoveryManager) ApplyConfig(cfg *config.Config) error { +func (m *Manager) ApplyConfig(cfg *config.Config) error { err := make(chan error) m.actionCh <- func() { - m.cancelDiscoveryProviders() + m.cancelDiscoverers() for _, scfg := range cfg.ScrapeConfigs { for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) { ctx, cancel := context.WithCancel(m.ctx) updates := make(chan []*config.TargetGroup) - m.createProvider(cancel, scfg.JobName, provName) + m.discoverCancel = append(m.discoverCancel, cancel) go prov.Run(ctx, updates) go func(provName string) { select { case <-ctx.Done(): - // First set of all targets the provider knows. + // First set of all endpoints the provider knows. case tgs, ok := <-updates: // Handle the case that a target provider exits and closes the channel // before the context is done. @@ -146,35 +148,29 @@ func (m *DiscoveryManager) ApplyConfig(cfg *config.Config) error { return <-err } -func (m *DiscoveryManager) cancelDiscoveryProviders() { - for targetSetName, targetSetProviders := range m.targetSetProviders { - for _, discoveryProvider := range targetSetProviders { - discoveryProvider.cancel() - } - delete(m.targetSetProviders, targetSetName) +func (m *Manager) cancelDiscoverers() { + for _, c := range m.discoverCancel { + c() } + m.endpoints = make(map[string]map[string][]*config.TargetGroup) + m.discoverCancel = []context.CancelFunc{} } -func (m *DiscoveryManager) createProvider(cancel context.CancelFunc, tsName, provName string) { - if m.targetSetProviders[tsName] == nil { - m.targetSetProviders[tsName] = make(map[string]*targetSetProvider) +// mergeGroups adds a new target group for a given discovery provider and returns all target groups for a given target set +func (m *Manager) mergeGroups(tsName, provName string, tg []*config.TargetGroup) map[string][]*config.TargetGroup { + if m.endpoints[tsName] == nil { + m.endpoints[tsName] = make(map[string][]*config.TargetGroup) } - m.targetSetProviders[tsName][provName] = &targetSetProvider{ - cancel: cancel, - tgroups: []*config.TargetGroup{}, - } -} + m.endpoints[tsName][provName] = []*config.TargetGroup{} -// mergeGroups adds a new target group for a named discovery provider and returns all target groups for a given target set -func (m *DiscoveryManager) mergeGroups(tsName, provName string, tg []*config.TargetGroup) map[string][]*config.TargetGroup { tset := make(chan map[string][]*config.TargetGroup) m.actionCh <- func() { if tg != nil { - m.targetSetProviders[tsName][provName].tgroups = tg + m.endpoints[tsName][provName] = tg } var tgAll []*config.TargetGroup - for _, prov := range m.targetSetProviders[tsName] { - for _, tg := range prov.tgroups { + for _, prov := range m.endpoints[tsName] { + for _, tg := range prov { tgAll = append(tgAll, tg) } } @@ -185,10 +181,10 @@ func (m *DiscoveryManager) mergeGroups(tsName, provName string, tg []*config.Tar return <-tset } -func (m *DiscoveryManager) providersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]DiscoveryProvider { - providers := map[string]DiscoveryProvider{} +func (m *Manager) providersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]Discoverer { + providers := map[string]Discoverer{} - app := func(mech string, i int, tp DiscoveryProvider) { + app := func(mech string, i int, tp Discoverer) { providers[fmt.Sprintf("%s/%d", mech, i)] = tp } @@ -280,7 +276,7 @@ func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider { return &StaticProvider{groups} } -// Run implements the DiscoveryProvider interface. +// Run implements the Worker interface. func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { // We still have to consider that the consumer exits right away in which case // the context will be canceled.