diff --git a/discovery/manager.go b/discovery/manager.go index 3cb68d060..dbfc0bda4 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -52,10 +52,18 @@ type Discoverer interface { Run(ctx context.Context, up chan<- []*config.TargetGroup) } -// type pool struct { -// cancel func() -// tgroups []*config.TargetGroup -// } +type poolKey struct { + set string + provider string +} + +// byProvider implements sort.Interface for []poolKey based on the provider field. +// Sorting is needed so that we can have predictable tests. +type byProvider []poolKey + +func (a byProvider) Len() int { return len(a) } +func (a byProvider) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byProvider) Less(i, j int) bool { return a[i].provider < a[j].provider } // NewManager is the Discovery Manager constructor func NewManager(logger log.Logger) *Manager { @@ -63,20 +71,19 @@ func NewManager(logger log.Logger) *Manager { logger: logger, actionCh: make(chan func(context.Context)), syncCh: make(chan map[string][]*config.TargetGroup), - targets: make(map[string]map[string][]*config.TargetGroup), + targets: make(map[poolKey][]*config.TargetGroup), discoverCancel: []context.CancelFunc{}, } } // Manager maintains a set of discovery providers and sends each update to a channel used by other packages. -// The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config. -// Targets pool is kept in a map with a format map[targetSetName]map[providerName]. type Manager struct { logger log.Logger - syncCh chan map[string][]*config.TargetGroup actionCh chan func(context.Context) discoverCancel []context.CancelFunc - targets map[string]map[string][]*config.TargetGroup + targets map[poolKey][]*config.TargetGroup + // The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config. + syncCh chan map[string][]*config.TargetGroup } // Run starts the background processing @@ -104,7 +111,7 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error { m.cancelDiscoverers() for _, scfg := range cfg.ScrapeConfigs { for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) { - m.startProvider(ctx, scfg.JobName, provName, prov) + m.startProvider(ctx, poolKey{set: scfg.JobName, provider: provName}, prov) } } close(err) @@ -113,17 +120,17 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error { return <-err } -func (m *Manager) startProvider(ctx context.Context, jobName, provName string, worker Discoverer) { +func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Discoverer) { ctx, cancel := context.WithCancel(ctx) updates := make(chan []*config.TargetGroup) m.discoverCancel = append(m.discoverCancel, cancel) go worker.Run(ctx, updates) - go m.runProvider(ctx, provName, jobName, updates) + go m.runProvider(ctx, poolKey, updates) } -func (m *Manager) runProvider(ctx context.Context, provName, jobName string, updates chan []*config.TargetGroup) { +func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan []*config.TargetGroup) { for { select { case <-ctx.Done(): @@ -134,8 +141,8 @@ func (m *Manager) runProvider(ctx context.Context, provName, jobName string, upd if !ok { return } - m.addGroup(jobName, provName, tgs) - m.syncCh <- m.allGroups(jobName) + m.addGroup(poolKey, tgs) + m.syncCh <- m.allGroups(poolKey) } } } @@ -144,20 +151,16 @@ func (m *Manager) cancelDiscoverers() { for _, c := range m.discoverCancel { c() } - m.targets = make(map[string]map[string][]*config.TargetGroup) + m.targets = make(map[poolKey][]*config.TargetGroup) m.discoverCancel = nil } -func (m *Manager) addGroup(tsName, provName string, tg []*config.TargetGroup) { +func (m *Manager) addGroup(poolKey poolKey, tg []*config.TargetGroup) { done := make(chan struct{}) m.actionCh <- func(ctx context.Context) { - if m.targets[tsName] == nil { - m.targets[tsName] = make(map[string][]*config.TargetGroup) - } - if tg != nil { - m.targets[tsName][provName] = tg + m.targets[poolKey] = tg } close(done) @@ -165,31 +168,29 @@ func (m *Manager) addGroup(tsName, provName string, tg []*config.TargetGroup) { <-done } -func (m *Manager) allGroups(tsName string) map[string][]*config.TargetGroup { - tset := make(chan map[string][]*config.TargetGroup) +func (m *Manager) allGroups(pk poolKey) map[string][]*config.TargetGroup { + tSets := make(chan map[string][]*config.TargetGroup) m.actionCh <- func(ctx context.Context) { - tgAll := []*config.TargetGroup{} - // Sorting the providers is needed so that we can have predictable tests. - // Maps cannot be sorted so need to extract the keys to a slice and sort the string slice. - var providerNames []string - for providerName := range m.targets[tsName] { - providerNames = append(providerNames, providerName) + // Sorting by the poolKey is needed so that we can have predictable tests. + var pKeys []poolKey + for pk := range m.targets { + pKeys = append(pKeys, pk) } - sort.Strings(providerNames) - for _, prov := range providerNames { - for _, tg := range m.targets[tsName][prov] { + sort.Sort(byProvider(pKeys)) + + tSetsAll := map[string][]*config.TargetGroup{} + for _, pk := range pKeys { + for _, tg := range m.targets[pk] { if tg.Source != "" { // Don't add empty targets. - tgAll = append(tgAll, tg) + tSetsAll[pk.set] = append(tSetsAll[pk.set], tg) } } } - t := make(map[string][]*config.TargetGroup) - t[tsName] = tgAll - tset <- t + tSets <- tSetsAll } - return <-tset + return <-tSets } diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 3e5b41f47..845fc8a87 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -590,7 +590,7 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { var totalUpdatesCount int for tpName, update := range testCase.updates { provider := newMockDiscoveryProvider(update) - discoveryManager.startProvider(ctx, strconv.Itoa(testIndex), tpName, provider) + discoveryManager.startProvider(ctx, poolKey{set: strconv.Itoa(testIndex), provider: tpName}, provider) if len(update) > 0 { totalUpdatesCount = totalUpdatesCount + len(update) @@ -627,19 +627,15 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { } func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) { - verifyPresence := func(tSets map[string]map[string][]*config.TargetGroup, tSetName string, provName, label string, present bool) { - if _, ok := tSets[tSetName]; !ok { - t.Fatalf("'%s' should be present in TargetSets: %v", tSetName, tSets) - return - } - if _, ok := tSets[tSetName][provName]; !ok { - t.Fatalf("'%s' should be present in Discovery providers: %v", provName, tSets[tSetName]) + verifyPresence := func(tSets map[poolKey][]*config.TargetGroup, poolKey poolKey, label string, present bool) { + if _, ok := tSets[poolKey]; !ok { + t.Fatalf("'%s' should be present in Pool keys: %v", poolKey, tSets) return } match := false var mergedTargets string - for _, targetGroup := range tSets[tSetName][provName] { + for _, targetGroup := range tSets[poolKey] { for _, l := range targetGroup.Targets { mergedTargets = mergedTargets + " " + l.String() @@ -678,8 +674,8 @@ scrape_configs: discoveryManager.ApplyConfig(cfg) _ = <-discoveryManager.SyncCh() - verifyPresence(discoveryManager.targets, "prometheus", "static/0", "{__address__=\"foo:9090\"}", true) - verifyPresence(discoveryManager.targets, "prometheus", "static/0", "{__address__=\"bar:9090\"}", true) + verifyPresence(discoveryManager.targets, poolKey{set: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) + verifyPresence(discoveryManager.targets, poolKey{set: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", true) sTwo := ` scrape_configs: @@ -693,8 +689,8 @@ scrape_configs: discoveryManager.ApplyConfig(cfg) _ = <-discoveryManager.SyncCh() - verifyPresence(discoveryManager.targets, "prometheus", "static/0", "{__address__=\"foo:9090\"}", true) - verifyPresence(discoveryManager.targets, "prometheus", "static/0", "{__address__=\"bar:9090\"}", false) + verifyPresence(discoveryManager.targets, poolKey{set: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) + verifyPresence(discoveryManager.targets, poolKey{set: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", false) } type update struct {