diff --git a/discovery/manager.go b/discovery/manager.go index 77c9598d3..fa8cb183e 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -16,7 +16,7 @@ package discovery import ( "context" "fmt" - "time" + "sort" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -64,7 +64,7 @@ func NewManager(ctx context.Context, logger log.Logger) *Manager { logger: logger, actionCh: make(chan func()), syncCh: make(chan map[string][]*config.TargetGroup), - endpoints: make(map[string]map[string][]*config.TargetGroup), + targets: make(map[string]map[string][]*config.TargetGroup), discoverCancel: []context.CancelFunc{}, } } @@ -76,7 +76,7 @@ type Manager struct { syncCh chan map[string][]*config.TargetGroup // map[targetSetName] actionCh chan func() discoverCancel []context.CancelFunc - endpoints map[string]map[string][]*config.TargetGroup // map[targetSetName]map[providerName] + targets map[string]map[string][]*config.TargetGroup // map[targetSetName]map[providerName] } // Run starts the background processing @@ -86,12 +86,13 @@ func (m *Manager) Run() error { case f := <-m.actionCh: f() case <-m.ctx.Done(): + m.cancelDiscoverers() return m.ctx.Err() } } } -// SyncCh returns a read only channel used by all DiscoveryProviders targetSet updates +// SyncCh returns a read only channel used by all Discoverers to send target updates. func (m *Manager) SyncCh() <-chan map[string][]*config.TargetGroup { return m.syncCh } @@ -120,22 +121,6 @@ func (m *Manager) startProvider(jobName, provName string, worker Discoverer) { go worker.Run(ctx, updates) go func(provName string) { - select { - case <-ctx.Done(): - // First set of all endpoints the provider knows. - case tgs, ok := <-updates: - // Handle the case that a target provider exits and closes the channel - // before the context is done. - if !ok { - break - } - m.syncCh <- m.mergeGroups(jobName, provName, tgs) - case <-time.After(5 * time.Second): - // Initial set didn't arrive. Act as if it was empty - // and wait for updates later on. - } - - // Start listening for further updates. for { select { case <-ctx.Done(): @@ -156,26 +141,37 @@ func (m *Manager) cancelDiscoverers() { for _, c := range m.discoverCancel { c() } - m.endpoints = make(map[string]map[string][]*config.TargetGroup) + m.targets = make(map[string]map[string][]*config.TargetGroup) m.discoverCancel = []context.CancelFunc{} } // mergeGroups adds a new target group for a given discovery provider and returns all target groups for a given target set func (m *Manager) mergeGroups(tsName, provName string, tg []*config.TargetGroup) map[string][]*config.TargetGroup { - if m.endpoints[tsName] == nil { - m.endpoints[tsName] = make(map[string][]*config.TargetGroup) - } - m.endpoints[tsName][provName] = []*config.TargetGroup{} - tset := make(chan map[string][]*config.TargetGroup) + m.actionCh <- func() { - if tg != nil { - m.endpoints[tsName][provName] = tg + if m.targets[tsName] == nil { + m.targets[tsName] = make(map[string][]*config.TargetGroup) } - var tgAll []*config.TargetGroup - for _, prov := range m.endpoints[tsName] { - for _, tg := range prov { - tgAll = append(tgAll, tg) + m.targets[tsName][provName] = []*config.TargetGroup{} + + if tg != nil { + m.targets[tsName][provName] = tg + } + tgAll := []*config.TargetGroup{} + + // Sort the providers alphabetically. + // Maps cannot be sorted so need to extract the keys to a slice and sort the string slice. + var providerNames []string + for providerName := range m.targets[tsName] { + providerNames = append(providerNames, providerName) + } + sort.Strings(providerNames) + for _, prov := range providerNames { + for _, tg := range m.targets[tsName][prov] { + if tg.Source != "" { // Don't add empty targets. + tgAll = append(tgAll, tg) + } } } t := make(map[string][]*config.TargetGroup)