diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go index ca5e9c5f4..c9a0f49a7 100644 --- a/retrieval/discovery/kubernetes/discovery.go +++ b/retrieval/discovery/kubernetes/discovery.go @@ -160,64 +160,62 @@ func (kd *KubernetesDiscovery) Sources() []string { } // Run implements the TargetProvider interface. -func (kd *KubernetesDiscovery) Run(ch chan<- *config.TargetGroup) { +func (kd *KubernetesDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { defer close(ch) - kd.updateNodesTargetGroup(ch) + select { + case ch <- kd.updateNodesTargetGroup(): + case <-done: + return + } for _, ns := range kd.services { for _, service := range ns { - kd.addService(service, ch) + select { + case ch <- kd.addService(service): + case <-done: + return + } } } retryInterval := time.Duration(kd.Conf.RetryInterval) update := make(chan interface{}, 10) - defer close(update) - go kd.watchNodes(update, retryInterval) - go kd.watchServices(update, retryInterval) - go kd.watchServiceEndpoints(update, retryInterval) + go kd.watchNodes(update, done, retryInterval) + go kd.watchServices(update, done, retryInterval) + go kd.watchServiceEndpoints(update, done, retryInterval) + var tg *config.TargetGroup for { select { - case <-kd.runDone: + case <-done: return case event := <-update: switch obj := event.(type) { case *nodeEvent: kd.updateNode(obj.Node, obj.EventType) - kd.updateNodesTargetGroup(ch) + tg = kd.updateNodesTargetGroup() case *serviceEvent: - kd.updateService(obj.Service, obj.EventType, ch) + tg = kd.updateService(obj.Service, obj.EventType) case *endpointsEvent: - kd.updateServiceEndpoints(obj.Endpoints, obj.EventType, ch) + tg = kd.updateServiceEndpoints(obj.Endpoints, obj.EventType) } } + + select { + case ch <- tg: + case <-done: + return + } } } -// Stop implements the TargetProvider interface. -func (kd *KubernetesDiscovery) Stop() { - log.Debugf("Stopping Kubernetes discovery for %s", kd.Conf.Server) - - // The lock prevents Run from terminating while the watchers attempt - // to send on their channels. +func (kd *KubernetesDiscovery) updateNodesTargetGroup() *config.TargetGroup { kd.nodesMu.Lock() defer kd.nodesMu.Unlock() - kd.servicesMu.Lock() - defer kd.servicesMu.Unlock() - // Terminate Run. - kd.runDone <- struct{}{} - - log.Debugf("Kubernetes discovery for %s stopped.", kd.Conf.Server) -} - -func (kd *KubernetesDiscovery) updateNodesTargetGroup(ch chan<- *config.TargetGroup) { - kd.nodesMu.Lock() - defer kd.nodesMu.Unlock() tg := &config.TargetGroup{Source: nodesTargetGroupName} // Now let's loop through the nodes & add them to the target group with appropriate labels. @@ -235,7 +233,7 @@ func (kd *KubernetesDiscovery) updateNodesTargetGroup(ch chan<- *config.TargetGr tg.Targets = append(tg.Targets, t) } - ch <- tg + return tg } func (kd *KubernetesDiscovery) updateNode(node *Node, eventType EventType) { @@ -253,7 +251,7 @@ func (kd *KubernetesDiscovery) updateNode(node *Node, eventType EventType) { } // watchNodes watches nodes as they come & go. -func (kd *KubernetesDiscovery) watchNodes(events chan interface{}, retryInterval time.Duration) { +func (kd *KubernetesDiscovery) watchNodes(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) { until(func() { req, err := http.NewRequest("GET", kd.Conf.Server+nodesURL, nil) if err != nil { @@ -283,13 +281,17 @@ func (kd *KubernetesDiscovery) watchNodes(events chan interface{}, retryInterval return } kd.nodesResourceVersion = event.Node.ObjectMeta.ResourceVersion - events <- &event + + select { + case events <- &event: + case <-done: + } } - }, retryInterval, kd.runDone) + }, retryInterval, done) } // watchServices watches services as they come & go. -func (kd *KubernetesDiscovery) watchServices(events chan interface{}, retryInterval time.Duration) { +func (kd *KubernetesDiscovery) watchServices(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) { until(func() { req, err := http.NewRequest("GET", kd.Conf.Server+servicesURL, nil) if err != nil { @@ -320,64 +322,77 @@ func (kd *KubernetesDiscovery) watchServices(events chan interface{}, retryInter return } kd.servicesResourceVersion = event.Service.ObjectMeta.ResourceVersion - events <- &event + + select { + case events <- &event: + case <-done: + } } - }, retryInterval, kd.runDone) + }, retryInterval, done) } -func (kd *KubernetesDiscovery) updateService(service *Service, eventType EventType, ch chan<- *config.TargetGroup) { +func (kd *KubernetesDiscovery) updateService(service *Service, eventType EventType) *config.TargetGroup { kd.servicesMu.Lock() defer kd.servicesMu.Unlock() - name := service.ObjectMeta.Name - namespace := service.ObjectMeta.Namespace - _, ok := kd.services[namespace][name] + + var ( + name = service.ObjectMeta.Name + namespace = service.ObjectMeta.Namespace + _, exists = kd.services[namespace][name] + ) + switch eventType { case deleted: - if ok { - kd.deleteService(service, ch) + if exists { + return kd.deleteService(service) } case added, modified: - kd.addService(service, ch) + return kd.addService(service) } + return nil } -func (kd *KubernetesDiscovery) deleteService(service *Service, ch chan<- *config.TargetGroup) { +func (kd *KubernetesDiscovery) deleteService(service *Service) *config.TargetGroup { tg := &config.TargetGroup{Source: serviceSource(service)} - ch <- tg + delete(kd.services[service.ObjectMeta.Namespace], service.ObjectMeta.Name) if len(kd.services[service.ObjectMeta.Namespace]) == 0 { delete(kd.services, service.ObjectMeta.Namespace) } + + return tg } -func (kd *KubernetesDiscovery) addService(service *Service, ch chan<- *config.TargetGroup) { +func (kd *KubernetesDiscovery) addService(service *Service) *config.TargetGroup { namespace, ok := kd.services[service.ObjectMeta.Namespace] if !ok { namespace = map[string]*Service{} kd.services[service.ObjectMeta.Namespace] = namespace } + namespace[service.ObjectMeta.Name] = service endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name) + res, err := kd.client.Get(kd.Conf.Server + endpointURL) if err != nil { log.Errorf("Error getting service endpoints: %s", err) - return + return nil } if res.StatusCode != http.StatusOK { log.Errorf("Failed to get service endpoints: %s", res.StatusCode) - return + return nil } var endpoints Endpoints if err := json.NewDecoder(res.Body).Decode(&endpoints); err != nil { log.Errorf("Error getting service endpoints: %s", err) - return + return nil } - kd.updateServiceTargetGroup(service, &endpoints, ch) + return kd.updateServiceTargetGroup(service, &endpoints) } -func (kd *KubernetesDiscovery) updateServiceTargetGroup(service *Service, endpoints *Endpoints, ch chan<- *config.TargetGroup) { +func (kd *KubernetesDiscovery) updateServiceTargetGroup(service *Service, endpoints *Endpoints) *config.TargetGroup { tg := &config.TargetGroup{ Source: serviceSource(service), Labels: clientmodel.LabelSet{ @@ -413,11 +428,11 @@ func (kd *KubernetesDiscovery) updateServiceTargetGroup(service *Service, endpoi } } - ch <- tg + return tg } // watchServiceEndpoints watches service endpoints as they come & go. -func (kd *KubernetesDiscovery) watchServiceEndpoints(events chan interface{}, retryInterval time.Duration) { +func (kd *KubernetesDiscovery) watchServiceEndpoints(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) { until(func() { req, err := http.NewRequest("GET", kd.Conf.Server+endpointsURL, nil) if err != nil { @@ -448,19 +463,26 @@ func (kd *KubernetesDiscovery) watchServiceEndpoints(events chan interface{}, re return } kd.servicesResourceVersion = event.Endpoints.ObjectMeta.ResourceVersion - events <- &event + + select { + case events <- &event: + case <-done: + } } - }, retryInterval, kd.runDone) + }, retryInterval, done) } -func (kd *KubernetesDiscovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType, ch chan<- *config.TargetGroup) { +func (kd *KubernetesDiscovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType) *config.TargetGroup { kd.servicesMu.Lock() defer kd.servicesMu.Unlock() + serviceNamespace := endpoints.ObjectMeta.Namespace serviceName := endpoints.ObjectMeta.Name + if service, ok := kd.services[serviceNamespace][serviceName]; ok { - kd.updateServiceTargetGroup(service, endpoints, ch) + return kd.updateServiceTargetGroup(service, endpoints) } + return nil } func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, error) {