diff --git a/config/config.go b/config/config.go
index 36847d045..079d546a6 100644
--- a/config/config.go
+++ b/config/config.go
@@ -792,6 +792,7 @@ func (c *MarathonSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) erro
 // KubernetesSDConfig is the configuration for Kubernetes service discovery.
 type KubernetesSDConfig struct {
     APIServers      []URL          `yaml:"api_servers"`
+    Role            KubernetesRole `yaml:"role"`
     InCluster       bool           `yaml:"in_cluster,omitempty"`
     BasicAuth       *BasicAuth     `yaml:"basic_auth,omitempty"`
     BearerToken     string         `yaml:"bearer_token,omitempty"`
@@ -804,6 +805,29 @@ type KubernetesSDConfig struct {
     XXX map[string]interface{} `yaml:",inline"`
 }
 
+type KubernetesRole string
+
+const (
+    KubernetesRoleNode      = "node"
+    KubernetesRolePod       = "pod"
+    KubernetesRoleContainer = "container"
+    KubernetesRoleService   = "service"
+    KubernetesRoleEndpoint  = "endpoint"
+    KubernetesRoleAPIServer = "apiserver"
+)
+
+func (c *KubernetesRole) UnmarshalYAML(unmarshal func(interface{}) error) error {
+    if err := unmarshal((*string)(c)); err != nil {
+        return err
+    }
+    switch *c {
+    case KubernetesRoleNode, KubernetesRolePod, KubernetesRoleContainer, KubernetesRoleService, KubernetesRoleEndpoint, KubernetesRoleAPIServer:
+        return nil
+    default:
+        return fmt.Errorf("Unknown Kubernetes SD role %q", *c)
+    }
+}
+
 // UnmarshalYAML implements the yaml.Unmarshaler interface.
 func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
     *c = DefaultKubernetesSDConfig
@@ -815,6 +839,9 @@ func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) er
     if err := checkOverflow(c.XXX, "kubernetes_sd_config"); err != nil {
         return err
     }
+    if c.Role == "" {
+        return fmt.Errorf("role missing (one of: container, pod, service, endpoint, node, apiserver)")
+    }
     if len(c.APIServers) == 0 {
         return fmt.Errorf("Kubernetes SD configuration requires at least one Kubernetes API server")
     }
@@ -824,7 +851,6 @@ func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) er
     if c.BasicAuth != nil && (len(c.BearerToken) > 0 || len(c.BearerTokenFile) > 0) {
         return fmt.Errorf("at most one of basic_auth, bearer_token & bearer_token_file must be configured")
     }
-
     return nil
 }
diff --git a/config/config_test.go b/config/config_test.go
index a7555d878..ca9ab86a7 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -223,6 +223,7 @@ var expectedConf = &Config{
             KubernetesSDConfigs: []*KubernetesSDConfig{
                 {
                     APIServers: []URL{kubernetesSDHostURL()},
+                    Role:       KubernetesRoleEndpoint,
                     BasicAuth: &BasicAuth{
                         Username: "myusername",
                         Password: "mypassword",
diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml
index b911db767..ea9acfbf1 100644
--- a/config/testdata/conf.good.yml
+++ b/config/testdata/conf.good.yml
@@ -109,7 +109,8 @@ scrape_configs:
 - job_name: service-kubernetes
 
   kubernetes_sd_configs:
-  - api_servers:
+  - role: endpoint
+    api_servers:
     - 'https://localhost:1234'
 
     basic_auth:
diff --git a/config/testdata/kubernetes_bearertoken.bad.yml b/config/testdata/kubernetes_bearertoken.bad.yml
index 756644be6..533742fd3 100644
--- a/config/testdata/kubernetes_bearertoken.bad.yml
+++ b/config/testdata/kubernetes_bearertoken.bad.yml
@@ -2,7 +2,8 @@ scrape_configs:
 - job_name: prometheus
 
   kubernetes_sd_configs:
-  - api_servers:
+  - role: node
+    api_servers:
     - 'https://localhost:1234'
 
     bearer_token: 1234
diff --git a/config/testdata/kubernetes_bearertoken_basicauth.bad.yml b/config/testdata/kubernetes_bearertoken_basicauth.bad.yml
index 9ef16ae4b..e7c1633d5 100644
--- a/config/testdata/kubernetes_bearertoken_basicauth.bad.yml
+++ b/config/testdata/kubernetes_bearertoken_basicauth.bad.yml
@@ -2,7 +2,8 @@ scrape_configs:
 - job_name: prometheus
 
   kubernetes_sd_configs:
-  - api_servers:
+  - role: pod
+    api_servers:
     - 'https://localhost:1234'
 
     bearer_token: 1234
diff --git a/retrieval/discovery/consul.go b/retrieval/discovery/consul.go
deleted file mode 100644
index 00f388d65..000000000
--- a/retrieval/discovery/consul.go
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright 2016 The Prometheus Authors
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package discovery
-
-import (
-    "github.com/prometheus/prometheus/config"
-    "github.com/prometheus/prometheus/retrieval/discovery/consul"
-)
-
-// NewConsul creates a new Consul based Discovery.
-func NewConsul(cfg *config.ConsulSDConfig) (*consul.Discovery, error) {
-    return consul.NewDiscovery(cfg)
-}
diff --git a/retrieval/discovery/kubernetes.go b/retrieval/discovery/discovery.go
similarity index 55%
rename from retrieval/discovery/kubernetes.go
rename to retrieval/discovery/discovery.go
index bc1f73cc8..795de8158 100644
--- a/retrieval/discovery/kubernetes.go
+++ b/retrieval/discovery/discovery.go
@@ -1,4 +1,4 @@
-// Copyright 2015 The Prometheus Authors
+// Copyright 2016 The Prometheus Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -14,10 +14,20 @@
 package discovery
 
 import (
+    "time"
+
     "github.com/prometheus/prometheus/config"
+    "github.com/prometheus/prometheus/retrieval/discovery/consul"
+    "github.com/prometheus/prometheus/retrieval/discovery/dns"
     "github.com/prometheus/prometheus/retrieval/discovery/kubernetes"
+    "github.com/prometheus/prometheus/retrieval/discovery/marathon"
 )
 
+// NewConsul creates a new Consul based Discovery.
+func NewConsul(cfg *config.ConsulSDConfig) (*consul.Discovery, error) {
+    return consul.NewDiscovery(cfg)
+}
+
 // NewKubernetesDiscovery creates a Kubernetes service discovery based on the passed-in configuration.
 func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.Discovery, error) {
     kd := &kubernetes.Discovery{
@@ -29,3 +39,17 @@ func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.Discov
     }
     return kd, nil
 }
+
+// NewMarathon creates a new Marathon based discovery.
+func NewMarathon(conf *config.MarathonSDConfig) *marathon.Discovery {
+    return &marathon.Discovery{
+        Servers:         conf.Servers,
+        RefreshInterval: time.Duration(conf.RefreshInterval),
+        Client:          marathon.FetchApps,
+    }
+}
+
+// NewDNS creates a new DNS based discovery.
+func NewDNS(conf *config.DNSSDConfig) *dns.Discovery {
+    return dns.NewDiscovery(conf)
+}
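Note: with the config/config.go and testdata changes above, role becomes a required kubernetes_sd_config setting. A minimal sketch of a scrape configuration using it (the job name and API server URL are illustrative, not taken from this patch):

scrape_configs:
- job_name: kubernetes-nodes
  kubernetes_sd_configs:
  - role: node
    api_servers:
    - 'https://kubernetes.example.com:6443'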
diff --git a/retrieval/discovery/dns.go b/retrieval/discovery/dns.go
deleted file mode 100644
index c413efdfc..000000000
--- a/retrieval/discovery/dns.go
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright 2016 The Prometheus Authors
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package discovery
-
-import (
-    "github.com/prometheus/prometheus/retrieval/discovery/dns"
-
-    "github.com/prometheus/prometheus/config"
-)
-
-// NewDNS creates a new DNS based discovery.
-func NewDNS(conf *config.DNSSDConfig) *dns.Discovery {
-    return dns.NewDiscovery(conf)
-}
diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go
index c2db40cb5..f05e2aa5e 100644
--- a/retrieval/discovery/kubernetes/discovery.go
+++ b/retrieval/discovery/kubernetes/discovery.go
@@ -14,16 +14,11 @@
 package kubernetes
 
 import (
-    "bytes"
-    "encoding/json"
     "fmt"
     "io/ioutil"
     "net"
     "net/http"
     "os"
-    "sort"
-    "strconv"
-    "strings"
     "sync"
     "time"
 
@@ -33,7 +28,6 @@ import (
 
     "github.com/prometheus/prometheus/config"
     "github.com/prometheus/prometheus/util/httputil"
-    "github.com/prometheus/prometheus/util/strutil"
 )
 
 const (
@@ -110,14 +104,6 @@ type Discovery struct {
 
     apiServers   []config.URL
     apiServersMu sync.RWMutex
-    nodes        map[string]*Node
-    services     map[string]map[string]*Service
-    // map of namespace to (map of pod name to pod)
-    pods map[string]map[string]*Pod
-    nodesMu    sync.RWMutex
-    servicesMu sync.RWMutex
-    podsMu     sync.RWMutex
-    runDone chan struct{}
 }
 
 // Initialize sets up the discovery for usage.
@@ -130,92 +116,43 @@ func (kd *Discovery) Initialize() error {
     kd.apiServers = kd.Conf.APIServers
     kd.client = client
-    kd.runDone = make(chan struct{})
 
     return nil
 }
 
 // Run implements the TargetProvider interface.
 func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
-    log.Debugf("Kubernetes Discovery.Run beginning")
+    log.Debugf("Start Kubernetes service discovery")
     defer close(ch)
 
-    // Send an initial full view.
-    // TODO(fabxc): this does not include all available services and service
-    // endpoints yet. Service endpoints were also missing in the previous Sources() method.
-    var all []*config.TargetGroup
-
-    all = append(all, kd.updateAPIServersTargetGroup())
-    all = append(all, kd.updateNodesTargetGroup())
-
-    pods, _, err := kd.getPods()
-    if err != nil {
-        log.Errorf("Cannot initialize pods collection: %s", err)
-        return
-    }
-    kd.podsMu.Lock()
-    kd.pods = pods
-    kd.podsMu.Unlock()
-
-    all = append(all, kd.updatePodsTargetGroup())
-    for _, ns := range kd.pods {
-        for _, pod := range ns {
-            all = append(all, kd.updatePodTargetGroup(pod))
+    switch kd.Conf.Role {
+    case config.KubernetesRolePod, config.KubernetesRoleContainer:
+        pd := &podDiscovery{
+            retryInterval: time.Duration(kd.Conf.RetryInterval),
+            kd:            kd,
         }
-    }
-
-    select {
-    case ch <- all:
-    case <-ctx.Done():
-        return
-    }
-
-    retryInterval := time.Duration(kd.Conf.RetryInterval)
-
-    update := make(chan interface{}, 10)
-
-    go kd.watchNodes(update, ctx.Done(), retryInterval)
-    go kd.startServiceWatch(update, ctx.Done(), retryInterval)
-    go kd.watchPods(update, ctx.Done(), retryInterval)
-
-    for {
-        tg := []*config.TargetGroup{}
+        pd.run(ctx, ch)
+    case config.KubernetesRoleNode:
+        nd := &nodeDiscovery{
+            retryInterval: time.Duration(kd.Conf.RetryInterval),
+            kd:            kd,
+        }
+        nd.run(ctx, ch)
+    case config.KubernetesRoleService, config.KubernetesRoleEndpoint:
+        sd := &serviceDiscovery{
+            retryInterval: time.Duration(kd.Conf.RetryInterval),
+            kd:            kd,
+        }
+        sd.run(ctx, ch)
+    case config.KubernetesRoleAPIServer:
         select {
+        case ch <- []*config.TargetGroup{kd.updateAPIServersTargetGroup()}:
         case <-ctx.Done():
             return
-        case event := <-update:
-            switch obj := event.(type) {
-            case *nodeEvent:
-                log.Debugf("k8s discovery received node event (EventType=%s, Node Name=%s)", obj.EventType, obj.Node.ObjectMeta.Name)
-                kd.updateNode(obj.Node, obj.EventType)
-                tg = append(tg, kd.updateNodesTargetGroup())
-            case *serviceEvent:
-                log.Debugf("k8s discovery received service event (EventType=%s, Service Name=%s)", obj.EventType, obj.Service.ObjectMeta.Name)
-                tg = append(tg, kd.updateService(obj.Service, obj.EventType))
-            case *endpointsEvent:
-                log.Debugf("k8s discovery received endpoint event (EventType=%s, Endpoint Name=%s)", obj.EventType, obj.Endpoints.ObjectMeta.Name)
-                tg = append(tg, kd.updateServiceEndpoints(obj.Endpoints, obj.EventType))
-            case *podEvent:
-                log.Debugf("k8s discovery received pod event (EventType=%s, Pod Name=%s)", obj.EventType, obj.Pod.ObjectMeta.Name)
-                // Update the per-pod target group
-                kd.updatePod(obj.Pod, obj.EventType)
-                tg = append(tg, kd.updatePodTargetGroup(obj.Pod))
-                // ...and update the all pods target group
-                tg = append(tg, kd.updatePodsTargetGroup())
-            }
-        }
-
-        if tg == nil {
-            continue
-        }
-
-        for _, t := range tg {
-            select {
-            case ch <- []*config.TargetGroup{t}:
-            case <-ctx.Done():
-                return
-            }
-        }
         }
+    default:
+        log.Errorf("unknown Kubernetes discovery role %q", kd.Conf.Role)
+        return
     }
 }
@@ -283,437 +220,6 @@ func (kd *Discovery) updateAPIServersTargetGroup() *config.TargetGroup {
     return tg
 }
 
-func (kd *Discovery) updateNodesTargetGroup() *config.TargetGroup {
-    kd.nodesMu.RLock()
-    defer kd.nodesMu.RUnlock()
-
-    tg := &config.TargetGroup{
-        Source: nodesTargetGroupName,
-        Labels: model.LabelSet{
-            roleLabel: model.LabelValue("node"),
-        },
-    }
-
-    // Now let's loop through the nodes & add them to the target group with appropriate labels.
-    for nodeName, node := range kd.nodes {
-        defaultNodeAddress, nodeAddressMap, err := nodeAddresses(node)
-        if err != nil {
-            log.Debugf("Skipping node %s: %s", node.Name, err)
-            continue
-        }
-
-        kubeletPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
-
-        address := fmt.Sprintf("%s:%d", defaultNodeAddress.String(), kubeletPort)
-
-        t := model.LabelSet{
-            model.AddressLabel:  model.LabelValue(address),
-            model.InstanceLabel: model.LabelValue(nodeName),
-        }
-
-        for addrType, ip := range nodeAddressMap {
-            labelName := strutil.SanitizeLabelName(nodeAddressPrefix + string(addrType))
-            t[model.LabelName(labelName)] = model.LabelValue(ip[0].String())
-        }
-
-        t[model.LabelName(nodePortLabel)] = model.LabelValue(strconv.Itoa(kubeletPort))
-
-        for k, v := range node.ObjectMeta.Labels {
-            labelName := strutil.SanitizeLabelName(nodeLabelPrefix + k)
-            t[model.LabelName(labelName)] = model.LabelValue(v)
-        }
-        tg.Targets = append(tg.Targets, t)
-    }
-
-    return tg
-}
-
-func (kd *Discovery) updateNode(node *Node, eventType EventType) {
-    kd.nodesMu.Lock()
-    defer kd.nodesMu.Unlock()
-    updatedNodeName := node.ObjectMeta.Name
-    switch eventType {
-    case Deleted:
-        // Deleted - remove from nodes map.
-        delete(kd.nodes, updatedNodeName)
-    case Added, Modified:
-        // Added/Modified - update the node in the nodes map.
-        kd.nodes[updatedNodeName] = node
-    }
-}
-
-func (kd *Discovery) getNodes() (map[string]*Node, string, error) {
-    res, err := kd.queryAPIServerPath(nodesURL)
-    if err != nil {
-        // If we can't list nodes then we can't watch them. Assume this is a misconfiguration
-        // & return error.
-        return nil, "", fmt.Errorf("unable to list Kubernetes nodes: %s", err)
-    }
-    defer res.Body.Close()
-    if res.StatusCode != http.StatusOK {
-        return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response: %d %s", res.StatusCode, res.Status)
-    }
-
-    var nodes NodeList
-    if err := json.NewDecoder(res.Body).Decode(&nodes); err != nil {
-        body, _ := ioutil.ReadAll(res.Body)
-        return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response body: %s", string(body))
-    }
-
-    nodeMap := map[string]*Node{}
-    for idx, node := range nodes.Items {
-        nodeMap[node.ObjectMeta.Name] = &nodes.Items[idx]
-    }
-
-    return nodeMap, nodes.ResourceVersion, nil
-}
-
-func (kd *Discovery) getServices() (map[string]map[string]*Service, string, error) {
-    res, err := kd.queryAPIServerPath(servicesURL)
-    if err != nil {
-        // If we can't list services then we can't watch them. Assume this is a misconfiguration
-        // & return error.
-        return nil, "", fmt.Errorf("unable to list Kubernetes services: %s", err)
-    }
-    defer res.Body.Close()
-    if res.StatusCode != http.StatusOK {
-        return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response: %d %s", res.StatusCode, res.Status)
-    }
-    var services ServiceList
-    if err := json.NewDecoder(res.Body).Decode(&services); err != nil {
-        body, _ := ioutil.ReadAll(res.Body)
-        return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response body: %s", string(body))
-    }
-
-    serviceMap := map[string]map[string]*Service{}
-    for idx, service := range services.Items {
-        namespace, ok := serviceMap[service.ObjectMeta.Namespace]
-        if !ok {
-            namespace = map[string]*Service{}
-            serviceMap[service.ObjectMeta.Namespace] = namespace
-        }
-        namespace[service.ObjectMeta.Name] = &services.Items[idx]
-    }
-
-    return serviceMap, services.ResourceVersion, nil
-}
-
-// watchNodes watches nodes as they come & go.
-func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
-    until(func() {
-        nodes, resourceVersion, err := kd.getNodes()
-        if err != nil {
-            log.Errorf("Cannot initialize nodes collection: %s", err)
-            return
-        }
-
-        // Reset the known nodes.
-        kd.nodesMu.Lock()
-        kd.nodes = map[string]*Node{}
-        kd.nodesMu.Unlock()
-
-        for _, node := range nodes {
-            events <- &nodeEvent{Added, node}
-        }
-
-        req, err := http.NewRequest("GET", nodesURL, nil)
-        if err != nil {
-            log.Errorf("Cannot create nodes request: %s", err)
-            return
-        }
-        values := req.URL.Query()
-        values.Add("watch", "true")
-        values.Add("resourceVersion", resourceVersion)
-        req.URL.RawQuery = values.Encode()
-        res, err := kd.queryAPIServerReq(req)
-        if err != nil {
-            log.Errorf("Failed to watch nodes: %s", err)
-            return
-        }
-        defer res.Body.Close()
-        if res.StatusCode != http.StatusOK {
-            log.Errorf("Failed to watch nodes: %d", res.StatusCode)
-            return
-        }
-
-        d := json.NewDecoder(res.Body)
-
-        for {
-            var event nodeEvent
-            if err := d.Decode(&event); err != nil {
-                log.Errorf("Watch nodes unexpectedly closed: %s", err)
-                return
-            }
-
-            select {
-            case events <- &event:
-            case <-done:
-            }
-        }
-    }, retryInterval, done)
-}
-
-// watchServices watches services as they come & go.
-func (kd *Discovery) startServiceWatch(events chan<- interface{}, done <-chan struct{}, retryInterval time.Duration) {
-    until(func() {
-        // We use separate target groups for each discovered service so we'll need to clean up any if they've been deleted
-        // in Kubernetes while we couldn't connect - small chance of this, but worth dealing with.
-        existingServices := kd.services
-
-        // Reset the known services.
-        kd.servicesMu.Lock()
-        kd.services = map[string]map[string]*Service{}
-        kd.servicesMu.Unlock()
-
-        services, resourceVersion, err := kd.getServices()
-        if err != nil {
-            log.Errorf("Cannot initialize services collection: %s", err)
-            return
-        }
-
-        // Now let's loop through the old services & see if they still exist in here
-        for oldNSName, oldNS := range existingServices {
-            if ns, ok := services[oldNSName]; !ok {
-                for _, service := range existingServices[oldNSName] {
-                    events <- &serviceEvent{Deleted, service}
-                }
-            } else {
-                for oldServiceName, oldService := range oldNS {
-                    if _, ok := ns[oldServiceName]; !ok {
-                        events <- &serviceEvent{Deleted, oldService}
-                    }
-                }
-            }
-        }
-
-        // Discard the existing services map for GC.
-        existingServices = nil
-
-        for _, ns := range services {
-            for _, service := range ns {
-                events <- &serviceEvent{Added, service}
-            }
-        }
-
-        var wg sync.WaitGroup
-        wg.Add(2)
-
-        go func() {
-            kd.watchServices(resourceVersion, events, done)
-            wg.Done()
-        }()
-        go func() {
-            kd.watchServiceEndpoints(resourceVersion, events, done)
-            wg.Done()
-        }()
-
-        wg.Wait()
-    }, retryInterval, done)
-}
-
-func (kd *Discovery) watchServices(resourceVersion string, events chan<- interface{}, done <-chan struct{}) {
-    req, err := http.NewRequest("GET", servicesURL, nil)
-    if err != nil {
-        log.Errorf("Failed to create services request: %s", err)
-        return
-    }
-    values := req.URL.Query()
-    values.Add("watch", "true")
-    values.Add("resourceVersion", resourceVersion)
-    req.URL.RawQuery = values.Encode()
-
-    res, err := kd.queryAPIServerReq(req)
-    if err != nil {
-        log.Errorf("Failed to watch services: %s", err)
-        return
-    }
-    defer res.Body.Close()
-    if res.StatusCode != http.StatusOK {
-        log.Errorf("Failed to watch services: %d", res.StatusCode)
-        return
-    }
-
-    d := json.NewDecoder(res.Body)
-
-    for {
-        var event serviceEvent
-        if err := d.Decode(&event); err != nil {
-            log.Errorf("Watch services unexpectedly closed: %s", err)
-            return
-        }
-
-        select {
-        case events <- &event:
-        case <-done:
-            return
-        }
-    }
-}
-
-// watchServiceEndpoints watches service endpoints as they come & go.
-func (kd *Discovery) watchServiceEndpoints(resourceVersion string, events chan<- interface{}, done <-chan struct{}) {
-    req, err := http.NewRequest("GET", endpointsURL, nil)
-    if err != nil {
-        log.Errorf("Failed to create service endpoints request: %s", err)
-        return
-    }
-    values := req.URL.Query()
-    values.Add("watch", "true")
-    values.Add("resourceVersion", resourceVersion)
-    req.URL.RawQuery = values.Encode()
-
-    res, err := kd.queryAPIServerReq(req)
-    if err != nil {
-        log.Errorf("Failed to watch service endpoints: %s", err)
-        return
-    }
-    defer res.Body.Close()
-    if res.StatusCode != http.StatusOK {
-        log.Errorf("Failed to watch service endpoints: %d", res.StatusCode)
-        return
-    }
-
-    d := json.NewDecoder(res.Body)
-
-    for {
-        var event endpointsEvent
-        if err := d.Decode(&event); err != nil {
-            log.Errorf("Watch service endpoints unexpectedly closed: %s", err)
-            return
-        }
-
-        select {
-        case events <- &event:
-        case <-done:
-        }
-    }
-}
-
-func (kd *Discovery) updateService(service *Service, eventType EventType) *config.TargetGroup {
-    kd.servicesMu.Lock()
-    defer kd.servicesMu.Unlock()
-
-    switch eventType {
-    case Deleted:
-        return kd.deleteService(service)
-    case Added, Modified:
-        return kd.addService(service)
-    }
-    return nil
-}
-
-func (kd *Discovery) deleteService(service *Service) *config.TargetGroup {
-    tg := &config.TargetGroup{Source: serviceSource(service)}
-
-    delete(kd.services[service.ObjectMeta.Namespace], service.ObjectMeta.Name)
-    if len(kd.services[service.ObjectMeta.Namespace]) == 0 {
-        delete(kd.services, service.ObjectMeta.Namespace)
-    }
-
-    return tg
-}
-
-func (kd *Discovery) addService(service *Service) *config.TargetGroup {
-    namespace, ok := kd.services[service.ObjectMeta.Namespace]
-    if !ok {
-        namespace = map[string]*Service{}
-        kd.services[service.ObjectMeta.Namespace] = namespace
-    }
-
-    namespace[service.ObjectMeta.Name] = service
-    endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name)
-
-    res, err := kd.queryAPIServerPath(endpointURL)
-    if err != nil {
-        log.Errorf("Error getting service endpoints: %s", err)
-        return nil
-    }
-    defer res.Body.Close()
-    if res.StatusCode != http.StatusOK {
-        log.Errorf("Failed to get service endpoints: %d", res.StatusCode)
-        return nil
-    }
-
-    var eps Endpoints
-    if err := json.NewDecoder(res.Body).Decode(&eps); err != nil {
-        log.Errorf("Error getting service endpoints: %s", err)
-        return nil
-    }
-
-    return kd.updateServiceTargetGroup(service, &eps)
-}
-
-func (kd *Discovery) updateServiceTargetGroup(service *Service, eps *Endpoints) *config.TargetGroup {
-    tg := &config.TargetGroup{
-        Source: serviceSource(service),
-        Labels: model.LabelSet{
-            serviceNamespaceLabel: model.LabelValue(service.ObjectMeta.Namespace),
-            serviceNameLabel:      model.LabelValue(service.ObjectMeta.Name),
-        },
-    }
-
-    for k, v := range service.ObjectMeta.Labels {
-        labelName := strutil.SanitizeLabelName(serviceLabelPrefix + k)
-        tg.Labels[model.LabelName(labelName)] = model.LabelValue(v)
-    }
-
-    for k, v := range service.ObjectMeta.Annotations {
-        labelName := strutil.SanitizeLabelName(serviceAnnotationPrefix + k)
-        tg.Labels[model.LabelName(labelName)] = model.LabelValue(v)
-    }
-
-    serviceAddress := service.ObjectMeta.Name + "." + service.ObjectMeta.Namespace + ".svc"
-
-    // Append the first TCP service port if one exists.
-    for _, port := range service.Spec.Ports {
-        if port.Protocol == ProtocolTCP {
-            serviceAddress += fmt.Sprintf(":%d", port.Port)
-            break
-        }
-    }
-
-    t := model.LabelSet{
-        model.AddressLabel: model.LabelValue(serviceAddress),
-        roleLabel:          model.LabelValue("service"),
-    }
-    tg.Targets = append(tg.Targets, t)
-
-    // Now let's loop through the endpoints & add them to the target group with appropriate labels.
-    for _, ss := range eps.Subsets {
-        epPort := ss.Ports[0].Port
-
-        for _, addr := range ss.Addresses {
-            ipAddr := addr.IP
-            if len(ipAddr) == net.IPv6len {
-                ipAddr = "[" + ipAddr + "]"
-            }
-            address := fmt.Sprintf("%s:%d", ipAddr, epPort)
-
-            t := model.LabelSet{
-                model.AddressLabel: model.LabelValue(address),
-                roleLabel:          model.LabelValue("endpoint"),
-            }
-
-            tg.Targets = append(tg.Targets, t)
-        }
-    }
-
-    return tg
-}
-
-func (kd *Discovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType) *config.TargetGroup {
-    kd.servicesMu.Lock()
-    defer kd.servicesMu.Unlock()
-
-    serviceNamespace := endpoints.ObjectMeta.Namespace
-    serviceName := endpoints.ObjectMeta.Name
-
-    if service, ok := kd.services[serviceNamespace][serviceName]; ok {
-        return kd.updateServiceTargetGroup(service, endpoints)
-    }
-    return nil
-}
-
 func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, error) {
     bearerTokenFile := conf.BearerTokenFile
     caFile := conf.TLSConfig.CAFile
@@ -773,10 +279,6 @@ func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, err
     }, nil
 }
 
-func serviceSource(service *Service) string {
-    return sourceServicePrefix + ":" + service.ObjectMeta.Namespace + "/" + service.ObjectMeta.Name
-}
-
 // Until loops until stop channel is closed, running f every period.
 // f may not be invoked if stop channel is already closed.
 func until(f func(), period time.Duration, stopCh <-chan struct{}) {
@@ -795,272 +297,3 @@
         }
     }
 }
-
-// nodeAddresses returns the provided node's address, based on the priority:
-// 1. NodeInternalIP
-// 2. NodeExternalIP
-// 3. NodeLegacyHostIP
-//
-// Copied from k8s.io/kubernetes/pkg/util/node/node.go
-func nodeAddresses(node *Node) (net.IP, map[NodeAddressType][]net.IP, error) {
-    addresses := node.Status.Addresses
-    addressMap := map[NodeAddressType][]net.IP{}
-    for _, addr := range addresses {
-        ip := net.ParseIP(addr.Address)
-        // All addresses should be valid IPs.
-        if ip == nil {
-            continue
-        }
-        addressMap[addr.Type] = append(addressMap[addr.Type], ip)
-    }
-    if addresses, ok := addressMap[NodeInternalIP]; ok {
-        return addresses[0], addressMap, nil
-    }
-    if addresses, ok := addressMap[NodeExternalIP]; ok {
-        return addresses[0], addressMap, nil
-    }
-    if addresses, ok := addressMap[NodeLegacyHostIP]; ok {
-        return addresses[0], addressMap, nil
-    }
-    return nil, nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses)
-}
-
-func (kd *Discovery) updatePod(pod *Pod, eventType EventType) {
-    kd.podsMu.Lock()
-    defer kd.podsMu.Unlock()
-
-    switch eventType {
-    case Deleted:
-        if _, ok := kd.pods[pod.ObjectMeta.Namespace]; ok {
-            delete(kd.pods[pod.ObjectMeta.Namespace], pod.ObjectMeta.Name)
-            if len(kd.pods[pod.ObjectMeta.Namespace]) == 0 {
-                delete(kd.pods, pod.ObjectMeta.Namespace)
-            }
-        }
-    case Added, Modified:
-        if _, ok := kd.pods[pod.ObjectMeta.Namespace]; !ok {
-            kd.pods[pod.ObjectMeta.Namespace] = map[string]*Pod{}
-        }
-        kd.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = pod
-    }
-}
-
-func (kd *Discovery) getPods() (map[string]map[string]*Pod, string, error) {
-    res, err := kd.queryAPIServerPath(podsURL)
-    if err != nil {
-        return nil, "", fmt.Errorf("unable to list Kubernetes pods: %s", err)
-    }
-    defer res.Body.Close()
-    if res.StatusCode != http.StatusOK {
-        return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response: %d %s", res.StatusCode, res.Status)
-    }
-
-    var pods PodList
-    if err := json.NewDecoder(res.Body).Decode(&pods); err != nil {
-        body, _ := ioutil.ReadAll(res.Body)
-        return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response body: %s", string(body))
-    }
-
-    podMap := map[string]map[string]*Pod{}
-    for idx, pod := range pods.Items {
-        if _, ok := podMap[pod.ObjectMeta.Namespace]; !ok {
-            podMap[pod.ObjectMeta.Namespace] = map[string]*Pod{}
-        }
-        log.Debugf("Got pod %s in namespace %s", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace)
-        podMap[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = &pods.Items[idx]
-    }
-
-    return podMap, pods.ResourceVersion, nil
-}
-
-func (kd *Discovery) watchPods(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
-    until(func() {
-        pods, resourceVersion, err := kd.getPods()
-        if err != nil {
-            log.Errorf("Cannot initialize pods collection: %s", err)
-            return
-        }
-        kd.podsMu.Lock()
-        kd.pods = pods
-        kd.podsMu.Unlock()
-
-        req, err := http.NewRequest("GET", podsURL, nil)
-        if err != nil {
-            log.Errorf("Cannot create pods request: %s", err)
-            return
-        }
-
-        values := req.URL.Query()
-        values.Add("watch", "true")
-        values.Add("resourceVersion", resourceVersion)
-        req.URL.RawQuery = values.Encode()
-        res, err := kd.queryAPIServerReq(req)
-        if err != nil {
-            log.Errorf("Failed to watch pods: %s", err)
-            return
-        }
-        defer res.Body.Close()
-        if res.StatusCode != http.StatusOK {
-            log.Errorf("Failed to watch pods: %d", res.StatusCode)
-            return
-        }
-
-        d := json.NewDecoder(res.Body)
-
-        for {
-            var event podEvent
-            if err := d.Decode(&event); err != nil {
-                log.Errorf("Watch pods unexpectedly closed: %s", err)
-                return
-            }
-
-            select {
-            case events <- &event:
-            case <-done:
-            }
-        }
-    }, retryInterval, done)
-}
-
-func podSource(pod *Pod) string {
-    return sourcePodPrefix + ":" + pod.ObjectMeta.Namespace + ":" + pod.ObjectMeta.Name
-}
-
-type ByContainerPort []ContainerPort
-
-func (a ByContainerPort) Len() int           { return len(a) }
-func (a ByContainerPort) Less(i, j int) bool { return a[i].ContainerPort < a[j].ContainerPort }
-func (a ByContainerPort) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
-
-type ByContainerName []Container
-
-func (a ByContainerName) Len() int           { return len(a) }
-func (a ByContainerName) Less(i, j int) bool { return a[i].Name < a[j].Name }
-func (a ByContainerName) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
-
-func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet {
-    var targets []model.LabelSet = make([]model.LabelSet, 0, len(pod.PodSpec.Containers))
-    if pod.PodStatus.PodIP == "" {
-        log.Debugf("skipping pod %s -- PodStatus.PodIP is empty", pod.ObjectMeta.Name)
-        return targets
-    }
-
-    if pod.PodStatus.Phase != "Running" {
-        log.Debugf("skipping pod %s -- status is not `Running`", pod.ObjectMeta.Name)
-        return targets
-    }
-
-    ready := "unknown"
-    for _, cond := range pod.PodStatus.Conditions {
-        if strings.ToLower(cond.Type) == "ready" {
-            ready = strings.ToLower(cond.Status)
-        }
-    }
-
-    sort.Sort(ByContainerName(pod.PodSpec.Containers))
-
-    for _, container := range pod.PodSpec.Containers {
-        // Collect a list of TCP ports
-        // Sort by port number, ascending
-        // Product a target pointed at the first port
-        // Include a label containing all ports (portName=port,PortName=port,...,)
-        var tcpPorts []ContainerPort
-        var portLabel *bytes.Buffer = bytes.NewBufferString(",")
-
-        for _, port := range container.Ports {
-            if port.Protocol == "TCP" {
-                tcpPorts = append(tcpPorts, port)
-            }
-        }
-
-        if len(tcpPorts) == 0 {
-            log.Debugf("skipping container %s with no TCP ports", container.Name)
-            continue
-        }
-
-        sort.Sort(ByContainerPort(tcpPorts))
-
-        t := model.LabelSet{
-            model.AddressLabel:        model.LabelValue(net.JoinHostPort(pod.PodIP, strconv.FormatInt(int64(tcpPorts[0].ContainerPort), 10))),
-            podNameLabel:              model.LabelValue(pod.ObjectMeta.Name),
-            podAddressLabel:           model.LabelValue(pod.PodStatus.PodIP),
-            podNamespaceLabel:         model.LabelValue(pod.ObjectMeta.Namespace),
-            podContainerNameLabel:     model.LabelValue(container.Name),
-            podContainerPortNameLabel: model.LabelValue(tcpPorts[0].Name),
-            podReadyLabel:             model.LabelValue(ready),
-        }
-
-        for _, port := range tcpPorts {
-            portLabel.WriteString(port.Name)
-            portLabel.WriteString("=")
-            portLabel.WriteString(strconv.FormatInt(int64(port.ContainerPort), 10))
-            portLabel.WriteString(",")
-            t[model.LabelName(podContainerPortMapPrefix+port.Name)] = model.LabelValue(strconv.FormatInt(int64(port.ContainerPort), 10))
-        }
-
-        t[model.LabelName(podContainerPortListLabel)] = model.LabelValue(portLabel.String())
-
-        for k, v := range pod.ObjectMeta.Labels {
-            labelName := strutil.SanitizeLabelName(podLabelPrefix + k)
-            t[model.LabelName(labelName)] = model.LabelValue(v)
-        }
-
-        for k, v := range pod.ObjectMeta.Annotations {
-            labelName := strutil.SanitizeLabelName(podAnnotationPrefix + k)
-            t[model.LabelName(labelName)] = model.LabelValue(v)
-        }
-
-        targets = append(targets, t)
-
-        if !allContainers {
-            break
-        }
-    }
-
-    if len(targets) == 0 {
-        log.Debugf("no targets for pod %s", pod.ObjectMeta.Name)
-    }
-
-    return targets
-}
-
-func (kd *Discovery) updatePodTargetGroup(pod *Pod) *config.TargetGroup {
-    kd.podsMu.RLock()
-    defer kd.podsMu.RUnlock()
-
-    tg := &config.TargetGroup{
-        Source: podSource(pod),
-    }
-
-    // If this pod doesn't exist, return an empty target group
-    if _, ok := kd.pods[pod.ObjectMeta.Namespace]; !ok {
-        return tg
-    }
-    if _, ok := kd.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name]; !ok {
-        return tg
-    }
-
-    tg.Labels = model.LabelSet{
-        roleLabel: model.LabelValue("container"),
-    }
-    tg.Targets = updatePodTargets(pod, true)
-
-    return tg
-}
-
-func (kd *Discovery) updatePodsTargetGroup() *config.TargetGroup {
-    tg := &config.TargetGroup{
-        Source: podsTargetGroupName,
-        Labels: model.LabelSet{
-            roleLabel: model.LabelValue("pod"),
-        },
-    }
-
-    for _, namespace := range kd.pods {
-        for _, pod := range namespace {
-            tg.Targets = append(tg.Targets, updatePodTargets(pod, false)...)
-        }
-    }
-
-    return tg
-}
diff --git a/retrieval/discovery/kubernetes/node.go b/retrieval/discovery/kubernetes/node.go
new file mode 100644
index 000000000..e64bc2eb1
--- /dev/null
+++ b/retrieval/discovery/kubernetes/node.go
@@ -0,0 +1,242 @@
+// Copyright 2016 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kubernetes
+
+import (
+    "encoding/json"
+    "fmt"
+    "io/ioutil"
+    "net"
+    "net/http"
+    "strconv"
+    "sync"
+    "time"
+
+    "github.com/prometheus/common/log"
+    "github.com/prometheus/common/model"
+    "github.com/prometheus/prometheus/config"
+    "github.com/prometheus/prometheus/util/strutil"
+    "golang.org/x/net/context"
+)
+
+type nodeDiscovery struct {
+    mtx           sync.RWMutex
+    nodes         map[string]*Node
+    retryInterval time.Duration
+    kd            *Discovery
+}
+
+func (d *nodeDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) {
+    select {
+    case ch <- []*config.TargetGroup{d.updateNodesTargetGroup()}:
+    case <-ctx.Done():
+        return
+    }
+
+    update := make(chan *nodeEvent, 10)
+    go d.watchNodes(update, ctx.Done(), d.retryInterval)
+
+    for {
+        tgs := []*config.TargetGroup{}
+        select {
+        case <-ctx.Done():
+            return
+        case e := <-update:
+            log.Debugf("k8s discovery received node event (EventType=%s, Node Name=%s)", e.EventType, e.Node.ObjectMeta.Name)
+            d.updateNode(e.Node, e.EventType)
+            tgs = append(tgs, d.updateNodesTargetGroup())
+        }
+        if tgs == nil {
+            continue
+        }
+
+        for _, tg := range tgs {
+            select {
+            case ch <- []*config.TargetGroup{tg}:
+            case <-ctx.Done():
+                return
+            }
+        }
+    }
+}
+
+func (d *nodeDiscovery) updateNodesTargetGroup() *config.TargetGroup {
+    d.mtx.RLock()
+    defer d.mtx.RUnlock()
+
+    tg := &config.TargetGroup{
+        Source: nodesTargetGroupName,
+        Labels: model.LabelSet{
+            roleLabel: model.LabelValue("node"),
+        },
+    }
+
+    // Now let's loop through the nodes & add them to the target group with appropriate labels.
+    for nodeName, node := range d.nodes {
+        defaultNodeAddress, nodeAddressMap, err := nodeAddresses(node)
+        if err != nil {
+            log.Debugf("Skipping node %s: %s", node.Name, err)
+            continue
+        }
+
+        kubeletPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
+
+        address := fmt.Sprintf("%s:%d", defaultNodeAddress.String(), kubeletPort)
+
+        t := model.LabelSet{
+            model.AddressLabel:  model.LabelValue(address),
+            model.InstanceLabel: model.LabelValue(nodeName),
+        }
+
+        for addrType, ip := range nodeAddressMap {
+            labelName := strutil.SanitizeLabelName(nodeAddressPrefix + string(addrType))
+            t[model.LabelName(labelName)] = model.LabelValue(ip[0].String())
+        }
+
+        t[model.LabelName(nodePortLabel)] = model.LabelValue(strconv.Itoa(kubeletPort))
+
+        for k, v := range node.ObjectMeta.Labels {
+            labelName := strutil.SanitizeLabelName(nodeLabelPrefix + k)
+            t[model.LabelName(labelName)] = model.LabelValue(v)
+        }
+        tg.Targets = append(tg.Targets, t)
+    }
+
+    return tg
+}
+
+// watchNodes watches nodes as they come & go.
+func (d *nodeDiscovery) watchNodes(events chan *nodeEvent, done <-chan struct{}, retryInterval time.Duration) {
+    until(func() {
+        nodes, resourceVersion, err := d.getNodes()
+        if err != nil {
+            log.Errorf("Cannot initialize nodes collection: %s", err)
+            return
+        }
+
+        // Reset the known nodes.
+        d.mtx.Lock()
+        d.nodes = map[string]*Node{}
+        d.mtx.Unlock()
+
+        for _, node := range nodes {
+            events <- &nodeEvent{Added, node}
+        }
+
+        req, err := http.NewRequest("GET", nodesURL, nil)
+        if err != nil {
+            log.Errorf("Cannot create nodes request: %s", err)
+            return
+        }
+        values := req.URL.Query()
+        values.Add("watch", "true")
+        values.Add("resourceVersion", resourceVersion)
+        req.URL.RawQuery = values.Encode()
+        res, err := d.kd.queryAPIServerReq(req)
+        if err != nil {
+            log.Errorf("Failed to watch nodes: %s", err)
+            return
+        }
+        defer res.Body.Close()
+        if res.StatusCode != http.StatusOK {
+            log.Errorf("Failed to watch nodes: %d", res.StatusCode)
+            return
+        }
+
+        // Named dec rather than d so the decoder does not shadow the receiver.
+        dec := json.NewDecoder(res.Body)
+
+        for {
+            var event nodeEvent
+            if err := dec.Decode(&event); err != nil {
+                log.Errorf("Watch nodes unexpectedly closed: %s", err)
+                return
+            }
+
+            select {
+            case events <- &event:
+            case <-done:
+            }
+        }
+    }, retryInterval, done)
+}
+
+func (d *nodeDiscovery) updateNode(node *Node, eventType EventType) {
+    d.mtx.Lock()
+    defer d.mtx.Unlock()
+
+    updatedNodeName := node.ObjectMeta.Name
+    switch eventType {
+    case Deleted:
+        // Deleted - remove from nodes map.
+        delete(d.nodes, updatedNodeName)
+    case Added, Modified:
+        // Added/Modified - update the node in the nodes map.
+        d.nodes[updatedNodeName] = node
+    }
+}
+
+func (d *nodeDiscovery) getNodes() (map[string]*Node, string, error) {
+    res, err := d.kd.queryAPIServerPath(nodesURL)
+    if err != nil {
+        // If we can't list nodes then we can't watch them. Assume this is a misconfiguration
+        // & return error.
+        return nil, "", fmt.Errorf("unable to list Kubernetes nodes: %s", err)
+    }
+    defer res.Body.Close()
+    if res.StatusCode != http.StatusOK {
+        return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response: %d %s", res.StatusCode, res.Status)
+    }
+
+    var nodes NodeList
+    if err := json.NewDecoder(res.Body).Decode(&nodes); err != nil {
+        body, _ := ioutil.ReadAll(res.Body)
+        return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response body: %s", string(body))
+    }
+
+    nodeMap := map[string]*Node{}
+    for idx, node := range nodes.Items {
+        nodeMap[node.ObjectMeta.Name] = &nodes.Items[idx]
+    }
+
+    return nodeMap, nodes.ResourceVersion, nil
+}
+
+// nodeAddresses returns the provided node's address, based on the priority:
+// 1. NodeInternalIP
+// 2. NodeExternalIP
+// 3. NodeLegacyHostIP
+//
+// Copied from k8s.io/kubernetes/pkg/util/node/node.go
+func nodeAddresses(node *Node) (net.IP, map[NodeAddressType][]net.IP, error) {
+    addresses := node.Status.Addresses
+    addressMap := map[NodeAddressType][]net.IP{}
+    for _, addr := range addresses {
+        ip := net.ParseIP(addr.Address)
+        // All addresses should be valid IPs.
+        if ip == nil {
+            continue
+        }
+        addressMap[addr.Type] = append(addressMap[addr.Type], ip)
+    }
+    if addresses, ok := addressMap[NodeInternalIP]; ok {
+        return addresses[0], addressMap, nil
+    }
+    if addresses, ok := addressMap[NodeExternalIP]; ok {
+        return addresses[0], addressMap, nil
+    }
+    if addresses, ok := addressMap[NodeLegacyHostIP]; ok {
+        return addresses[0], addressMap, nil
+    }
+    return nil, nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses)
+}
diff --git a/retrieval/discovery/kubernetes/pod.go b/retrieval/discovery/kubernetes/pod.go
new file mode 100644
index 000000000..1d9759d55
--- /dev/null
+++ b/retrieval/discovery/kubernetes/pod.go
@@ -0,0 +1,342 @@
+// Copyright 2016 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kubernetes
+
+import (
+    "bytes"
+    "encoding/json"
+    "fmt"
+    "io/ioutil"
+    "net"
+    "net/http"
+    "sort"
+    "strconv"
+    "strings"
+    "sync"
+    "time"
+
+    "github.com/prometheus/common/log"
+    "github.com/prometheus/common/model"
+    "github.com/prometheus/prometheus/config"
+    "github.com/prometheus/prometheus/util/strutil"
+    "golang.org/x/net/context"
+)
+
+type podDiscovery struct {
+    mtx           sync.RWMutex
+    pods          map[string]map[string]*Pod
+    retryInterval time.Duration
+    kd            *Discovery
+}
+
+func (d *podDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) {
+    pods, _, err := d.getPods()
+    if err != nil {
+        log.Errorf("Cannot initialize pods collection: %s", err)
+        return
+    }
+    d.pods = pods
+
+    initial := []*config.TargetGroup{}
+    switch d.kd.Conf.Role {
+    case config.KubernetesRolePod:
+        initial = append(initial, d.updatePodsTargetGroup())
+    case config.KubernetesRoleContainer:
+        for _, ns := range d.pods {
+            for _, pod := range ns {
+                initial = append(initial, d.updateContainerTargetGroup(pod))
+            }
+        }
+    }
+
+    select {
+    case ch <- initial:
+    case <-ctx.Done():
+        return
+    }
+
+    update := make(chan *podEvent, 10)
+    go d.watchPods(update, ctx.Done(), d.retryInterval)
+
+    for {
+        tgs := []*config.TargetGroup{}
+        select {
+        case <-ctx.Done():
+            return
+        case e := <-update:
+            log.Debugf("k8s discovery received pod event (EventType=%s, Pod Name=%s)", e.EventType, e.Pod.ObjectMeta.Name)
+            d.updatePod(e.Pod, e.EventType)
+
+            switch d.kd.Conf.Role {
+            case config.KubernetesRoleContainer:
+                // Update the per-pod target group
+                tgs = append(tgs, d.updateContainerTargetGroup(e.Pod))
+            case config.KubernetesRolePod:
+                // Update the all pods target group
+                tgs = append(tgs, d.updatePodsTargetGroup())
+            }
+        }
+        if tgs == nil {
+            continue
+        }
+
+        for _, tg := range tgs {
+            select {
+            case ch <- []*config.TargetGroup{tg}:
+            case <-ctx.Done():
+                return
+            }
+        }
+    }
+}
+
+func (d *podDiscovery) getPods() (map[string]map[string]*Pod, string, error) {
+    res, err := d.kd.queryAPIServerPath(podsURL)
+    if err != nil {
+        return nil, "", fmt.Errorf("unable to list Kubernetes pods: %s", err)
+    }
+    defer res.Body.Close()
+    if res.StatusCode != http.StatusOK {
+        return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response: %d %s", res.StatusCode, res.Status)
+    }
+
+    var pods PodList
+    if err := json.NewDecoder(res.Body).Decode(&pods); err != nil {
+        body, _ := ioutil.ReadAll(res.Body)
+        return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response body: %s", string(body))
+    }
+
+    podMap := map[string]map[string]*Pod{}
+    for idx, pod := range pods.Items {
+        if _, ok := podMap[pod.ObjectMeta.Namespace]; !ok {
+            podMap[pod.ObjectMeta.Namespace] = map[string]*Pod{}
+        }
+        log.Debugf("Got pod %s in namespace %s", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace)
+        podMap[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = &pods.Items[idx]
+    }
+
+    return podMap, pods.ResourceVersion, nil
+}
+
+func (d *podDiscovery) watchPods(events chan *podEvent, done <-chan struct{}, retryInterval time.Duration) {
+    until(func() {
+        pods, resourceVersion, err := d.getPods()
+        if err != nil {
+            log.Errorf("Cannot initialize pods collection: %s", err)
+            return
+        }
+        d.mtx.Lock()
+        d.pods = pods
+        d.mtx.Unlock()
+
+        req, err := http.NewRequest("GET", podsURL, nil)
+        if err != nil {
+            log.Errorf("Cannot create pods request: %s", err)
+            return
+        }
+
+        values := req.URL.Query()
+        values.Add("watch", "true")
+        values.Add("resourceVersion", resourceVersion)
+        req.URL.RawQuery = values.Encode()
+        res, err := d.kd.queryAPIServerReq(req)
+        if err != nil {
+            log.Errorf("Failed to watch pods: %s", err)
+            return
+        }
+        defer res.Body.Close()
+        if res.StatusCode != http.StatusOK {
+            log.Errorf("Failed to watch pods: %d", res.StatusCode)
+            return
+        }
+
+        // Named dec rather than d so the decoder does not shadow the receiver.
+        dec := json.NewDecoder(res.Body)
+
+        for {
+            var event podEvent
+            if err := dec.Decode(&event); err != nil {
+                log.Errorf("Watch pods unexpectedly closed: %s", err)
+                return
+            }
+
+            select {
+            case events <- &event:
+            case <-done:
+            }
+        }
+    }, retryInterval, done)
+}
+
+func (d *podDiscovery) updatePod(pod *Pod, eventType EventType) {
+    d.mtx.Lock()
+    defer d.mtx.Unlock()
+
+    switch eventType {
+    case Deleted:
+        if _, ok := d.pods[pod.ObjectMeta.Namespace]; ok {
+            delete(d.pods[pod.ObjectMeta.Namespace], pod.ObjectMeta.Name)
+            if len(d.pods[pod.ObjectMeta.Namespace]) == 0 {
+                delete(d.pods, pod.ObjectMeta.Namespace)
+            }
+        }
+    case Added, Modified:
+        if _, ok := d.pods[pod.ObjectMeta.Namespace]; !ok {
+            d.pods[pod.ObjectMeta.Namespace] = map[string]*Pod{}
+        }
+        d.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = pod
+    }
+}
+
+func (d *podDiscovery) updateContainerTargetGroup(pod *Pod) *config.TargetGroup {
+    d.mtx.RLock()
+    defer d.mtx.RUnlock()
+
+    tg := &config.TargetGroup{
+        Source: podSource(pod),
+    }
+
+    // If this pod doesn't exist, return an empty target group
+    if _, ok := d.pods[pod.ObjectMeta.Namespace]; !ok {
+        return tg
+    }
+    if _, ok := d.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name]; !ok {
+        return tg
+    }
+
+    tg.Labels = model.LabelSet{
+        roleLabel: model.LabelValue("container"),
+    }
+    tg.Targets = updatePodTargets(pod, true)
+
+    return tg
+}
+
+func (d *podDiscovery) updatePodsTargetGroup() *config.TargetGroup {
+    tg := &config.TargetGroup{
+        Source: podsTargetGroupName,
+        Labels: model.LabelSet{
+            roleLabel: model.LabelValue("pod"),
+        },
+    }
+
+    for _, namespace := range d.pods {
+        for _, pod := range namespace {
+            tg.Targets = append(tg.Targets, updatePodTargets(pod, false)...)
+        }
+    }
+
+    return tg
+}
+
+func podSource(pod *Pod) string {
+    return sourcePodPrefix + ":" + pod.ObjectMeta.Namespace + ":" + pod.ObjectMeta.Name
+}
+
+func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet {
+    var targets []model.LabelSet = make([]model.LabelSet, 0, len(pod.PodSpec.Containers))
+    if pod.PodStatus.PodIP == "" {
+        log.Debugf("skipping pod %s -- PodStatus.PodIP is empty", pod.ObjectMeta.Name)
+        return targets
+    }
+
+    if pod.PodStatus.Phase != "Running" {
+        log.Debugf("skipping pod %s -- status is not `Running`", pod.ObjectMeta.Name)
+        return targets
+    }
+
+    ready := "unknown"
+    for _, cond := range pod.PodStatus.Conditions {
+        if strings.ToLower(cond.Type) == "ready" {
+            ready = strings.ToLower(cond.Status)
+        }
+    }
+
+    sort.Sort(ByContainerName(pod.PodSpec.Containers))
+
+    for _, container := range pod.PodSpec.Containers {
+        // Collect a list of TCP ports
+        // Sort by port number, ascending
+        // Produce a target pointed at the first port
+        // Include a label containing all ports (portName=port,PortName=port,...,)
+        var tcpPorts []ContainerPort
+        var portLabel *bytes.Buffer = bytes.NewBufferString(",")
+
+        for _, port := range container.Ports {
+            if port.Protocol == "TCP" {
+                tcpPorts = append(tcpPorts, port)
+            }
+        }
+
+        if len(tcpPorts) == 0 {
+            log.Debugf("skipping container %s with no TCP ports", container.Name)
+            continue
+        }
+
+        sort.Sort(ByContainerPort(tcpPorts))
+
+        t := model.LabelSet{
+            model.AddressLabel:        model.LabelValue(net.JoinHostPort(pod.PodIP, strconv.FormatInt(int64(tcpPorts[0].ContainerPort), 10))),
+            podNameLabel:              model.LabelValue(pod.ObjectMeta.Name),
+            podAddressLabel:           model.LabelValue(pod.PodStatus.PodIP),
+            podNamespaceLabel:         model.LabelValue(pod.ObjectMeta.Namespace),
+            podContainerNameLabel:     model.LabelValue(container.Name),
+            podContainerPortNameLabel: model.LabelValue(tcpPorts[0].Name),
+            podReadyLabel:             model.LabelValue(ready),
+        }
+
+        for _, port := range tcpPorts {
+            portLabel.WriteString(port.Name)
+            portLabel.WriteString("=")
+            portLabel.WriteString(strconv.FormatInt(int64(port.ContainerPort), 10))
+            portLabel.WriteString(",")
+            t[model.LabelName(podContainerPortMapPrefix+port.Name)] = model.LabelValue(strconv.FormatInt(int64(port.ContainerPort), 10))
+        }
+
+        t[model.LabelName(podContainerPortListLabel)] = model.LabelValue(portLabel.String())
+
+        for k, v := range pod.ObjectMeta.Labels {
+            labelName := strutil.SanitizeLabelName(podLabelPrefix + k)
+            t[model.LabelName(labelName)] = model.LabelValue(v)
+        }
+
+        for k, v := range pod.ObjectMeta.Annotations {
+            labelName := strutil.SanitizeLabelName(podAnnotationPrefix + k)
+            t[model.LabelName(labelName)] = model.LabelValue(v)
+        }
+
+        targets = append(targets, t)
+
+        if !allContainers {
+            break
+        }
+    }
+
+    if len(targets) == 0 {
+        log.Debugf("no targets for pod %s", pod.ObjectMeta.Name)
+    }
+
+    return targets
+}
+
+type ByContainerPort []ContainerPort
+
+func (a ByContainerPort) Len() int           { return len(a) }
+func (a ByContainerPort) Less(i, j int) bool { return a[i].ContainerPort < a[j].ContainerPort }
+func (a ByContainerPort) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+
+type ByContainerName []Container
+
+func (a ByContainerName) Len() int           { return len(a) }
+func (a ByContainerName) Less(i, j int) bool { return a[i].Name < a[j].Name }
+func (a ByContainerName) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
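Aside on the pod code above: ByContainerPort and ByContainerName are plain sort.Interface implementations, and updatePodTargets relies on the ascending port sort so that the first target always points at the lowest-numbered TCP port. A self-contained sketch of that guarantee (ContainerPort is re-declared here as a stand-in for the real type elsewhere in this package):

package main

import (
    "fmt"
    "sort"
)

// Stand-in for the real ContainerPort type; only the fields needed here.
type ContainerPort struct {
    Name          string
    ContainerPort int
}

type ByContainerPort []ContainerPort

func (a ByContainerPort) Len() int           { return len(a) }
func (a ByContainerPort) Less(i, j int) bool { return a[i].ContainerPort < a[j].ContainerPort }
func (a ByContainerPort) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }

func main() {
    ports := []ContainerPort{{"metrics", 9100}, {"http", 80}}
    sort.Sort(ByContainerPort(ports))
    // After sorting, the discovery code targets ports[0], the lowest TCP port.
    fmt.Println(ports[0].Name, ports[0].ContainerPort) // http 80
}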
diff --git a/retrieval/discovery/kubernetes/service.go b/retrieval/discovery/kubernetes/service.go
new file mode 100644
index 000000000..3c0c6d04b
--- /dev/null
+++ b/retrieval/discovery/kubernetes/service.go
@@ -0,0 +1,370 @@
+// Copyright 2016 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kubernetes
+
+import (
+    "encoding/json"
+    "fmt"
+    "io/ioutil"
+    "net"
+    "net/http"
+    "sync"
+    "time"
+
+    "github.com/prometheus/common/log"
+    "github.com/prometheus/common/model"
+    "github.com/prometheus/prometheus/config"
+    "github.com/prometheus/prometheus/util/strutil"
+    "golang.org/x/net/context"
+)
+
+type serviceDiscovery struct {
+    mtx           sync.RWMutex
+    services      map[string]map[string]*Service
+    retryInterval time.Duration
+    kd            *Discovery
+}
+
+func (d *serviceDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) {
+    update := make(chan interface{}, 10)
+    go d.startServiceWatch(update, ctx.Done(), d.retryInterval)
+
+    for {
+        tgs := []*config.TargetGroup{}
+        select {
+        case <-ctx.Done():
+            return
+        case event := <-update:
+            switch e := event.(type) {
+            case *endpointsEvent:
+                log.Debugf("k8s discovery received endpoint event (EventType=%s, Endpoint Name=%s)", e.EventType, e.Endpoints.ObjectMeta.Name)
+                tgs = append(tgs, d.updateServiceEndpoints(e.Endpoints, e.EventType))
+            case *serviceEvent:
+                log.Debugf("k8s discovery received service event (EventType=%s, Service Name=%s)", e.EventType, e.Service.ObjectMeta.Name)
+                tgs = append(tgs, d.updateService(e.Service, e.EventType))
+            }
+        }
+        if tgs == nil {
+            continue
+        }
+
+        for _, tg := range tgs {
+            select {
+            case ch <- []*config.TargetGroup{tg}:
+            case <-ctx.Done():
+                return
+            }
+        }
+    }
+
+}
+
+func (d *serviceDiscovery) getServices() (map[string]map[string]*Service, string, error) {
+    res, err := d.kd.queryAPIServerPath(servicesURL)
+    if err != nil {
+        // If we can't list services then we can't watch them. Assume this is a misconfiguration
+        // & return error.
+        return nil, "", fmt.Errorf("unable to list Kubernetes services: %s", err)
+    }
+    defer res.Body.Close()
+    if res.StatusCode != http.StatusOK {
+        return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response: %d %s", res.StatusCode, res.Status)
+    }
+    var services ServiceList
+    if err := json.NewDecoder(res.Body).Decode(&services); err != nil {
+        body, _ := ioutil.ReadAll(res.Body)
+        return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response body: %s", string(body))
+    }
+
+    serviceMap := map[string]map[string]*Service{}
+    for idx, service := range services.Items {
+        namespace, ok := serviceMap[service.ObjectMeta.Namespace]
+        if !ok {
+            namespace = map[string]*Service{}
+            serviceMap[service.ObjectMeta.Namespace] = namespace
+        }
+        namespace[service.ObjectMeta.Name] = &services.Items[idx]
+    }
+
+    return serviceMap, services.ResourceVersion, nil
+}
+
+// startServiceWatch watches services as they come & go.
+func (d *serviceDiscovery) startServiceWatch(events chan<- interface{}, done <-chan struct{}, retryInterval time.Duration) {
+    until(func() {
+        // We use separate target groups for each discovered service so we'll need to clean up any if they've been deleted
+        // in Kubernetes while we couldn't connect - small chance of this, but worth dealing with.
+        d.mtx.Lock()
+        existingServices := d.services
+
+        // Reset the known services.
+        d.services = map[string]map[string]*Service{}
+        d.mtx.Unlock()
+
+        services, resourceVersion, err := d.getServices()
+        if err != nil {
+            log.Errorf("Cannot initialize services collection: %s", err)
+            return
+        }
+
+        // Now let's loop through the old services & see if they still exist in here
+        for oldNSName, oldNS := range existingServices {
+            if ns, ok := services[oldNSName]; !ok {
+                for _, service := range existingServices[oldNSName] {
+                    events <- &serviceEvent{Deleted, service}
+                }
+            } else {
+                for oldServiceName, oldService := range oldNS {
+                    if _, ok := ns[oldServiceName]; !ok {
+                        events <- &serviceEvent{Deleted, oldService}
+                    }
+                }
+            }
+        }
+
+        // Discard the existing services map for GC.
+        existingServices = nil
+
+        for _, ns := range services {
+            for _, service := range ns {
+                events <- &serviceEvent{Added, service}
+            }
+        }
+
+        var wg sync.WaitGroup
+        wg.Add(2)
+
+        go func() {
+            d.watchServices(resourceVersion, events, done)
+            wg.Done()
+        }()
+        go func() {
+            d.watchServiceEndpoints(resourceVersion, events, done)
+            wg.Done()
+        }()
+
+        wg.Wait()
+    }, retryInterval, done)
+}
+
+func (d *serviceDiscovery) watchServices(resourceVersion string, events chan<- interface{}, done <-chan struct{}) {
+    req, err := http.NewRequest("GET", servicesURL, nil)
+    if err != nil {
+        log.Errorf("Failed to create services request: %s", err)
+        return
+    }
+    values := req.URL.Query()
+    values.Add("watch", "true")
+    values.Add("resourceVersion", resourceVersion)
+    req.URL.RawQuery = values.Encode()
+
+    res, err := d.kd.queryAPIServerReq(req)
+    if err != nil {
+        log.Errorf("Failed to watch services: %s", err)
+        return
+    }
+    defer res.Body.Close()
+    if res.StatusCode != http.StatusOK {
+        log.Errorf("Failed to watch services: %d", res.StatusCode)
+        return
+    }
+
+    dec := json.NewDecoder(res.Body)
+
+    for {
+        var event serviceEvent
+        if err := dec.Decode(&event); err != nil {
+            log.Errorf("Watch services unexpectedly closed: %s", err)
+            return
+        }
+
+        select {
+        case events <- &event:
+        case <-done:
+            return
+        }
+    }
+}
+
+// watchServiceEndpoints watches service endpoints as they come & go.
+func (d *serviceDiscovery) watchServiceEndpoints(resourceVersion string, events chan<- interface{}, done <-chan struct{}) {
+    req, err := http.NewRequest("GET", endpointsURL, nil)
+    if err != nil {
+        log.Errorf("Failed to create service endpoints request: %s", err)
+        return
+    }
+    values := req.URL.Query()
+    values.Add("watch", "true")
+    values.Add("resourceVersion", resourceVersion)
+    req.URL.RawQuery = values.Encode()
+
+    res, err := d.kd.queryAPIServerReq(req)
+    if err != nil {
+        log.Errorf("Failed to watch service endpoints: %s", err)
+        return
+    }
+    defer res.Body.Close()
+    if res.StatusCode != http.StatusOK {
+        log.Errorf("Failed to watch service endpoints: %d", res.StatusCode)
+        return
+    }
+
+    dec := json.NewDecoder(res.Body)
+
+    for {
+        var event endpointsEvent
+        if err := dec.Decode(&event); err != nil {
+            log.Errorf("Watch service endpoints unexpectedly closed: %s", err)
+            return
+        }
+
+        select {
+        case events <- &event:
+        case <-done:
+        }
+    }
+}
+
+func (d *serviceDiscovery) updateService(service *Service, eventType EventType) *config.TargetGroup {
+    d.mtx.Lock()
+    defer d.mtx.Unlock()
+
+    switch eventType {
+    case Deleted:
+        return d.deleteService(service)
+    case Added, Modified:
+        return d.addService(service)
+    }
+    return nil
+}
+
+func (d *serviceDiscovery) deleteService(service *Service) *config.TargetGroup {
+    tg := &config.TargetGroup{Source: serviceSource(service)}
+
+    delete(d.services[service.ObjectMeta.Namespace], service.ObjectMeta.Name)
+    if len(d.services[service.ObjectMeta.Namespace]) == 0 {
+        delete(d.services, service.ObjectMeta.Namespace)
+    }
+
+    return tg
+}
+
+func (d *serviceDiscovery) addService(service *Service) *config.TargetGroup {
+    namespace, ok := d.services[service.ObjectMeta.Namespace]
+    if !ok {
+        namespace = map[string]*Service{}
+        d.services[service.ObjectMeta.Namespace] = namespace
+    }
+
+    namespace[service.ObjectMeta.Name] = service
+    endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name)
+
+    res, err := d.kd.queryAPIServerPath(endpointURL)
+    if err != nil {
+        log.Errorf("Error getting service endpoints: %s", err)
+        return nil
+    }
+    defer res.Body.Close()
+    if res.StatusCode != http.StatusOK {
+        log.Errorf("Failed to get service endpoints: %d", res.StatusCode)
+        return nil
+    }
+
+    var eps Endpoints
+    if err := json.NewDecoder(res.Body).Decode(&eps); err != nil {
+        log.Errorf("Error getting service endpoints: %s", err)
+        return nil
+    }
+
+    return d.updateServiceTargetGroup(service, &eps)
+}
+
+func (d *serviceDiscovery) updateServiceTargetGroup(service *Service, eps *Endpoints) *config.TargetGroup {
+    tg := &config.TargetGroup{
+        Source: serviceSource(service),
+        Labels: model.LabelSet{
+            serviceNamespaceLabel: model.LabelValue(service.ObjectMeta.Namespace),
+            serviceNameLabel:      model.LabelValue(service.ObjectMeta.Name),
+        },
+    }
+
+    for k, v := range service.ObjectMeta.Labels {
+        labelName := strutil.SanitizeLabelName(serviceLabelPrefix + k)
+        tg.Labels[model.LabelName(labelName)] = model.LabelValue(v)
+    }
+
+    for k, v := range service.ObjectMeta.Annotations {
+        labelName := strutil.SanitizeLabelName(serviceAnnotationPrefix + k)
+        tg.Labels[model.LabelName(labelName)] = model.LabelValue(v)
+    }
+
+    serviceAddress := service.ObjectMeta.Name + "." + service.ObjectMeta.Namespace + ".svc"
+
+    // Append the first TCP service port if one exists.
+    for _, port := range service.Spec.Ports {
+        if port.Protocol == ProtocolTCP {
+            serviceAddress += fmt.Sprintf(":%d", port.Port)
+            break
+        }
+    }
+
+    switch d.kd.Conf.Role {
+    case config.KubernetesRoleService:
+        t := model.LabelSet{
+            model.AddressLabel: model.LabelValue(serviceAddress),
+            roleLabel:          model.LabelValue("service"),
+        }
+        tg.Targets = append(tg.Targets, t)
+
+    case config.KubernetesRoleEndpoint:
+        // Now let's loop through the endpoints & add them to the target group
+        // with appropriate labels.
+        for _, ss := range eps.Subsets {
+            epPort := ss.Ports[0].Port
+
+            for _, addr := range ss.Addresses {
+                ipAddr := addr.IP
+                if len(ipAddr) == net.IPv6len {
+                    ipAddr = "[" + ipAddr + "]"
+                }
+                address := fmt.Sprintf("%s:%d", ipAddr, epPort)
+
+                t := model.LabelSet{
+                    model.AddressLabel: model.LabelValue(address),
+                    roleLabel:          model.LabelValue("endpoint"),
+                }
+
+                tg.Targets = append(tg.Targets, t)
+            }
+        }
+    }
+
+    return tg
+}
+
+func (d *serviceDiscovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType) *config.TargetGroup {
+    d.mtx.Lock()
+    defer d.mtx.Unlock()
+
+    serviceNamespace := endpoints.ObjectMeta.Namespace
+    serviceName := endpoints.ObjectMeta.Name
+
+    if service, ok := d.services[serviceNamespace][serviceName]; ok {
+        return d.updateServiceTargetGroup(service, endpoints)
+    }
+    return nil
+}
+
+func serviceSource(service *Service) string {
+    return sourceServicePrefix + ":" + service.ObjectMeta.Namespace + "/" + service.ObjectMeta.Name
+}
diff --git a/retrieval/discovery/marathon.go b/retrieval/discovery/marathon.go
deleted file mode 100644
index e22cf48f0..000000000
--- a/retrieval/discovery/marathon.go
+++ /dev/null
@@ -1,30 +0,0 @@
-// Copyright 2015 The Prometheus Authors
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package discovery
-
-import (
-    "time"
-
-    "github.com/prometheus/prometheus/config"
-    "github.com/prometheus/prometheus/retrieval/discovery/marathon"
-)
-
-// NewMarathon creates a new Marathon based discovery.
-func NewMarathon(conf *config.MarathonSDConfig) *marathon.Discovery {
-    return &marathon.Discovery{
-        Servers:         conf.Servers,
-        RefreshInterval: time.Duration(conf.RefreshInterval),
-        Client:          marathon.FetchApps,
-    }
-}