discovery - handle Discoverers that send only target Group updates rather than all Targets on every update.

Signed-off-by: Krasi Georgiev <krasi.root@gmail.com>
This commit is contained in:
Krasi Georgiev 2018-01-04 13:14:22 +00:00
parent ec94df49d4
commit 7e28397a2c

View file

@ -73,7 +73,7 @@ func NewManager(logger log.Logger) *Manager {
logger: logger, logger: logger,
actionCh: make(chan func(context.Context)), actionCh: make(chan func(context.Context)),
syncCh: make(chan map[string][]*targetgroup.Group), syncCh: make(chan map[string][]*targetgroup.Group),
targets: make(map[poolKey][]*targetgroup.Group), targets: make(map[poolKey]map[string]*targetgroup.Group),
discoverCancel: []context.CancelFunc{}, discoverCancel: []context.CancelFunc{},
} }
} }
@ -83,7 +83,8 @@ type Manager struct {
logger log.Logger logger log.Logger
actionCh chan func(context.Context) actionCh chan func(context.Context)
discoverCancel []context.CancelFunc discoverCancel []context.CancelFunc
targets map[poolKey][]*targetgroup.Group // We use map[string]*targetgroup.Group to handle Discoverers that send only updates instead of all targets on every update.
targets map[poolKey]map[string]*targetgroup.Group
// The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config. // The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config.
syncCh chan map[string][]*targetgroup.Group syncCh chan map[string][]*targetgroup.Group
} }
@ -143,8 +144,8 @@ func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan
if !ok { if !ok {
return return
} }
m.addGroup(poolKey, tgs) m.updateGroup(poolKey, tgs)
m.syncCh <- m.allGroups(poolKey) m.syncCh <- m.allGroups()
} }
} }
} }
@ -153,16 +154,21 @@ func (m *Manager) cancelDiscoverers() {
for _, c := range m.discoverCancel { for _, c := range m.discoverCancel {
c() c()
} }
m.targets = make(map[poolKey][]*targetgroup.Group) m.targets = make(map[poolKey]map[string]*targetgroup.Group)
m.discoverCancel = nil m.discoverCancel = nil
} }
func (m *Manager) addGroup(poolKey poolKey, tg []*targetgroup.Group) { func (m *Manager) updateGroup(poolKey poolKey, tg []*targetgroup.Group) {
done := make(chan struct{}) done := make(chan struct{})
m.actionCh <- func(ctx context.Context) { m.actionCh <- func(ctx context.Context) {
if tg != nil { if tg != nil {
m.targets[poolKey] = tg for _, t := range tg {
if _, ok := m.targets[poolKey]; !ok {
m.targets[poolKey] = make(map[string]*targetgroup.Group)
}
m.targets[poolKey][t.Source] = t
}
} }
close(done) close(done)
@ -170,7 +176,7 @@ func (m *Manager) addGroup(poolKey poolKey, tg []*targetgroup.Group) {
<-done <-done
} }
func (m *Manager) allGroups(pk poolKey) map[string][]*targetgroup.Group { func (m *Manager) allGroups() map[string][]*targetgroup.Group {
tSets := make(chan map[string][]*targetgroup.Group) tSets := make(chan map[string][]*targetgroup.Group)
m.actionCh <- func(ctx context.Context) { m.actionCh <- func(ctx context.Context) {
@ -185,7 +191,9 @@ func (m *Manager) allGroups(pk poolKey) map[string][]*targetgroup.Group {
tSetsAll := map[string][]*targetgroup.Group{} tSetsAll := map[string][]*targetgroup.Group{}
for _, pk := range pKeys { for _, pk := range pKeys {
for _, tg := range m.targets[pk] { for _, tg := range m.targets[pk] {
if tg.Source != "" { // Don't add empty targets. // Don't add empty targets.
// Some Discoverers(eg. k8s) send only the updates so removed targets will be updated with an empty Source value.
if tg.Source != "" {
tSetsAll[pk.setName] = append(tSetsAll[pk.setName], tg) tSetsAll[pk.setName] = append(tSetsAll[pk.setName], tg)
} }
} }