diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go index eff401569a..1e2a6698b3 100644 --- a/retrieval/discovery/kubernetes/discovery.go +++ b/retrieval/discovery/kubernetes/discovery.go @@ -14,7 +14,6 @@ package kubernetes import ( - "encoding/json" "fmt" "io/ioutil" "net" @@ -29,7 +28,6 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/util/httputil" - "github.com/prometheus/prometheus/util/strutil" ) const ( @@ -106,10 +104,7 @@ type Discovery struct { apiServers []config.URL apiServersMu sync.RWMutex - services map[string]map[string]*Service - // map of namespace to (map of pod name to pod) - servicesMu sync.RWMutex - runDone chan struct{} + runDone chan struct{} } // Initialize sets up the discovery for usage. @@ -154,6 +149,16 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { wg.Done() }() + sd := &serviceDiscovery{ + retryInterval: time.Duration(kd.Conf.RetryInterval), + kd: kd, + } + wg.Add(1) + go func() { + sd.run(ctx, ch) + wg.Done() + }() + // Send an initial full view. // TODO(fabxc): this does not include all available services and service // endpoints yet. Service endpoints were also missing in the previous Sources() method. @@ -167,41 +172,6 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { return } - retryInterval := time.Duration(kd.Conf.RetryInterval) - - update := make(chan interface{}, 10) - - go kd.startServiceWatch(update, ctx.Done(), retryInterval) - - for { - tg := []*config.TargetGroup{} - select { - case <-ctx.Done(): - return - case event := <-update: - switch obj := event.(type) { - case *serviceEvent: - log.Debugf("k8s discovery received service event (EventType=%s, Service Name=%s)", obj.EventType, obj.Service.ObjectMeta.Name) - tg = append(tg, kd.updateService(obj.Service, obj.EventType)) - case *endpointsEvent: - log.Debugf("k8s discovery received endpoint event (EventType=%s, Endpoint Name=%s)", obj.EventType, obj.Endpoints.ObjectMeta.Name) - tg = append(tg, kd.updateServiceEndpoints(obj.Endpoints, obj.EventType)) - } - } - - if tg == nil { - continue - } - - for _, t := range tg { - select { - case ch <- []*config.TargetGroup{t}: - case <-ctx.Done(): - return - } - } - } - wg.Wait() } @@ -213,9 +183,6 @@ func (kd *Discovery) queryAPIServerPath(path string) (*http.Response, error) { return kd.queryAPIServerReq(req) } -type client struct { -} - func (kd *Discovery) queryAPIServerReq(req *http.Request) (*http.Response, error) { // Lock in case we need to rotate API servers to request. kd.apiServersMu.Lock() @@ -272,297 +239,6 @@ func (kd *Discovery) updateAPIServersTargetGroup() *config.TargetGroup { return tg } -func (kd *Discovery) getServices() (map[string]map[string]*Service, string, error) { - res, err := kd.queryAPIServerPath(servicesURL) - if err != nil { - // If we can't list services then we can't watch them. Assume this is a misconfiguration - // & return error. - return nil, "", fmt.Errorf("unable to list Kubernetes services: %s", err) - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response: %d %s", res.StatusCode, res.Status) - } - var services ServiceList - if err := json.NewDecoder(res.Body).Decode(&services); err != nil { - body, _ := ioutil.ReadAll(res.Body) - return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response body: %s", string(body)) - } - - serviceMap := map[string]map[string]*Service{} - for idx, service := range services.Items { - namespace, ok := serviceMap[service.ObjectMeta.Namespace] - if !ok { - namespace = map[string]*Service{} - serviceMap[service.ObjectMeta.Namespace] = namespace - } - namespace[service.ObjectMeta.Name] = &services.Items[idx] - } - - return serviceMap, services.ResourceVersion, nil -} - -// watchServices watches services as they come & go. -func (kd *Discovery) startServiceWatch(events chan<- interface{}, done <-chan struct{}, retryInterval time.Duration) { - until(func() { - // We use separate target groups for each discovered service so we'll need to clean up any if they've been deleted - // in Kubernetes while we couldn't connect - small chance of this, but worth dealing with. - existingServices := kd.services - - // Reset the known services. - kd.servicesMu.Lock() - kd.services = map[string]map[string]*Service{} - kd.servicesMu.Unlock() - - services, resourceVersion, err := kd.getServices() - if err != nil { - log.Errorf("Cannot initialize services collection: %s", err) - return - } - - // Now let's loop through the old services & see if they still exist in here - for oldNSName, oldNS := range existingServices { - if ns, ok := services[oldNSName]; !ok { - for _, service := range existingServices[oldNSName] { - events <- &serviceEvent{Deleted, service} - } - } else { - for oldServiceName, oldService := range oldNS { - if _, ok := ns[oldServiceName]; !ok { - events <- &serviceEvent{Deleted, oldService} - } - } - } - } - - // Discard the existing services map for GC. - existingServices = nil - - for _, ns := range services { - for _, service := range ns { - events <- &serviceEvent{Added, service} - } - } - - var wg sync.WaitGroup - wg.Add(2) - - go func() { - kd.watchServices(resourceVersion, events, done) - wg.Done() - }() - go func() { - kd.watchServiceEndpoints(resourceVersion, events, done) - wg.Done() - }() - - wg.Wait() - }, retryInterval, done) -} - -func (kd *Discovery) watchServices(resourceVersion string, events chan<- interface{}, done <-chan struct{}) { - req, err := http.NewRequest("GET", servicesURL, nil) - if err != nil { - log.Errorf("Failed to create services request: %s", err) - return - } - values := req.URL.Query() - values.Add("watch", "true") - values.Add("resourceVersion", resourceVersion) - req.URL.RawQuery = values.Encode() - - res, err := kd.queryAPIServerReq(req) - if err != nil { - log.Errorf("Failed to watch services: %s", err) - return - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - log.Errorf("Failed to watch services: %d", res.StatusCode) - return - } - - d := json.NewDecoder(res.Body) - - for { - var event serviceEvent - if err := d.Decode(&event); err != nil { - log.Errorf("Watch services unexpectedly closed: %s", err) - return - } - - select { - case events <- &event: - case <-done: - return - } - } -} - -// watchServiceEndpoints watches service endpoints as they come & go. -func (kd *Discovery) watchServiceEndpoints(resourceVersion string, events chan<- interface{}, done <-chan struct{}) { - req, err := http.NewRequest("GET", endpointsURL, nil) - if err != nil { - log.Errorf("Failed to create service endpoints request: %s", err) - return - } - values := req.URL.Query() - values.Add("watch", "true") - values.Add("resourceVersion", resourceVersion) - req.URL.RawQuery = values.Encode() - - res, err := kd.queryAPIServerReq(req) - if err != nil { - log.Errorf("Failed to watch service endpoints: %s", err) - return - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - log.Errorf("Failed to watch service endpoints: %d", res.StatusCode) - return - } - - d := json.NewDecoder(res.Body) - - for { - var event endpointsEvent - if err := d.Decode(&event); err != nil { - log.Errorf("Watch service endpoints unexpectedly closed: %s", err) - return - } - - select { - case events <- &event: - case <-done: - } - } -} - -func (kd *Discovery) updateService(service *Service, eventType EventType) *config.TargetGroup { - kd.servicesMu.Lock() - defer kd.servicesMu.Unlock() - - switch eventType { - case Deleted: - return kd.deleteService(service) - case Added, Modified: - return kd.addService(service) - } - return nil -} - -func (kd *Discovery) deleteService(service *Service) *config.TargetGroup { - tg := &config.TargetGroup{Source: serviceSource(service)} - - delete(kd.services[service.ObjectMeta.Namespace], service.ObjectMeta.Name) - if len(kd.services[service.ObjectMeta.Namespace]) == 0 { - delete(kd.services, service.ObjectMeta.Namespace) - } - - return tg -} - -func (kd *Discovery) addService(service *Service) *config.TargetGroup { - namespace, ok := kd.services[service.ObjectMeta.Namespace] - if !ok { - namespace = map[string]*Service{} - kd.services[service.ObjectMeta.Namespace] = namespace - } - - namespace[service.ObjectMeta.Name] = service - endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name) - - res, err := kd.queryAPIServerPath(endpointURL) - if err != nil { - log.Errorf("Error getting service endpoints: %s", err) - return nil - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - log.Errorf("Failed to get service endpoints: %d", res.StatusCode) - return nil - } - - var eps Endpoints - if err := json.NewDecoder(res.Body).Decode(&eps); err != nil { - log.Errorf("Error getting service endpoints: %s", err) - return nil - } - - return kd.updateServiceTargetGroup(service, &eps) -} - -func (kd *Discovery) updateServiceTargetGroup(service *Service, eps *Endpoints) *config.TargetGroup { - tg := &config.TargetGroup{ - Source: serviceSource(service), - Labels: model.LabelSet{ - serviceNamespaceLabel: model.LabelValue(service.ObjectMeta.Namespace), - serviceNameLabel: model.LabelValue(service.ObjectMeta.Name), - }, - } - - for k, v := range service.ObjectMeta.Labels { - labelName := strutil.SanitizeLabelName(serviceLabelPrefix + k) - tg.Labels[model.LabelName(labelName)] = model.LabelValue(v) - } - - for k, v := range service.ObjectMeta.Annotations { - labelName := strutil.SanitizeLabelName(serviceAnnotationPrefix + k) - tg.Labels[model.LabelName(labelName)] = model.LabelValue(v) - } - - serviceAddress := service.ObjectMeta.Name + "." + service.ObjectMeta.Namespace + ".svc" - - // Append the first TCP service port if one exists. - for _, port := range service.Spec.Ports { - if port.Protocol == ProtocolTCP { - serviceAddress += fmt.Sprintf(":%d", port.Port) - break - } - } - - t := model.LabelSet{ - model.AddressLabel: model.LabelValue(serviceAddress), - roleLabel: model.LabelValue("service"), - } - tg.Targets = append(tg.Targets, t) - - // Now let's loop through the endpoints & add them to the target group with appropriate labels. - for _, ss := range eps.Subsets { - epPort := ss.Ports[0].Port - - for _, addr := range ss.Addresses { - ipAddr := addr.IP - if len(ipAddr) == net.IPv6len { - ipAddr = "[" + ipAddr + "]" - } - address := fmt.Sprintf("%s:%d", ipAddr, epPort) - - t := model.LabelSet{ - model.AddressLabel: model.LabelValue(address), - roleLabel: model.LabelValue("endpoint"), - } - - tg.Targets = append(tg.Targets, t) - } - } - - return tg -} - -func (kd *Discovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType) *config.TargetGroup { - kd.servicesMu.Lock() - defer kd.servicesMu.Unlock() - - serviceNamespace := endpoints.ObjectMeta.Namespace - serviceName := endpoints.ObjectMeta.Name - - if service, ok := kd.services[serviceNamespace][serviceName]; ok { - return kd.updateServiceTargetGroup(service, endpoints) - } - return nil -} - func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, error) { bearerTokenFile := conf.BearerTokenFile caFile := conf.TLSConfig.CAFile @@ -622,10 +298,6 @@ func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, err }, nil } -func serviceSource(service *Service) string { - return sourceServicePrefix + ":" + service.ObjectMeta.Namespace + "/" + service.ObjectMeta.Name -} - // Until loops until stop channel is closed, running f every period. // f may not be invoked if stop channel is already closed. func until(f func(), period time.Duration, stopCh <-chan struct{}) { diff --git a/retrieval/discovery/kubernetes/service.go b/retrieval/discovery/kubernetes/service.go new file mode 100644 index 0000000000..8bd7df8a8d --- /dev/null +++ b/retrieval/discovery/kubernetes/service.go @@ -0,0 +1,366 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net" + "net/http" + "sync" + "time" + + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/util/strutil" + "golang.org/x/net/context" +) + +type serviceDiscovery struct { + mtx sync.RWMutex + services map[string]map[string]*Service + retryInterval time.Duration + kd *Discovery +} + +func (d *serviceDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) { + update := make(chan interface{}, 10) + go d.startServiceWatch(update, ctx.Done(), d.retryInterval) + + for { + tgs := []*config.TargetGroup{} + select { + case <-ctx.Done(): + return + case event := <-update: + switch e := event.(type) { + case *endpointsEvent: + log.Debugf("k8s discovery received endpoint event (EventType=%s, Endpoint Name=%s)", e.EventType, e.Endpoints.ObjectMeta.Name) + tgs = append(tgs, d.updateServiceEndpoints(e.Endpoints, e.EventType)) + case *serviceEvent: + log.Debugf("k8s discovery received service event (EventType=%s, Service Name=%s)", e.EventType, e.Service.ObjectMeta.Name) + tgs = append(tgs, d.updateService(e.Service, e.EventType)) + } + } + if tgs == nil { + continue + } + + for _, tg := range tgs { + select { + case ch <- []*config.TargetGroup{tg}: + case <-ctx.Done(): + return + } + } + } + +} + +func (d *serviceDiscovery) getServices() (map[string]map[string]*Service, string, error) { + res, err := d.kd.queryAPIServerPath(servicesURL) + if err != nil { + // If we can't list services then we can't watch them. Assume this is a misconfiguration + // & return error. + return nil, "", fmt.Errorf("unable to list Kubernetes services: %s", err) + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response: %d %s", res.StatusCode, res.Status) + } + var services ServiceList + if err := json.NewDecoder(res.Body).Decode(&services); err != nil { + body, _ := ioutil.ReadAll(res.Body) + return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response body: %s", string(body)) + } + + serviceMap := map[string]map[string]*Service{} + for idx, service := range services.Items { + namespace, ok := serviceMap[service.ObjectMeta.Namespace] + if !ok { + namespace = map[string]*Service{} + serviceMap[service.ObjectMeta.Namespace] = namespace + } + namespace[service.ObjectMeta.Name] = &services.Items[idx] + } + + return serviceMap, services.ResourceVersion, nil +} + +// watchServices watches services as they come & go. +func (d *serviceDiscovery) startServiceWatch(events chan<- interface{}, done <-chan struct{}, retryInterval time.Duration) { + until(func() { + // We use separate target groups for each discovered service so we'll need to clean up any if they've been deleted + // in Kubernetes while we couldn't connect - small chance of this, but worth dealing with. + d.mtx.Lock() + existingServices := d.services + + // Reset the known services. + d.services = map[string]map[string]*Service{} + d.mtx.Unlock() + + services, resourceVersion, err := d.getServices() + if err != nil { + log.Errorf("Cannot initialize services collection: %s", err) + return + } + + // Now let's loop through the old services & see if they still exist in here + for oldNSName, oldNS := range existingServices { + if ns, ok := services[oldNSName]; !ok { + for _, service := range existingServices[oldNSName] { + events <- &serviceEvent{Deleted, service} + } + } else { + for oldServiceName, oldService := range oldNS { + if _, ok := ns[oldServiceName]; !ok { + events <- &serviceEvent{Deleted, oldService} + } + } + } + } + + // Discard the existing services map for GC. + existingServices = nil + + for _, ns := range services { + for _, service := range ns { + events <- &serviceEvent{Added, service} + } + } + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + d.watchServices(resourceVersion, events, done) + wg.Done() + }() + go func() { + d.watchServiceEndpoints(resourceVersion, events, done) + wg.Done() + }() + + wg.Wait() + }, retryInterval, done) +} + +func (d *serviceDiscovery) watchServices(resourceVersion string, events chan<- interface{}, done <-chan struct{}) { + req, err := http.NewRequest("GET", servicesURL, nil) + if err != nil { + log.Errorf("Failed to create services request: %s", err) + return + } + values := req.URL.Query() + values.Add("watch", "true") + values.Add("resourceVersion", resourceVersion) + req.URL.RawQuery = values.Encode() + + res, err := d.kd.queryAPIServerReq(req) + if err != nil { + log.Errorf("Failed to watch services: %s", err) + return + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + log.Errorf("Failed to watch services: %d", res.StatusCode) + return + } + + dec := json.NewDecoder(res.Body) + + for { + var event serviceEvent + if err := dec.Decode(&event); err != nil { + log.Errorf("Watch services unexpectedly closed: %s", err) + return + } + + select { + case events <- &event: + case <-done: + return + } + } +} + +// watchServiceEndpoints watches service endpoints as they come & go. +func (d *serviceDiscovery) watchServiceEndpoints(resourceVersion string, events chan<- interface{}, done <-chan struct{}) { + req, err := http.NewRequest("GET", endpointsURL, nil) + if err != nil { + log.Errorf("Failed to create service endpoints request: %s", err) + return + } + values := req.URL.Query() + values.Add("watch", "true") + values.Add("resourceVersion", resourceVersion) + req.URL.RawQuery = values.Encode() + + res, err := d.kd.queryAPIServerReq(req) + if err != nil { + log.Errorf("Failed to watch service endpoints: %s", err) + return + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + log.Errorf("Failed to watch service endpoints: %d", res.StatusCode) + return + } + + dec := json.NewDecoder(res.Body) + + for { + var event endpointsEvent + if err := dec.Decode(&event); err != nil { + log.Errorf("Watch service endpoints unexpectedly closed: %s", err) + return + } + + select { + case events <- &event: + case <-done: + } + } +} + +func (d *serviceDiscovery) updateService(service *Service, eventType EventType) *config.TargetGroup { + d.mtx.Lock() + defer d.mtx.Unlock() + + switch eventType { + case Deleted: + return d.deleteService(service) + case Added, Modified: + return d.addService(service) + } + return nil +} + +func (d *serviceDiscovery) deleteService(service *Service) *config.TargetGroup { + tg := &config.TargetGroup{Source: serviceSource(service)} + + delete(d.services[service.ObjectMeta.Namespace], service.ObjectMeta.Name) + if len(d.services[service.ObjectMeta.Namespace]) == 0 { + delete(d.services, service.ObjectMeta.Namespace) + } + + return tg +} + +func (d *serviceDiscovery) addService(service *Service) *config.TargetGroup { + namespace, ok := d.services[service.ObjectMeta.Namespace] + if !ok { + namespace = map[string]*Service{} + d.services[service.ObjectMeta.Namespace] = namespace + } + + namespace[service.ObjectMeta.Name] = service + endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name) + + res, err := d.kd.queryAPIServerPath(endpointURL) + if err != nil { + log.Errorf("Error getting service endpoints: %s", err) + return nil + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + log.Errorf("Failed to get service endpoints: %d", res.StatusCode) + return nil + } + + var eps Endpoints + if err := json.NewDecoder(res.Body).Decode(&eps); err != nil { + log.Errorf("Error getting service endpoints: %s", err) + return nil + } + + return d.updateServiceTargetGroup(service, &eps) +} + +func (d *serviceDiscovery) updateServiceTargetGroup(service *Service, eps *Endpoints) *config.TargetGroup { + tg := &config.TargetGroup{ + Source: serviceSource(service), + Labels: model.LabelSet{ + serviceNamespaceLabel: model.LabelValue(service.ObjectMeta.Namespace), + serviceNameLabel: model.LabelValue(service.ObjectMeta.Name), + }, + } + + for k, v := range service.ObjectMeta.Labels { + labelName := strutil.SanitizeLabelName(serviceLabelPrefix + k) + tg.Labels[model.LabelName(labelName)] = model.LabelValue(v) + } + + for k, v := range service.ObjectMeta.Annotations { + labelName := strutil.SanitizeLabelName(serviceAnnotationPrefix + k) + tg.Labels[model.LabelName(labelName)] = model.LabelValue(v) + } + + serviceAddress := service.ObjectMeta.Name + "." + service.ObjectMeta.Namespace + ".svc" + + // Append the first TCP service port if one exists. + for _, port := range service.Spec.Ports { + if port.Protocol == ProtocolTCP { + serviceAddress += fmt.Sprintf(":%d", port.Port) + break + } + } + + t := model.LabelSet{ + model.AddressLabel: model.LabelValue(serviceAddress), + roleLabel: model.LabelValue("service"), + } + tg.Targets = append(tg.Targets, t) + + // Now let's loop through the endpoints & add them to the target group with appropriate labels. + for _, ss := range eps.Subsets { + epPort := ss.Ports[0].Port + + for _, addr := range ss.Addresses { + ipAddr := addr.IP + if len(ipAddr) == net.IPv6len { + ipAddr = "[" + ipAddr + "]" + } + address := fmt.Sprintf("%s:%d", ipAddr, epPort) + + t := model.LabelSet{ + model.AddressLabel: model.LabelValue(address), + roleLabel: model.LabelValue("endpoint"), + } + + tg.Targets = append(tg.Targets, t) + } + } + + return tg +} + +func (d *serviceDiscovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType) *config.TargetGroup { + d.mtx.Lock() + defer d.mtx.Unlock() + + serviceNamespace := endpoints.ObjectMeta.Namespace + serviceName := endpoints.ObjectMeta.Name + + if service, ok := d.services[serviceNamespace][serviceName]; ok { + return d.updateServiceTargetGroup(service, endpoints) + } + return nil +} + +func serviceSource(service *Service) string { + return sourceServicePrefix + ":" + service.ObjectMeta.Namespace + "/" + service.ObjectMeta.Name +}