some renaming and comments fixes.

remove some select state that is most likely obsoleete and hoepfully doesn't braje anything :)
merge targets will sort by Discoverer name so we can have consistent tests for the maps.
This commit is contained in:
Krasi Georgiev 2017-11-29 22:38:52 +00:00
parent f5c2c5ff8f
commit fe6c544532

View file

@ -16,7 +16,7 @@ package discovery
import ( import (
"context" "context"
"fmt" "fmt"
"time" "sort"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level" "github.com/go-kit/kit/log/level"
@ -64,7 +64,7 @@ func NewManager(ctx context.Context, logger log.Logger) *Manager {
logger: logger, logger: logger,
actionCh: make(chan func()), actionCh: make(chan func()),
syncCh: make(chan map[string][]*config.TargetGroup), syncCh: make(chan map[string][]*config.TargetGroup),
endpoints: make(map[string]map[string][]*config.TargetGroup), targets: make(map[string]map[string][]*config.TargetGroup),
discoverCancel: []context.CancelFunc{}, discoverCancel: []context.CancelFunc{},
} }
} }
@ -76,7 +76,7 @@ type Manager struct {
syncCh chan map[string][]*config.TargetGroup // map[targetSetName] syncCh chan map[string][]*config.TargetGroup // map[targetSetName]
actionCh chan func() actionCh chan func()
discoverCancel []context.CancelFunc discoverCancel []context.CancelFunc
endpoints map[string]map[string][]*config.TargetGroup // map[targetSetName]map[providerName] targets map[string]map[string][]*config.TargetGroup // map[targetSetName]map[providerName]
} }
// Run starts the background processing // Run starts the background processing
@ -86,12 +86,13 @@ func (m *Manager) Run() error {
case f := <-m.actionCh: case f := <-m.actionCh:
f() f()
case <-m.ctx.Done(): case <-m.ctx.Done():
m.cancelDiscoverers()
return m.ctx.Err() return m.ctx.Err()
} }
} }
} }
// SyncCh returns a read only channel used by all DiscoveryProviders targetSet updates // SyncCh returns a read only channel used by all Discoverers to send target updates.
func (m *Manager) SyncCh() <-chan map[string][]*config.TargetGroup { func (m *Manager) SyncCh() <-chan map[string][]*config.TargetGroup {
return m.syncCh return m.syncCh
} }
@ -120,22 +121,6 @@ func (m *Manager) startProvider(jobName, provName string, worker Discoverer) {
go worker.Run(ctx, updates) go worker.Run(ctx, updates)
go func(provName string) { go func(provName string) {
select {
case <-ctx.Done():
// First set of all endpoints the provider knows.
case tgs, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
break
}
m.syncCh <- m.mergeGroups(jobName, provName, tgs)
case <-time.After(5 * time.Second):
// Initial set didn't arrive. Act as if it was empty
// and wait for updates later on.
}
// Start listening for further updates.
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -156,28 +141,39 @@ func (m *Manager) cancelDiscoverers() {
for _, c := range m.discoverCancel { for _, c := range m.discoverCancel {
c() c()
} }
m.endpoints = make(map[string]map[string][]*config.TargetGroup) m.targets = make(map[string]map[string][]*config.TargetGroup)
m.discoverCancel = []context.CancelFunc{} m.discoverCancel = []context.CancelFunc{}
} }
// mergeGroups adds a new target group for a given discovery provider and returns all target groups for a given target set // mergeGroups adds a new target group for a given discovery provider and returns all target groups for a given target set
func (m *Manager) mergeGroups(tsName, provName string, tg []*config.TargetGroup) map[string][]*config.TargetGroup { func (m *Manager) mergeGroups(tsName, provName string, tg []*config.TargetGroup) map[string][]*config.TargetGroup {
if m.endpoints[tsName] == nil {
m.endpoints[tsName] = make(map[string][]*config.TargetGroup)
}
m.endpoints[tsName][provName] = []*config.TargetGroup{}
tset := make(chan map[string][]*config.TargetGroup) tset := make(chan map[string][]*config.TargetGroup)
m.actionCh <- func() { m.actionCh <- func() {
if tg != nil { if m.targets[tsName] == nil {
m.endpoints[tsName][provName] = tg m.targets[tsName] = make(map[string][]*config.TargetGroup)
} }
var tgAll []*config.TargetGroup m.targets[tsName][provName] = []*config.TargetGroup{}
for _, prov := range m.endpoints[tsName] {
for _, tg := range prov { if tg != nil {
m.targets[tsName][provName] = tg
}
tgAll := []*config.TargetGroup{}
// Sort the providers alphabetically.
// Maps cannot be sorted so need to extract the keys to a slice and sort the string slice.
var providerNames []string
for providerName := range m.targets[tsName] {
providerNames = append(providerNames, providerName)
}
sort.Strings(providerNames)
for _, prov := range providerNames {
for _, tg := range m.targets[tsName][prov] {
if tg.Source != "" { // Don't add empty targets.
tgAll = append(tgAll, tg) tgAll = append(tgAll, tg)
} }
} }
}
t := make(map[string][]*config.TargetGroup) t := make(map[string][]*config.TargetGroup)
t[tsName] = tgAll t[tsName] = tgAll
tset <- t tset <- t