Merge pull request #977 from prometheus/fabxc/target-dedup

Improve target discovery pipeline
Fabian Reinartz 2015-08-14 16:38:16 +02:00
commit 9b9ff66212
11 changed files with 276 additions and 221 deletions
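
What unifies all eleven files: TargetProvider loses its Stop() method, and Run gains a read-only done channel that the caller closes to request shutdown (see the interface change in the target manager file below). A minimal, self-contained sketch of the new contract; the toy TargetGroup and staticProvider types here are illustrative stand-ins, not the PR's real ones:

	package main

	import "fmt"

	// TargetGroup stands in for config.TargetGroup in this sketch.
	type TargetGroup struct{ Source string }

	// staticProvider is a toy provider following the new Run(ch, done) contract:
	// it must close ch before returning and must return once done is closed.
	type staticProvider struct{ groups []*TargetGroup }

	func (p *staticProvider) Run(ch chan<- *TargetGroup, done <-chan struct{}) {
		defer close(ch)
		for _, tg := range p.groups {
			select {
			case <-done: // the caller asked us to stop
				return
			case ch <- tg:
			}
		}
		<-done // a real provider would keep watching for updates here
	}

	func main() {
		p := &staticProvider{groups: []*TargetGroup{{Source: "a"}, {Source: "b"}}}

		ch := make(chan *TargetGroup)
		done := make(chan struct{})
		go p.Run(ch, done)

		fmt.Println((<-ch).Source)
		fmt.Println((<-ch).Source)

		close(done) // stop the provider; it closes ch on its way out
		for range ch {
		} // drain until the provider's close(ch)
	}

Because the provider owns close(ch) and the caller owns close(done), shutdown needs no acknowledgment traffic in either direction.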

View file

@@ -57,9 +57,8 @@ type ConsulDiscovery struct {
 	tagSeparator    string
 	scrapedServices map[string]struct{}
 
 	mu       sync.RWMutex
 	services map[string]*consulService
-	runDone, srvsDone chan struct{}
 }
 
 // consulService contains data belonging to the same service.
@@ -93,8 +92,6 @@ func NewConsulDiscovery(conf *config.ConsulSDConfig) *ConsulDiscovery {
 		client:          client,
 		clientConf:      clientConf,
 		tagSeparator:    conf.TagSeparator,
-		runDone:         make(chan struct{}),
-		srvsDone:        make(chan struct{}, 1),
 		scrapedServices: map[string]struct{}{},
 		services:        map[string]*consulService{},
 	}
@@ -133,18 +130,22 @@ func (cd *ConsulDiscovery) Sources() []string {
 }
 
 // Run implements the TargetProvider interface.
-func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup) {
+func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
 	defer close(ch)
+	defer cd.stop()
 
 	update := make(chan *consulService, 10)
-	go cd.watchServices(update)
+	go cd.watchServices(update, done)
 
 	for {
 		select {
-		case <-cd.runDone:
+		case <-done:
 			return
 		case srv := <-update:
 			if srv.removed {
+				close(srv.done)
+
+				// Send clearing update.
 				ch <- &config.TargetGroup{Source: srv.name}
 				break
 			}
@@ -157,31 +158,20 @@ func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup) {
 	}
 }
 
-// Stop implements the TargetProvider interface.
-func (cd *ConsulDiscovery) Stop() {
-	log.Debugf("Stopping Consul service discovery for %s", cd.clientConf.Address)
-
+func (cd *ConsulDiscovery) stop() {
 	// The lock prevents Run from terminating while the watchers attempt
 	// to send on their channels.
 	cd.mu.Lock()
 	defer cd.mu.Unlock()
 
-	// The watching goroutines will terminate after their next watch timeout.
-	// As this can take long, the channel is buffered and we do not wait.
 	for _, srv := range cd.services {
-		srv.done <- struct{}{}
+		close(srv.done)
 	}
-	cd.srvsDone <- struct{}{}
-
-	// Terminate Run.
-	cd.runDone <- struct{}{}
-
-	log.Debugf("Consul service discovery for %s stopped.", cd.clientConf.Address)
 }
 
 // watchServices retrieves updates from Consul's services endpoint and sends
 // potential updates to the update channel.
-func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) {
+func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-chan struct{}) {
 	var lastIndex uint64
 	for {
 		catalog := cd.client.Catalog()
@@ -191,8 +181,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) {
 		})
 		if err != nil {
 			log.Errorf("Error refreshing service list: %s", err)
-			<-time.After(consulRetryInterval)
-			continue
+			time.Sleep(consulRetryInterval)
 		}
 		// If the index equals the previous one, the watch timed out with no update.
 		if meta.LastIndex == lastIndex {
@@ -202,7 +191,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) {
 		cd.mu.Lock()
 		select {
-		case <-cd.srvsDone:
+		case <-done:
 			cd.mu.Unlock()
 			return
 		default:
@@ -218,7 +207,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) {
 			srv = &consulService{
 				name:   name,
 				tgroup: &config.TargetGroup{},
-				done:   make(chan struct{}, 1),
+				done:   make(chan struct{}),
 			}
 			srv.tgroup.Source = name
 			cd.services[name] = srv
@@ -234,7 +223,6 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) {
 		if _, ok := srvs[name]; !ok {
 			srv.removed = true
 			update <- srv
-			srv.done <- struct{}{}
 			delete(cd.services, name)
 		}
 	}
@@ -253,7 +241,7 @@ func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- *config.TargetGroup) {
 		})
 		if err != nil {
 			log.Errorf("Error refreshing service %s: %s", srv.name, err)
-			<-time.After(consulRetryInterval)
+			time.Sleep(consulRetryInterval)
 			continue
 		}
 		// If the index equals the previous one, the watch timed out with no update.
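
A pattern worth calling out in the Consul changes: every `srv.done <- struct{}{}` send becomes `close(srv.done)`. A close is observed by all current and future receivers, while a send wakes exactly one receiver once, which is why the old code needed buffered channels and per-watcher bookkeeping. A standalone illustration (not PR code):

	package main

	import (
		"fmt"
		"sync"
	)

	func main() {
		done := make(chan struct{})
		var wg sync.WaitGroup

		// Three watchers all observe a single close(done). A single
		// send (done <- struct{}{}) would have woken only one of them.
		for i := 0; i < 3; i++ {
			wg.Add(1)
			go func(id int) {
				defer wg.Done()
				<-done
				fmt.Println("watcher", id, "stopped")
			}(i)
		}

		close(done)
		wg.Wait()
	}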

View file

@@ -64,11 +64,11 @@ func init() {
 type DNSDiscovery struct {
 	names []string
 
 	done     chan struct{}
-	ticker   *time.Ticker
+	interval time.Duration
 	m        sync.RWMutex
 	port     int
 	qtype    uint16
 }
 
 // NewDNSDiscovery returns a new DNSDiscovery which periodically refreshes its targets.
@@ -83,41 +83,34 @@ func NewDNSDiscovery(conf *config.DNSSDConfig) *DNSDiscovery {
 		qtype = dns.TypeSRV
 	}
 	return &DNSDiscovery{
 		names:    conf.Names,
 		done:     make(chan struct{}),
-		ticker:   time.NewTicker(time.Duration(conf.RefreshInterval)),
+		interval: time.Duration(conf.RefreshInterval),
 		qtype:    qtype,
 		port:     conf.Port,
 	}
 }
 
 // Run implements the TargetProvider interface.
-func (dd *DNSDiscovery) Run(ch chan<- *config.TargetGroup) {
+func (dd *DNSDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
 	defer close(ch)
 
+	ticker := time.NewTicker(dd.interval)
+	defer ticker.Stop()
+
 	// Get an initial set right away.
 	dd.refreshAll(ch)
 
 	for {
 		select {
-		case <-dd.ticker.C:
+		case <-ticker.C:
 			dd.refreshAll(ch)
-		case <-dd.done:
+		case <-done:
 			return
 		}
 	}
 }
 
-// Stop implements the TargetProvider interface.
-func (dd *DNSDiscovery) Stop() {
-	log.Debug("Stopping DNS discovery for %s...", dd.names)
-
-	dd.ticker.Stop()
-	dd.done <- struct{}{}
-
-	log.Debug("DNS discovery for %s stopped.", dd.names)
-}
-
 // Sources implements the TargetProvider interface.
 func (dd *DNSDiscovery) Sources() []string {
 	var srcs []string

View file

@@ -39,7 +39,6 @@ type FileDiscovery struct {
 	paths    []string
 	watcher  *fsnotify.Watcher
 	interval time.Duration
-	done     chan struct{}
 
 	// lastRefresh stores which files were found during the last refresh
 	// and how many target groups they contained.
@@ -52,7 +51,6 @@ func NewFileDiscovery(conf *config.FileSDConfig) *FileDiscovery {
 	return &FileDiscovery{
 		paths:    conf.Names,
 		interval: time.Duration(conf.RefreshInterval),
-		done:     make(chan struct{}),
 	}
 }
@@ -106,8 +104,9 @@ func (fd *FileDiscovery) watchFiles() {
 }
 
 // Run implements the TargetProvider interface.
-func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup) {
+func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
 	defer close(ch)
+	defer fd.stop()
 
 	watcher, err := fsnotify.NewWatcher()
 	if err != nil {
@@ -125,10 +124,13 @@ func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup) {
 		// Stopping has priority over refreshing. Thus we wrap the actual select
 		// clause to always catch done signals.
 		select {
-		case <-fd.done:
+		case <-done:
 			return
 		default:
 			select {
+			case <-done:
+				return
 			case event := <-fd.watcher.Events:
 				// fsnotify sometimes sends a bunch of events without name or operation.
 				// It's unclear what they are and why they are sent - filter them out.
@@ -154,9 +156,6 @@ func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup) {
 				if err != nil {
 					log.Errorf("Error on file watch: %s", err)
 				}
-			case <-fd.done:
-				return
 			}
 		}
 	}
@@ -198,11 +197,10 @@ func fileSource(filename string, i int) string {
 	return fmt.Sprintf("%s:%d", filename, i)
 }
 
-// Stop implements the TargetProvider interface.
-func (fd *FileDiscovery) Stop() {
+// stop shuts down the file watcher.
+func (fd *FileDiscovery) stop() {
 	log.Debugf("Stopping file discovery for %s...", fd.paths)
 
-	fd.done <- struct{}{}
 	// Closing the watcher will deadlock unless all events and errors are drained.
 	go func() {
 		for {
@@ -210,15 +208,13 @@ func (fd *FileDiscovery) Stop() {
 			case <-fd.watcher.Errors:
 			case <-fd.watcher.Events:
 				// Drain all events and errors.
-			case <-fd.done:
+			default:
 				return
 			}
 		}
 	}()
 	fd.watcher.Close()
-	fd.done <- struct{}{}
 
 	log.Debugf("File discovery for %s stopped.", fd.paths)
 }
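
The nested select in file.go's Run loop is the usual priority-shutdown idiom: the outer select with a default checks done before any work, and the inner select keeps done in scope so a receive blocked on watcher events can still be interrupted. The same idiom in isolation, with illustrative names:

	package main

	import "fmt"

	// consume drains events until done is closed, giving shutdown priority
	// over event handling in exactly the shape used by FileDiscovery.Run.
	func consume(events <-chan string, done <-chan struct{}) {
		for {
			select {
			case <-done:
				return
			default:
				select {
				case <-done:
					return
				case ev := <-events:
					fmt.Println("event:", ev)
				}
			}
		}
	}

	func main() {
		events := make(chan string, 1)
		done := make(chan struct{})
		finished := make(chan struct{})

		go func() { consume(events, done); close(finished) }()

		events <- "update"
		close(done)
		<-finished
	}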

View file

@@ -24,11 +24,13 @@ func testFileSD(t *testing.T, ext string) {
 	conf.Names = []string{"fixtures/_*" + ext}
 	conf.RefreshInterval = config.Duration(1 * time.Hour)
 
-	fsd := NewFileDiscovery(&conf)
-	ch := make(chan *config.TargetGroup)
-	go fsd.Run(ch)
-	defer fsd.Stop()
+	var (
+		fsd  = NewFileDiscovery(&conf)
+		ch   = make(chan *config.TargetGroup)
+		done = make(chan struct{})
+	)
+	go fsd.Run(ch, done)
+	defer close(done)
 
 	select {
 	case <-time.After(25 * time.Millisecond):

View file

@@ -160,64 +160,62 @@ func (kd *KubernetesDiscovery) Sources() []string {
 }
 
 // Run implements the TargetProvider interface.
-func (kd *KubernetesDiscovery) Run(ch chan<- *config.TargetGroup) {
+func (kd *KubernetesDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
 	defer close(ch)
 
-	kd.updateNodesTargetGroup(ch)
+	select {
+	case ch <- kd.updateNodesTargetGroup():
+	case <-done:
+		return
+	}
 
 	for _, ns := range kd.services {
 		for _, service := range ns {
-			kd.addService(service, ch)
+			select {
+			case ch <- kd.addService(service):
+			case <-done:
+				return
+			}
 		}
 	}
 
 	retryInterval := time.Duration(kd.Conf.RetryInterval)
 
 	update := make(chan interface{}, 10)
+	defer close(update)
 
-	go kd.watchNodes(update, retryInterval)
-	go kd.watchServices(update, retryInterval)
-	go kd.watchServiceEndpoints(update, retryInterval)
+	go kd.watchNodes(update, done, retryInterval)
+	go kd.watchServices(update, done, retryInterval)
+	go kd.watchServiceEndpoints(update, done, retryInterval)
 
+	var tg *config.TargetGroup
 	for {
 		select {
-		case <-kd.runDone:
+		case <-done:
 			return
 		case event := <-update:
 			switch obj := event.(type) {
			case *nodeEvent:
 				kd.updateNode(obj.Node, obj.EventType)
-				kd.updateNodesTargetGroup(ch)
+				tg = kd.updateNodesTargetGroup()
 			case *serviceEvent:
-				kd.updateService(obj.Service, obj.EventType, ch)
+				tg = kd.updateService(obj.Service, obj.EventType)
 			case *endpointsEvent:
-				kd.updateServiceEndpoints(obj.Endpoints, obj.EventType, ch)
+				tg = kd.updateServiceEndpoints(obj.Endpoints, obj.EventType)
 			}
 		}
+
+		select {
+		case ch <- tg:
+		case <-done:
+			return
+		}
 	}
 }
 
-// Stop implements the TargetProvider interface.
-func (kd *KubernetesDiscovery) Stop() {
-	log.Debugf("Stopping Kubernetes discovery for %s", kd.Conf.Server)
-
-	// The lock prevents Run from terminating while the watchers attempt
-	// to send on their channels.
-	kd.nodesMu.Lock()
-	defer kd.nodesMu.Unlock()
-	kd.servicesMu.Lock()
-	defer kd.servicesMu.Unlock()
-
-	// Terminate Run.
-	kd.runDone <- struct{}{}
-
-	log.Debugf("Kubernetes discovery for %s stopped.", kd.Conf.Server)
-}
-
-func (kd *KubernetesDiscovery) updateNodesTargetGroup(ch chan<- *config.TargetGroup) {
+func (kd *KubernetesDiscovery) updateNodesTargetGroup() *config.TargetGroup {
 	kd.nodesMu.Lock()
 	defer kd.nodesMu.Unlock()
 
 	tg := &config.TargetGroup{Source: nodesTargetGroupName}
 
 	// Now let's loop through the nodes & add them to the target group with appropriate labels.
@@ -235,7 +233,7 @@ func (kd *KubernetesDiscovery) updateNodesTargetGroup(ch chan<- *config.TargetGroup) {
 		tg.Targets = append(tg.Targets, t)
 	}
 
-	ch <- tg
+	return tg
 }
 
 func (kd *KubernetesDiscovery) updateNode(node *Node, eventType EventType) {
@@ -253,7 +251,7 @@ func (kd *KubernetesDiscovery) updateNode(node *Node, eventType EventType) {
 }
 
 // watchNodes watches nodes as they come & go.
-func (kd *KubernetesDiscovery) watchNodes(events chan interface{}, retryInterval time.Duration) {
+func (kd *KubernetesDiscovery) watchNodes(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
 	until(func() {
 		req, err := http.NewRequest("GET", kd.Conf.Server+nodesURL, nil)
 		if err != nil {
@@ -283,13 +281,17 @@ func (kd *KubernetesDiscovery) watchNodes(events chan interface{}, retryInterval time.Duration) {
 				return
 			}
 			kd.nodesResourceVersion = event.Node.ObjectMeta.ResourceVersion
-			events <- &event
+
+			select {
+			case events <- &event:
+			case <-done:
+			}
 		}
-	}, retryInterval, kd.runDone)
+	}, retryInterval, done)
 }
 
 // watchServices watches services as they come & go.
-func (kd *KubernetesDiscovery) watchServices(events chan interface{}, retryInterval time.Duration) {
+func (kd *KubernetesDiscovery) watchServices(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
 	until(func() {
 		req, err := http.NewRequest("GET", kd.Conf.Server+servicesURL, nil)
 		if err != nil {
@@ -320,64 +322,77 @@ func (kd *KubernetesDiscovery) watchServices(events chan interface{}, retryInterval time.Duration) {
 				return
 			}
 			kd.servicesResourceVersion = event.Service.ObjectMeta.ResourceVersion
-			events <- &event
+
+			select {
+			case events <- &event:
+			case <-done:
+			}
 		}
-	}, retryInterval, kd.runDone)
+	}, retryInterval, done)
 }
 
-func (kd *KubernetesDiscovery) updateService(service *Service, eventType EventType, ch chan<- *config.TargetGroup) {
+func (kd *KubernetesDiscovery) updateService(service *Service, eventType EventType) *config.TargetGroup {
 	kd.servicesMu.Lock()
 	defer kd.servicesMu.Unlock()
 
-	name := service.ObjectMeta.Name
-	namespace := service.ObjectMeta.Namespace
-	_, ok := kd.services[namespace][name]
+	var (
+		name      = service.ObjectMeta.Name
+		namespace = service.ObjectMeta.Namespace
+		_, exists = kd.services[namespace][name]
+	)
 
 	switch eventType {
 	case deleted:
-		if ok {
-			kd.deleteService(service, ch)
+		if exists {
+			return kd.deleteService(service)
 		}
 	case added, modified:
-		kd.addService(service, ch)
+		return kd.addService(service)
 	}
+	return nil
 }
 
-func (kd *KubernetesDiscovery) deleteService(service *Service, ch chan<- *config.TargetGroup) {
+func (kd *KubernetesDiscovery) deleteService(service *Service) *config.TargetGroup {
 	tg := &config.TargetGroup{Source: serviceSource(service)}
-	ch <- tg
 	delete(kd.services[service.ObjectMeta.Namespace], service.ObjectMeta.Name)
 	if len(kd.services[service.ObjectMeta.Namespace]) == 0 {
 		delete(kd.services, service.ObjectMeta.Namespace)
 	}
+
+	return tg
 }
 
-func (kd *KubernetesDiscovery) addService(service *Service, ch chan<- *config.TargetGroup) {
+func (kd *KubernetesDiscovery) addService(service *Service) *config.TargetGroup {
 	namespace, ok := kd.services[service.ObjectMeta.Namespace]
 	if !ok {
 		namespace = map[string]*Service{}
 		kd.services[service.ObjectMeta.Namespace] = namespace
 	}
 
 	namespace[service.ObjectMeta.Name] = service
 	endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name)
 
 	res, err := kd.client.Get(kd.Conf.Server + endpointURL)
 	if err != nil {
 		log.Errorf("Error getting service endpoints: %s", err)
-		return
+		return nil
 	}
 	if res.StatusCode != http.StatusOK {
 		log.Errorf("Failed to get service endpoints: %s", res.StatusCode)
-		return
+		return nil
 	}
 
 	var endpoints Endpoints
 	if err := json.NewDecoder(res.Body).Decode(&endpoints); err != nil {
 		log.Errorf("Error getting service endpoints: %s", err)
-		return
+		return nil
 	}
 
-	kd.updateServiceTargetGroup(service, &endpoints, ch)
+	return kd.updateServiceTargetGroup(service, &endpoints)
 }
 
-func (kd *KubernetesDiscovery) updateServiceTargetGroup(service *Service, endpoints *Endpoints, ch chan<- *config.TargetGroup) {
+func (kd *KubernetesDiscovery) updateServiceTargetGroup(service *Service, endpoints *Endpoints) *config.TargetGroup {
 	tg := &config.TargetGroup{
 		Source: serviceSource(service),
 		Labels: clientmodel.LabelSet{
@@ -413,11 +428,11 @@ func (kd *KubernetesDiscovery) updateServiceTargetGroup(service *Service, endpoints *Endpoints, ch chan<- *config.TargetGroup) {
 		}
 	}
 
-	ch <- tg
+	return tg
 }
 
 // watchServiceEndpoints watches service endpoints as they come & go.
-func (kd *KubernetesDiscovery) watchServiceEndpoints(events chan interface{}, retryInterval time.Duration) {
+func (kd *KubernetesDiscovery) watchServiceEndpoints(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
 	until(func() {
 		req, err := http.NewRequest("GET", kd.Conf.Server+endpointsURL, nil)
 		if err != nil {
@@ -448,19 +463,26 @@ func (kd *KubernetesDiscovery) watchServiceEndpoints(events chan interface{}, retryInterval time.Duration) {
 				return
 			}
 			kd.servicesResourceVersion = event.Endpoints.ObjectMeta.ResourceVersion
-			events <- &event
+
+			select {
+			case events <- &event:
+			case <-done:
+			}
 		}
-	}, retryInterval, kd.runDone)
+	}, retryInterval, done)
 }
 
-func (kd *KubernetesDiscovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType, ch chan<- *config.TargetGroup) {
+func (kd *KubernetesDiscovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType) *config.TargetGroup {
 	kd.servicesMu.Lock()
 	defer kd.servicesMu.Unlock()
 
 	serviceNamespace := endpoints.ObjectMeta.Namespace
 	serviceName := endpoints.ObjectMeta.Name
 
 	if service, ok := kd.services[serviceNamespace][serviceName]; ok {
-		kd.updateServiceTargetGroup(service, endpoints, ch)
+		return kd.updateServiceTargetGroup(service, endpoints)
 	}
+	return nil
 }
 
 func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, error) {
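
The Kubernetes rework follows one rule: helpers compute and return a *config.TargetGroup (nil meaning nothing to publish) instead of sending on the channel themselves, so every send happens at a single site in Run guarded by done. A stripped-down sketch of that shape; the TargetGroup and update names here are illustrative:

	package main

	import "fmt"

	// TargetGroup stands in for config.TargetGroup in this sketch.
	type TargetGroup struct{ Source string }

	// update computes a group instead of sending it; nil means "nothing to
	// publish". Returning lets a single select in the caller guard the send.
	func update(name string) *TargetGroup {
		if name == "" {
			return nil
		}
		return &TargetGroup{Source: name}
	}

	func main() {
		ch := make(chan *TargetGroup, 1)
		done := make(chan struct{})

		if tg := update("nodes"); tg != nil {
			select {
			case ch <- tg: // the one guarded send site, as in the reworked Run loop
			case <-done:
				return
			}
		}
		fmt.Println((<-ch).Source)
	}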

View file

@@ -40,13 +40,12 @@ func (md *MarathonDiscovery) Sources() []string {
 }
 
 // Run implements the TargetProvider interface.
-func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup) {
+func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
 	defer close(ch)
 
 	for {
 		select {
-		case <-md.done:
-			log.Debug("Shutting down marathon discovery.")
+		case <-done:
 			return
 		case <-time.After(md.refreshInterval):
 			err := md.updateServices(ch)
@@ -57,11 +56,6 @@
 	}
 }
 
-// Stop implements the TargetProvider interface.
-func (md *MarathonDiscovery) Stop() {
-	md.done <- struct{}{}
-}
-
 func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error {
 	targetMap, err := md.fetchTargetGroups()
 	if err != nil {

View file

@@ -147,17 +147,18 @@ func TestMarathonSDRunAndStop(t *testing.T) {
 		return marathonTestAppList(marathonValidLabel, 1), nil
 	})
 	md.refreshInterval = time.Millisecond * 10
+	done := make(chan struct{})
 
 	go func() {
 		select {
 		case <-ch:
-			md.Stop()
+			close(done)
 		case <-time.After(md.refreshInterval * 3):
-			md.Stop()
+			close(done)
 			t.Fatalf("Update took too long.")
 		}
 	}()
 
-	md.Run(ch)
+	md.Run(ch, done)
 
 	select {
 	case <-ch:
 	default:

View file

@@ -67,7 +67,6 @@ type ServersetDiscovery struct {
 	sources   map[string]*config.TargetGroup
 	sdUpdates *chan<- *config.TargetGroup
 	updates   chan zookeeperTreeCacheEvent
-	runDone   chan struct{}
 	treeCache *zookeeperTreeCache
 }
@@ -84,7 +83,6 @@ func NewServersetDiscovery(conf *config.ServersetSDConfig) *ServersetDiscovery {
 		conn:    conn,
 		updates: updates,
 		sources: map[string]*config.TargetGroup{},
-		runDone: make(chan struct{}),
 	}
 	go sd.processUpdates()
 	sd.treeCache = NewZookeeperTreeCache(conn, conf.Paths[0], updates)
@@ -132,7 +130,7 @@ func (sd *ServersetDiscovery) processUpdates() {
 }
 
 // Run implements the TargetProvider interface.
-func (sd *ServersetDiscovery) Run(ch chan<- *config.TargetGroup) {
+func (sd *ServersetDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
 	// Send on everything we have seen so far.
 	sd.mu.Lock()
 	for _, targetGroup := range sd.sources {
@@ -142,20 +140,10 @@ func (sd *ServersetDiscovery) Run(ch chan<- *config.TargetGroup) {
 	sd.sdUpdates = &ch
 	sd.mu.Unlock()
 
-	<-sd.runDone
+	<-done
 	sd.treeCache.Stop()
 }
 
-// Stop implements the TargetProvider interface.
-func (sd *ServersetDiscovery) Stop() {
-	log.Debugf("Stopping serverset service discovery for %s %s", sd.conf.Servers, sd.conf.Paths)
-
-	// Terminate Run.
-	sd.runDone <- struct{}{}
-
-	log.Debugf("Serverset service discovery for %s %s stopped", sd.conf.Servers, sd.conf.Paths)
-}
-
 func parseServersetMember(data []byte, path string) (*clientmodel.LabelSet, error) {
 	member := serversetMember{}
 	err := json.Unmarshal(data, &member)

View file

@@ -53,17 +53,18 @@ type fakeTargetProvider struct {
 	update  chan *config.TargetGroup
 }
 
-func (tp *fakeTargetProvider) Run(ch chan<- *config.TargetGroup) {
+func (tp *fakeTargetProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
 	defer close(ch)
-	for tg := range tp.update {
-		ch <- tg
+	for {
+		select {
+		case tg := <-tp.update:
+			ch <- tg
+		case <-done:
+			return
+		}
 	}
 }
 
-func (tp *fakeTargetProvider) Stop() {
-	close(tp.update)
-}
-
 func (tp *fakeTargetProvider) Sources() []string {
 	return tp.sources
 }

View file

@@ -43,20 +43,19 @@ type TargetProvider interface {
 	// Run hands a channel to the target provider through which it can send
 	// updated target groups. The channel must be closed by the target provider
 	// if no more updates will be sent.
-	Run(chan<- *config.TargetGroup)
-	// Stop terminates any potential computation of the target provider. The
-	// channel received on Run must be closed afterwards.
-	Stop()
+	// On receiving from done Run must return.
+	Run(up chan<- *config.TargetGroup, done <-chan struct{})
 }
 
 // TargetManager maintains a set of targets, starts and stops their scraping and
 // creates the new targets based on the target groups it receives from various
 // target providers.
 type TargetManager struct {
-	m              sync.RWMutex
+	mtx            sync.RWMutex
 	globalLabels   clientmodel.LabelSet
 	sampleAppender storage.SampleAppender
 	running        bool
+	done           chan struct{}
 
 	// Targets by their source ID.
 	targets map[string][]*Target
@@ -73,31 +72,96 @@ func NewTargetManager(sampleAppender storage.SampleAppender) *TargetManager {
 	return tm
 }
 
+// merge multiple target group channels into a single output channel.
+func merge(done <-chan struct{}, cs ...<-chan targetGroupUpdate) <-chan targetGroupUpdate {
+	var wg sync.WaitGroup
+	out := make(chan targetGroupUpdate)
+
+	// Start an output goroutine for each input channel in cs. output
+	// copies values from c to out until c or done is closed, then calls
+	// wg.Done.
+	redir := func(c <-chan targetGroupUpdate) {
+		defer wg.Done()
+		for n := range c {
+			select {
+			case out <- n:
+			case <-done:
+				return
+			}
+		}
+	}
+
+	wg.Add(len(cs))
+	for _, c := range cs {
+		go redir(c)
+	}
+
+	// Close the out channel if all inbound channels are closed.
+	go func() {
+		wg.Wait()
+		close(out)
+	}()
+	return out
+}
+
+// targetGroupUpdate is a potentially changed/new target group
+// for the given scrape configuration.
+type targetGroupUpdate struct {
+	tg   *config.TargetGroup
+	scfg *config.ScrapeConfig
+}
+
 // Run starts background processing to handle target updates.
 func (tm *TargetManager) Run() {
 	log.Info("Starting target manager...")
 
+	tm.done = make(chan struct{})
+
 	sources := map[string]struct{}{}
+	updates := []<-chan targetGroupUpdate{}
 
 	for scfg, provs := range tm.providers {
 		for _, prov := range provs {
-			ch := make(chan *config.TargetGroup)
-			go tm.handleTargetUpdates(scfg, ch)
-
+			// Get an initial set of available sources so we don't remove
+			// target groups from the last run that are still available.
 			for _, src := range prov.Sources() {
 				sources[src] = struct{}{}
 			}
 
+			tgc := make(chan *config.TargetGroup)
 			// Run the target provider after cleanup of the stale targets is done.
-			defer func(p TargetProvider, c chan *config.TargetGroup) {
-				go p.Run(c)
-			}(prov, ch)
+			defer func(prov TargetProvider, tgc chan *config.TargetGroup) {
+				go prov.Run(tgc, tm.done)
+			}(prov, tgc)
+
+			tgupc := make(chan targetGroupUpdate)
+			updates = append(updates, tgupc)
+
+			go func(scfg *config.ScrapeConfig) {
+				defer close(tgupc)
+
+				for {
+					select {
+					case tg := <-tgc:
+						if tg == nil {
+							break
+						}
+						tgupc <- targetGroupUpdate{tg: tg, scfg: scfg}
+					case <-tm.done:
+						return
+					}
+				}
+			}(scfg)
 		}
 	}
 
-	tm.m.Lock()
-	defer tm.m.Unlock()
+	// Merge all channels of incoming target group updates into a single
+	// one and keep applying the updates.
+	go tm.handleUpdates(merge(tm.done, updates...), tm.done)
 
+	tm.mtx.Lock()
+	defer tm.mtx.Unlock()
+
+	// Remove old target groups that are no longer in the set of sources.
 	tm.removeTargets(func(src string) bool {
 		if _, ok := sources[src]; ok {
 			return false
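
The merge helper added above is a textbook fan-in. A self-contained sketch of the same pattern over plain ints, under the assumption that each input channel is eventually closed by its producer; the output closes only after every redirect goroutine has finished:

	package main

	import (
		"fmt"
		"sync"
	)

	// merge fans several input channels into one output channel, mirroring
	// the helper in targetmanager.go but over ints for illustration.
	func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
		var wg sync.WaitGroup
		out := make(chan int)

		redir := func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				select {
				case out <- v:
				case <-done:
					return
				}
			}
		}
		wg.Add(len(cs))
		for _, c := range cs {
			go redir(c)
		}

		// Close out once every redirect goroutine has returned.
		go func() {
			wg.Wait()
			close(out)
		}()
		return out
	}

	func main() {
		a, b := make(chan int), make(chan int)
		done := make(chan struct{})
		go func() { a <- 1; close(a) }()
		go func() { b <- 2; close(b) }()

		for v := range merge(done, a, b) {
			fmt.Println(v)
		}
	}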
@@ -110,24 +174,32 @@
 // handleTargetUpdates receives target group updates and handles them in the
 // context of the given job config.
-func (tm *TargetManager) handleTargetUpdates(cfg *config.ScrapeConfig, ch <-chan *config.TargetGroup) {
-	for tg := range ch {
-		log.Debugf("Received potential update for target group %q", tg.Source)
+func (tm *TargetManager) handleUpdates(ch <-chan targetGroupUpdate, done <-chan struct{}) {
+	for {
+		select {
+		case update := <-ch:
+			if update.tg == nil {
+				break
+			}
+			log.Debugf("Received potential update for target group %q", update.tg.Source)
 
-		if err := tm.updateTargetGroup(tg, cfg); err != nil {
-			log.Errorf("Error updating targets: %s", err)
+			if err := tm.updateTargetGroup(update.tg, update.scfg); err != nil {
+				log.Errorf("Error updating targets: %s", err)
+			}
+		case <-done:
+			return
 		}
 	}
 }
 
 // Stop all background processing.
 func (tm *TargetManager) Stop() {
-	tm.m.RLock()
+	tm.mtx.RLock()
 	if tm.running {
 		defer tm.stop(true)
 	}
 	// Return the lock before calling tm.stop().
-	defer tm.m.RUnlock()
+	defer tm.mtx.RUnlock()
 }
 
 // stop background processing of the target manager. If removeTargets is true,
@@ -136,25 +208,10 @@ func (tm *TargetManager) stop(removeTargets bool) {
 	log.Info("Stopping target manager...")
 	defer log.Info("Target manager stopped.")
 
-	tm.m.Lock()
-	provs := []TargetProvider{}
-	for _, ps := range tm.providers {
-		provs = append(provs, ps...)
-	}
-	tm.m.Unlock()
+	close(tm.done)
 
-	var wg sync.WaitGroup
-	wg.Add(len(provs))
-	for _, prov := range provs {
-		go func(p TargetProvider) {
-			p.Stop()
-			wg.Done()
-		}(prov)
-	}
-	wg.Wait()
-
-	tm.m.Lock()
-	defer tm.m.Unlock()
+	tm.mtx.Lock()
+	defer tm.mtx.Unlock()
 
 	if removeTargets {
 		tm.removeTargets(nil)
@@ -194,8 +251,8 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *config.ScrapeConfig) error {
 		return err
 	}
 
-	tm.m.Lock()
-	defer tm.m.Unlock()
+	tm.mtx.Lock()
+	defer tm.mtx.Unlock()
 
 	if !tm.running {
 		return nil
@@ -261,8 +318,8 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *config.ScrapeConfig) error {
 // Pools returns the targets currently being scraped bucketed by their job name.
 func (tm *TargetManager) Pools() map[string][]*Target {
-	tm.m.RLock()
-	defer tm.m.RUnlock()
+	tm.mtx.RLock()
+	defer tm.mtx.RUnlock()
 
 	pools := map[string][]*Target{}
@@ -279,9 +336,9 @@ func (tm *TargetManager) Pools() map[string][]*Target {
 // by the new cfg. The state of targets that are valid in the new configuration remains unchanged.
 // Returns true on success.
 func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool {
-	tm.m.RLock()
+	tm.mtx.RLock()
 	running := tm.running
-	tm.m.RUnlock()
+	tm.mtx.RUnlock()
 
 	if running {
 		tm.stop(false)
@@ -294,8 +351,8 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool {
 		providers[scfg] = providersFromConfig(scfg)
 	}
 
-	tm.m.Lock()
-	defer tm.m.Unlock()
+	tm.mtx.Lock()
+	defer tm.mtx.Unlock()
 
 	tm.globalLabels = cfg.GlobalConfig.Labels
 	tm.providers = providers
@@ -325,15 +382,23 @@ func (tp *prefixedTargetProvider) Sources() []string {
 	return srcs
 }
 
-func (tp *prefixedTargetProvider) Run(ch chan<- *config.TargetGroup) {
+func (tp *prefixedTargetProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
 	defer close(ch)
 
 	ch2 := make(chan *config.TargetGroup)
-	go tp.TargetProvider.Run(ch2)
+	go tp.TargetProvider.Run(ch2, done)
 
-	for tg := range ch2 {
-		tg.Source = tp.prefix(tg.Source)
-		ch <- tg
+	for {
+		select {
+		case <-done:
+			return
+		case tg := <-ch2:
+			if tg == nil {
+				break
+			}
+			tg.Source = tp.prefix(tg.Source)
+			ch <- tg
+		}
 	}
 }
@@ -382,8 +447,8 @@ func providersFromConfig(cfg *config.ScrapeConfig) []TargetProvider {
 // targetsFromGroup builds targets based on the given TargetGroup and config.
 func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) {
-	tm.m.RLock()
-	defer tm.m.RUnlock()
+	tm.mtx.RLock()
+	defer tm.mtx.RUnlock()
 
 	targets := make([]*Target, 0, len(tg.Targets))
 	for i, labels := range tg.Targets {
@@ -470,15 +535,17 @@ func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
 }
 
 // Run implements the TargetProvider interface.
-func (sd *StaticProvider) Run(ch chan<- *config.TargetGroup) {
-	for _, tg := range sd.TargetGroups {
-		ch <- tg
-	}
-	close(ch) // This provider never sends any updates.
-}
+func (sd *StaticProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
+	defer close(ch)
 
-// Stop implements the TargetProvider interface.
-func (sd *StaticProvider) Stop() {}
+	for _, tg := range sd.TargetGroups {
+		select {
+		case <-done:
+			return
+		case ch <- tg:
+		}
+	}
+}
 
 // TargetGroups returns the provider's target groups.
 func (sd *StaticProvider) Sources() (srcs []string) {

View file

@@ -54,7 +54,10 @@ func TestPrefixedTargetProvider(t *testing.T) {
 	}
 
 	ch := make(chan *config.TargetGroup)
-	go tp.Run(ch)
+	done := make(chan struct{})
+	defer close(done)
+
+	go tp.Run(ch, done)
 
 	expGroup1 := *targetGroups[0]
 	expGroup2 := *targetGroups[1]
@@ -347,10 +350,10 @@ func TestTargetManagerConfigUpdate(t *testing.T) {
 		conf.ScrapeConfigs = step.scrapeConfigs
 		targetManager.ApplyConfig(conf)
 
-		<-time.After(1 * time.Millisecond)
+		time.Sleep(50 * time.Millisecond)
 
 		if len(targetManager.targets) != len(step.expected) {
-			t.Fatalf("step %d: sources mismatch %v, %v", targetManager.targets, step.expected)
+			t.Fatalf("step %d: sources mismatch: expected %v, got %v", i, step.expected, targetManager.targets)
 		}
 
 		for source, actTargets := range targetManager.targets {