diff --git a/config/config.go b/config/config.go
index fe8053a49..e7ea0f6b4 100644
--- a/config/config.go
+++ b/config/config.go
@@ -109,6 +109,13 @@ var (
 	DefaultMarathonSDConfig = MarathonSDConfig{
 		RefreshInterval: Duration(30 * time.Second),
 	}
+
+	// DefaultKubernetesSDConfig is the default Kubernetes SD configuration.
+	DefaultKubernetesSDConfig = KubernetesSDConfig{
+		KubeletPort:    10255,
+		RequestTimeout: Duration(10 * time.Second),
+		RetryInterval:  Duration(1 * time.Second),
+	}
 )
 
 // This custom URL type allows validating at configuration load time.
@@ -315,6 +322,8 @@ type ScrapeConfig struct {
 	ServersetSDConfigs []*ServersetSDConfig `yaml:"serverset_sd_configs,omitempty"`
 	// MarathonSDConfigs is a list of Marathon service discovery configurations.
 	MarathonSDConfigs []*MarathonSDConfig `yaml:"marathon_sd_configs,omitempty"`
+	// KubernetesSDConfigs is a list of Kubernetes service discovery configurations.
+	KubernetesSDConfigs []*KubernetesSDConfig `yaml:"kubernetes_sd_configs,omitempty"`
 
 	// List of target relabel configurations.
 	RelabelConfigs []*RelabelConfig `yaml:"relabel_configs,omitempty"`
@@ -584,18 +593,64 @@ type MarathonSDConfig struct {
 	XXX map[string]interface{} `yaml:",inline"`
 }
 
+// KubernetesSDConfig is the configuration for Kubernetes service discovery.
+//
+// Server is mandatory and is normalized to end in a trailing slash when the
+// configuration is unmarshaled. KubeletPort, RequestTimeout and RetryInterval
+// default to the values in DefaultKubernetesSDConfig.
+type KubernetesSDConfig struct {
+	Server          string   `yaml:"server"`
+	KubeletPort     int      `yaml:"kubelet_port,omitempty"`
+	InCluster       bool     `yaml:"in_cluster,omitempty"`
+	BearerTokenFile string   `yaml:"bearer_token_file,omitempty"`
+	Username        string   `yaml:"username,omitempty"`
+	Password        string   `yaml:"password,omitempty"`
+	Insecure        bool     `yaml:"insecure,omitempty"`
+	CertFile        string   `yaml:"cert_file,omitempty"`
+	KeyFile         string   `yaml:"key_file,omitempty"`
+	CAFile          string   `yaml:"ca_file,omitempty"`
+	RetryInterval   Duration `yaml:"retry_interval,omitempty"`
+	RequestTimeout  Duration `yaml:"request_timeout,omitempty"`
+
+	// Catches all undefined fields and must be empty after parsing.
+	XXX map[string]interface{} `yaml:",inline"`
+}
+
 // UnmarshalYAML implements the yaml.Unmarshaler interface.
 func (c *MarathonSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
 	*c = DefaultMarathonSDConfig
 	type plain MarathonSDConfig
 	err := unmarshal((*plain)(c))
 	if err != nil {
 		return err
 	}
 	if len(c.Servers) == 0 {
 		return fmt.Errorf("Marathon SD config must contain at least one Marathon server")
 	}
 	return checkOverflow(c.XXX, "marathon_sd_config")
 }
 
+// UnmarshalYAML implements the yaml.Unmarshaler interface.
+// It rejects configurations without a server address and appends a trailing
+// slash to Server if it is missing.
+func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
+	// Start from the defaults so that omitted fields keep their default values.
+	*c = DefaultKubernetesSDConfig
+	type plain KubernetesSDConfig
+	err := unmarshal((*plain)(c))
+	if err != nil {
+		return err
+	}
+	if strings.TrimSpace(c.Server) == "" {
+		return fmt.Errorf("Kubernetes SD configuration requires a server address")
+	}
+
+	// Make sure server ends in trailing slash - simpler URL building later
+	if !strings.HasSuffix(c.Server, "/") {
+		c.Server += "/"
+	}
+
+	return checkOverflow(c.XXX, "kubernetes_sd_config")
+}
+
 // RelabelAction is the action to be performed on relabeling.
diff --git a/config/config_test.go b/config/config_test.go index 57fc64d57..f38f58d7b 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -174,6 +174,26 @@ var expectedConf = &Config{ }, BearerToken: "avalidtoken", }, + { + JobName: "service-kubernetes", + + ScrapeInterval: Duration(15 * time.Second), + ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, + + MetricsPath: DefaultScrapeConfig.MetricsPath, + Scheme: DefaultScrapeConfig.Scheme, + + KubernetesSDConfigs: []*KubernetesSDConfig{ + { + Server: "https://localhost:1234/", + Username: "myusername", + Password: "mypassword", + KubeletPort: 10255, + RequestTimeout: Duration(10 * time.Second), + RetryInterval: Duration(1 * time.Second), + }, + }, + }, }, original: "", } @@ -189,10 +209,12 @@ func TestLoadConfig(t *testing.T) { if err != nil { t.Fatalf("Error parsing %s: %s", "testdata/conf.good.yml", err) } + bgot, err := yaml.Marshal(c) if err != nil { t.Fatalf("%s", err) } + bexp, err := yaml.Marshal(expectedConf) if err != nil { t.Fatalf("%s", err) diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml index 65b0ce6cd..239c507ca 100644 --- a/config/testdata/conf.good.yml +++ b/config/testdata/conf.good.yml @@ -99,3 +99,10 @@ scrape_configs: key: valid_key_file bearer_token: avalidtoken + +- job_name: service-kubernetes + + kubernetes_sd_configs: + - server: 'https://localhost:1234' + username: 'myusername' + password: 'mypassword' diff --git a/documentation/examples/prometheus-kubernetes.yml b/documentation/examples/prometheus-kubernetes.yml new file mode 100644 index 000000000..c49083ebe --- /dev/null +++ b/documentation/examples/prometheus-kubernetes.yml @@ -0,0 +1,33 @@ +# An example scrape configuration that discovers its targets through the +# Kubernetes API server (cluster nodes and service endpoints). +global: + scrape_interval: 15s # By default, scrape targets every 15 seconds. + evaluation_interval: 15s # By default, evaluate rules every 15 seconds.
+ scrape_timeout: 10s + +scrape_configs: +- job_name: 'kubernetes' + + kubernetes_sd_configs: + - server: 'https://kubernetes.default.svc' + in_cluster: true + + relabel_configs: + - source_labels: [__meta_kubernetes_node, __meta_kubernetes_service_annotation_prometheus_io_scrape] + action: keep + regex: ^(?:.+;|;true)$ + - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scheme] + action: replace + target_label: __scheme__ + regex: ^(https?)$ + replacement: $1 + - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_path] + action: replace + target_label: __metrics_path__ + regex: ^(.+)$ + replacement: $1 + - source_labels: [__address__, __meta_kubernetes_service_annotation_prometheus_io_port] + action: replace + target_label: __address__ + regex: ^(.+)(?::\d+);(\d+)$ + replacement: $1:$2 diff --git a/retrieval/discovery/kubernetes.go b/retrieval/discovery/kubernetes.go new file mode 100644 index 000000000..39e91e297 --- /dev/null +++ b/retrieval/discovery/kubernetes.go @@ -0,0 +1,30 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package discovery
+
+import (
+	"github.com/prometheus/prometheus/config"
+	"github.com/prometheus/prometheus/retrieval/discovery/kubernetes"
+)
+
+// NewKubernetesDiscovery creates a Kubernetes discovery for the given
+// configuration and initializes it. If initialization fails, the error is
+// returned and no discovery is handed out.
+func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.KubernetesDiscovery, error) {
+	kd := &kubernetes.KubernetesDiscovery{
+		Conf: conf,
+	}
+	err := kd.Initialize()
+	if err != nil {
+		return nil, err
+	}
+	return kd, nil
+}
diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go
new file mode 100644
index 000000000..ca5e9c5f4
--- /dev/null
+++ b/retrieval/discovery/kubernetes/discovery.go
@@ -0,0 +1,555 @@
+// Copyright 2015 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kubernetes
+
+import (
+	"crypto/tls"
+	"crypto/x509"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net"
+	"net/http"
+	"os"
+	"sync"
+	"time"
+
+	clientmodel "github.com/prometheus/client_golang/model"
+	"github.com/prometheus/log"
+	"github.com/prometheus/prometheus/config"
+	"github.com/prometheus/prometheus/util/httputil"
+	"github.com/prometheus/prometheus/util/strutil"
+)
+
+const (
+	// sourceServicePrefix is the prefix of the source name of every service
+	// target group (see serviceSource).
+	sourceServicePrefix = "services"
+
+	// metaLabelPrefix is the meta prefix used for all meta labels
+	// in this discovery.
+	metaLabelPrefix = clientmodel.MetaLabelPrefix + "kubernetes_"
+	// nodeLabel is the name for the label containing a target's node name.
+	nodeLabel = metaLabelPrefix + "node"
+	// serviceNamespaceLabel is the name for the label containing a target's service namespace.
+	serviceNamespaceLabel = metaLabelPrefix + "service_namespace"
+	// serviceNameLabel is the name for the label containing a target's service name.
+	serviceNameLabel = metaLabelPrefix + "service_name"
+	// nodeLabelPrefix is the prefix for the node labels.
+	nodeLabelPrefix = metaLabelPrefix + "node_label_"
+	// serviceLabelPrefix is the prefix for the service labels.
+	serviceLabelPrefix = metaLabelPrefix + "service_label_"
+	// serviceAnnotationPrefix is the prefix for the service annotations.
+	serviceAnnotationPrefix = metaLabelPrefix + "service_annotation_"
+	// nodesTargetGroupName is the name given to the target group for nodes.
+	nodesTargetGroupName = "nodes"
+
+	// Paths of the service account credentials that are used as defaults
+	// when the configuration sets in_cluster.
+	serviceAccountToken  = "/var/run/secrets/kubernetes.io/serviceaccount/token"
+	serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
+
+	// API paths, relative to the configured server URL (which always ends in
+	// a slash). serviceEndpointsURL is a format string taking the namespace
+	// and the service name.
+	apiVersion          = "v1"
+	apiPrefix           = "api/" + apiVersion
+	nodesURL            = apiPrefix + "/nodes"
+	servicesURL         = apiPrefix + "/services"
+	endpointsURL        = apiPrefix + "/endpoints"
+	serviceEndpointsURL = apiPrefix + "/namespaces/%s/endpoints/%s"
+)
+
+// KubernetesDiscovery discovers nodes and services through the Kubernetes API
+// server named in Conf. Initialize must be called before any other method, as
+// it creates the HTTP client and the internal maps.
+type KubernetesDiscovery struct {
+	client *http.Client
+	Conf   *config.KubernetesSDConfig
+
+	// Resource versions of the watched resource kinds.
+	nodesResourceVersion     string
+	servicesResourceVersion  string
+	endpointsResourceVersion string
+	// nodes is keyed by node name; services by namespace, then service name.
+	nodes    map[string]*Node
+	services map[string]map[string]*Service
+	// nodesMu guards nodes, servicesMu guards services.
+	nodesMu    sync.RWMutex
+	servicesMu sync.RWMutex
+	// runDone is used by Stop to terminate Run; the watchers use it as the
+	// stop channel of their retry loops.
+	runDone chan struct{}
+}
+
+// Initialize builds the HTTP client from Conf and allocates the internal
+// state. It returns the error of the client construction, if any.
+func (kd *KubernetesDiscovery) Initialize() error {
+	client, err := newKubernetesHTTPClient(kd.Conf)
+
+	if err != nil {
+		return err
+	}
+
+	kd.client = client
+	kd.nodes = map[string]*Node{}
+	kd.services = map[string]map[string]*Service{}
+	kd.runDone = make(chan struct{})
+
+	return nil
+}
+
+// Sources implements the TargetProvider interface.
+//
+// It lists all nodes and services from the API server, records them together
+// with the list resource versions, and returns one source name per node and
+// per service. On any failure it logs the problem and returns an empty slice.
+func (kd *KubernetesDiscovery) Sources() []string {
+	nodesRes, err := kd.client.Get(kd.Conf.Server + nodesURL)
+	if err != nil {
+		// If we can't list nodes then we can't watch them. Assume this is a misconfiguration
+		// & log & return empty.
+		log.Errorf("Unable to list Kubernetes nodes: %s", err)
+		return []string{}
+	}
+	// Close the body on every path so that the connection is released.
+	defer nodesRes.Body.Close()
+	if nodesRes.StatusCode != http.StatusOK {
+		log.Errorf("Unable to list Kubernetes nodes. Unexpected response: %d %s", nodesRes.StatusCode, nodesRes.Status)
+		return []string{}
+	}
+
+	var nodes NodeList
+	if err := json.NewDecoder(nodesRes.Body).Decode(&nodes); err != nil {
+		// Best effort: log whatever is left of the body for diagnosis.
+		body, _ := ioutil.ReadAll(nodesRes.Body)
+		log.Errorf("Unable to list Kubernetes nodes. Unexpected response body: %s", string(body))
+		return []string{}
+	}
+
+	kd.nodesMu.Lock()
+	defer kd.nodesMu.Unlock()
+
+	sourceNames := make([]string, 0, len(nodes.Items))
+
+	kd.nodesResourceVersion = nodes.ResourceVersion
+	for idx := range nodes.Items {
+		node := &nodes.Items[idx]
+		sourceNames = append(sourceNames, nodesTargetGroupName+":"+node.ObjectMeta.Name)
+		kd.nodes[node.ObjectMeta.Name] = node
+	}
+
+	servicesRes, err := kd.client.Get(kd.Conf.Server + servicesURL)
+	if err != nil {
+		// If we can't list services then we can't watch them. Assume this is a misconfiguration
+		// & log & return empty.
+		log.Errorf("Unable to list Kubernetes services: %s", err)
+		return []string{}
+	}
+	defer servicesRes.Body.Close()
+	if servicesRes.StatusCode != http.StatusOK {
+		log.Errorf("Unable to list Kubernetes services. Unexpected response: %d %s", servicesRes.StatusCode, servicesRes.Status)
+		return []string{}
+	}
+	var services ServiceList
+	if err := json.NewDecoder(servicesRes.Body).Decode(&services); err != nil {
+		body, _ := ioutil.ReadAll(servicesRes.Body)
+		log.Errorf("Unable to list Kubernetes services. Unexpected response body: %s", string(body))
+		return []string{}
+	}
+	kd.servicesMu.Lock()
+	defer kd.servicesMu.Unlock()
+
+	kd.servicesResourceVersion = services.ResourceVersion
+	for idx := range services.Items {
+		service := &services.Items[idx]
+		sourceNames = append(sourceNames, serviceSource(service))
+		namespace, ok := kd.services[service.ObjectMeta.Namespace]
+		if !ok {
+			namespace = map[string]*Service{}
+			kd.services[service.ObjectMeta.Namespace] = namespace
+		}
+		namespace[service.ObjectMeta.Name] = service
+	}
+
+	return sourceNames
+}
+
+// Run implements the TargetProvider interface.
+//
+// It first sends the target groups for the nodes and services already known,
+// then starts the three watchers and applies their events until Stop is
+// called. ch is closed when Run returns.
+func (kd *KubernetesDiscovery) Run(ch chan<- *config.TargetGroup) {
+	defer close(ch)
+
+	kd.updateNodesTargetGroup(ch)
+
+	for _, ns := range kd.services {
+		for _, service := range ns {
+			kd.addService(service, ch)
+		}
+	}
+
+	retryInterval := time.Duration(kd.Conf.RetryInterval)
+
+	// update is deliberately never closed: the watchers are its senders and
+	// may still be running when Run returns, and a send on a closed channel
+	// panics.
+	update := make(chan interface{}, 10)
+
+	go kd.watchNodes(update, retryInterval)
+	go kd.watchServices(update, retryInterval)
+	go kd.watchServiceEndpoints(update, retryInterval)
+
+	for {
+		select {
+		case <-kd.runDone:
+			return
+		case event := <-update:
+			switch obj := event.(type) {
+			case *nodeEvent:
+				kd.updateNode(obj.Node, obj.EventType)
+				kd.updateNodesTargetGroup(ch)
+			case *serviceEvent:
+				kd.updateService(obj.Service, obj.EventType, ch)
+			case *endpointsEvent:
+				kd.updateServiceEndpoints(obj.Endpoints, obj.EventType, ch)
+			}
+		}
+	}
+}
+
+// Stop implements the TargetProvider interface.
+//
+// It closes runDone, which unblocks Run and the retry loops of all watchers
+// at once. A single send would be received by only one of them, and sending
+// while holding nodesMu and servicesMu could deadlock with Run waiting for
+// those locks inside an event handler. Stop must be called at most once.
+func (kd *KubernetesDiscovery) Stop() {
+	log.Debugf("Stopping Kubernetes discovery for %s", kd.Conf.Server)
+
+	// Terminate Run and the watchers.
+	close(kd.runDone)
+
+	log.Debugf("Kubernetes discovery for %s stopped.", kd.Conf.Server)
+}
+
+// updateNodesTargetGroup sends a single target group containing one target
+// per known node. The target address is the node's first listed address
+// combined with the configured kubelet port. Nodes that report no address
+// are skipped.
+func (kd *KubernetesDiscovery) updateNodesTargetGroup(ch chan<- *config.TargetGroup) {
+	kd.nodesMu.Lock()
+	defer kd.nodesMu.Unlock()
+	tg := &config.TargetGroup{Source: nodesTargetGroupName}
+
+	// Now let's loop through the nodes & add them to the target group with appropriate labels.
+	for nodeName, node := range kd.nodes {
+		if len(node.Status.Addresses) == 0 {
+			// Without an address there is nothing to scrape; indexing the
+			// empty slice would panic.
+			log.Debugf("Kubernetes node %s has no addresses, skipping", nodeName)
+			continue
+		}
+		address := fmt.Sprintf("%s:%d", node.Status.Addresses[0].Address, kd.Conf.KubeletPort)
+
+		t := clientmodel.LabelSet{
+			clientmodel.AddressLabel: clientmodel.LabelValue(address),
+			nodeLabel:                clientmodel.LabelValue(nodeName),
+		}
+		for k, v := range node.ObjectMeta.Labels {
+			labelName := strutil.SanitizeLabelName(nodeLabelPrefix + k)
+			t[clientmodel.LabelName(labelName)] = clientmodel.LabelValue(v)
+		}
+		tg.Targets = append(tg.Targets, t)
+	}
+
+	ch <- tg
+}
+
+// updateNode applies a node watch event to the nodes map. Event types other
+// than added, modified and deleted are ignored.
+func (kd *KubernetesDiscovery) updateNode(node *Node, eventType EventType) {
+	kd.nodesMu.Lock()
+	defer kd.nodesMu.Unlock()
+	updatedNodeName := node.ObjectMeta.Name
+	switch eventType {
+	case deleted:
+		// Deleted - remove from nodes map.
+		delete(kd.nodes, updatedNodeName)
+	case added, modified:
+		// Added/Modified - update the node in the nodes map.
+		kd.nodes[updatedNodeName] = node
+	}
+}
+
+// watchNodes watches nodes as they come & go.
+func (kd *KubernetesDiscovery) watchNodes(events chan interface{}, retryInterval time.Duration) {
+	until(func() {
+		req, err := http.NewRequest("GET", kd.Conf.Server+nodesURL, nil)
+		if err != nil {
+			log.Errorf("Failed to watch nodes: %s", err)
+			return
+		}
+		values := req.URL.Query()
+		values.Add("watch", "true")
+		values.Add("resourceVersion", kd.nodesResourceVersion)
+		req.URL.RawQuery = values.Encode()
+		res, err := kd.client.Do(req)
+		if err != nil {
+			log.Errorf("Failed to watch nodes: %s", err)
+			return
+		}
+		// Release the connection whenever this watch attempt ends.
+		defer res.Body.Close()
+		if res.StatusCode != http.StatusOK {
+			log.Errorf("Failed to watch nodes: %d", res.StatusCode)
+			return
+		}
+
+		d := json.NewDecoder(res.Body)
+
+		for {
+			var event nodeEvent
+			if err := d.Decode(&event); err != nil {
+				log.Errorf("Failed to watch nodes: %s", err)
+				return
+			}
+			// Remember the position so that a retry resumes from here.
+			kd.nodesResourceVersion = event.Node.ObjectMeta.ResourceVersion
+			events <- &event
+		}
+	}, retryInterval, kd.runDone)
+}
+
+// watchServices watches services as they come & go.
+func (kd *KubernetesDiscovery) watchServices(events chan interface{}, retryInterval time.Duration) {
+	until(func() {
+		req, err := http.NewRequest("GET", kd.Conf.Server+servicesURL, nil)
+		if err != nil {
+			log.Errorf("Failed to watch services: %s", err)
+			return
+		}
+		values := req.URL.Query()
+		values.Add("watch", "true")
+		values.Add("resourceVersion", kd.servicesResourceVersion)
+		req.URL.RawQuery = values.Encode()
+
+		res, err := kd.client.Do(req)
+		if err != nil {
+			log.Errorf("Failed to watch services: %s", err)
+			return
+		}
+		// Release the connection whenever this watch attempt ends.
+		defer res.Body.Close()
+		if res.StatusCode != http.StatusOK {
+			log.Errorf("Failed to watch services: %d", res.StatusCode)
+			return
+		}
+
+		d := json.NewDecoder(res.Body)
+
+		for {
+			var event serviceEvent
+			if err := d.Decode(&event); err != nil {
+				log.Errorf("Unable to watch services: %s", err)
+				return
+			}
+			// Remember the position so that a retry resumes from here.
+			kd.servicesResourceVersion = event.Service.ObjectMeta.ResourceVersion
+			events <- &event
+		}
+	}, retryInterval, kd.runDone)
+}
+
+// updateService applies a service watch event: a deleted service that is
+// known is removed, an added or modified service is (re-)registered and its
+// target group is sent on ch.
+func (kd *KubernetesDiscovery) updateService(service *Service, eventType EventType, ch chan<- *config.TargetGroup) {
+	kd.servicesMu.Lock()
+	defer kd.servicesMu.Unlock()
+	name := service.ObjectMeta.Name
+	namespace := service.ObjectMeta.Namespace
+	_, ok := kd.services[namespace][name]
+	switch eventType {
+	case deleted:
+		if ok {
+			kd.deleteService(service, ch)
+		}
+	case added, modified:
+		kd.addService(service, ch)
+	}
+}
+
+// deleteService sends an empty target group for the service's source and
+// removes the service from the services map, dropping its namespace entry
+// once that is empty. The caller must hold servicesMu.
+func (kd *KubernetesDiscovery) deleteService(service *Service, ch chan<- *config.TargetGroup) {
+	tg := &config.TargetGroup{Source: serviceSource(service)}
+	ch <- tg
+	delete(kd.services[service.ObjectMeta.Namespace], service.ObjectMeta.Name)
+	if len(kd.services[service.ObjectMeta.Namespace]) == 0 {
+		delete(kd.services, service.ObjectMeta.Namespace)
+	}
+}
+
+// addService records the service, fetches its endpoints from the API server
+// and sends the resulting target group on ch. If the endpoints cannot be
+// fetched or decoded the error is logged and nothing is sent; the service
+// stays recorded.
+func (kd *KubernetesDiscovery) addService(service *Service, ch chan<- *config.TargetGroup) {
+	namespace, ok := kd.services[service.ObjectMeta.Namespace]
+	if !ok {
+		namespace = map[string]*Service{}
+		kd.services[service.ObjectMeta.Namespace] = namespace
+	}
+	namespace[service.ObjectMeta.Name] = service
+	endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name)
+	res, err := kd.client.Get(kd.Conf.Server + endpointURL)
+	if err != nil {
+		log.Errorf("Error getting service endpoints: %s", err)
+		return
+	}
+	// Close the body on every path so that the connection is released.
+	defer res.Body.Close()
+	if res.StatusCode != http.StatusOK {
+		log.Errorf("Failed to get service endpoints: %d", res.StatusCode)
+		return
+	}
+
+	var endpoints Endpoints
+	if err := json.NewDecoder(res.Body).Decode(&endpoints); err != nil {
+		log.Errorf("Error getting service endpoints: %s", err)
+		return
+	}
+
+	kd.updateServiceTargetGroup(service, &endpoints, ch)
+}
+
+// updateServiceTargetGroup builds the target group of a service from its
+// endpoints and sends it on ch. The group carries the service namespace,
+// name, labels and annotations as meta labels. Every address of every subset
+// becomes one target, using the first port of that subset; subsets without
+// ports are skipped.
+func (kd *KubernetesDiscovery) updateServiceTargetGroup(service *Service, endpoints *Endpoints, ch chan<- *config.TargetGroup) {
+	tg := &config.TargetGroup{
+		Source: serviceSource(service),
+		Labels: clientmodel.LabelSet{
+			serviceNamespaceLabel: clientmodel.LabelValue(service.ObjectMeta.Namespace),
+			serviceNameLabel:      clientmodel.LabelValue(service.ObjectMeta.Name),
+		},
+	}
+
+	for k, v := range service.ObjectMeta.Labels {
+		labelName := strutil.SanitizeLabelName(serviceLabelPrefix + k)
+		tg.Labels[clientmodel.LabelName(labelName)] = clientmodel.LabelValue(v)
+	}
+
+	for k, v := range service.ObjectMeta.Annotations {
+		labelName := strutil.SanitizeLabelName(serviceAnnotationPrefix + k)
+		tg.Labels[clientmodel.LabelName(labelName)] = clientmodel.LabelValue(v)
+	}
+
+	// Now let's loop through the endpoints & add them to the target group with appropriate labels.
+	for _, eps := range endpoints.Subsets {
+		if len(eps.Ports) == 0 {
+			// No port to scrape; indexing the empty slice would panic.
+			continue
+		}
+		epPort := eps.Ports[0].Port
+
+		for _, addr := range eps.Addresses {
+			ipAddr := addr.IP
+			// Bracket IPv6 literals so that the appended port is unambiguous.
+			// The address is a string, so it has to be parsed; its length
+			// says nothing about the address family.
+			if ip := net.ParseIP(ipAddr); ip != nil && ip.To4() == nil {
+				ipAddr = "[" + ipAddr + "]"
+			}
+			address := fmt.Sprintf("%s:%d", ipAddr, epPort)
+
+			t := clientmodel.LabelSet{clientmodel.AddressLabel: clientmodel.LabelValue(address)}
+
+			tg.Targets = append(tg.Targets, t)
+		}
+	}
+
+	ch <- tg
+}
+
+// watchServiceEndpoints watches service endpoints as they come & go.
+func (kd *KubernetesDiscovery) watchServiceEndpoints(events chan interface{}, retryInterval time.Duration) {
+	until(func() {
+		req, err := http.NewRequest("GET", kd.Conf.Server+endpointsURL, nil)
+		if err != nil {
+			log.Errorf("Failed to watch service endpoints: %s", err)
+			return
+		}
+		values := req.URL.Query()
+		values.Add("watch", "true")
+		// Endpoints have their own resource version. Sharing the one of the
+		// services watcher would make the two watchers overwrite each
+		// other's position.
+		values.Add("resourceVersion", kd.endpointsResourceVersion)
+		req.URL.RawQuery = values.Encode()
+
+		res, err := kd.client.Do(req)
+		if err != nil {
+			log.Errorf("Failed to watch service endpoints: %s", err)
+			return
+		}
+		// Release the connection whenever this watch attempt ends.
+		defer res.Body.Close()
+		if res.StatusCode != http.StatusOK {
+			log.Errorf("Failed to watch service endpoints: %d", res.StatusCode)
+			return
+		}
+
+		d := json.NewDecoder(res.Body)
+
+		for {
+			var event endpointsEvent
+			if err := d.Decode(&event); err != nil {
+				log.Errorf("Unable to watch service endpoints: %s", err)
+				return
+			}
+			kd.endpointsResourceVersion = event.Endpoints.ObjectMeta.ResourceVersion
+			events <- &event
+		}
+	}, retryInterval, kd.runDone)
+}
+
+// updateServiceEndpoints re-sends the target group of the service that the
+// endpoints belong to (same namespace and name). Endpoints of unknown
+// services are ignored. The event type is not inspected.
+func (kd *KubernetesDiscovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType, ch chan<- *config.TargetGroup) {
+	kd.servicesMu.Lock()
+	defer kd.servicesMu.Unlock()
+	serviceNamespace := endpoints.ObjectMeta.Namespace
+	serviceName := endpoints.ObjectMeta.Name
+	if service, ok := kd.services[serviceNamespace][serviceName]; ok {
+		kd.updateServiceTargetGroup(service, endpoints, ch)
+	}
+}
+
+// newKubernetesHTTPClient builds the HTTP client used to talk to the API
+// server: TLS settings (client certificate, CA file, insecure flag), a dial
+// timeout of conf.RequestTimeout, and optional bearer token and basic
+// authentication. With conf.InCluster set, the service account token and CA
+// certificate are used as defaults for unset files.
+//
+// It returns an error if a configured file cannot be read or if the CA file
+// contains no usable certificate.
+func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, error) {
+	bearerTokenFile := conf.BearerTokenFile
+	caFile := conf.CAFile
+	if conf.InCluster {
+		if len(bearerTokenFile) == 0 {
+			bearerTokenFile = serviceAccountToken
+		}
+		if len(caFile) == 0 {
+			// With recent versions, the CA certificate is provided as a token
+			// but we need to handle older versions too. In this case, don't
+			// set the CAFile & the configuration will have to use Insecure.
+			if _, err := os.Stat(serviceAccountCACert); err == nil {
+				caFile = serviceAccountCACert
+			}
+		}
+	}
+
+	tlsConfig := &tls.Config{InsecureSkipVerify: conf.Insecure}
+
+	// Load client cert if specified.
+	if len(conf.CertFile) > 0 && len(conf.KeyFile) > 0 {
+		cert, err := tls.LoadX509KeyPair(conf.CertFile, conf.KeyFile)
+		if err != nil {
+			return nil, err
+		}
+		tlsConfig.Certificates = []tls.Certificate{cert}
+	}
+
+	// Only override the root CAs when a CA file is configured. With a nil
+	// RootCAs crypto/tls uses the host's root CA set, whereas an empty pool
+	// would reject every server certificate.
+	if len(caFile) > 0 {
+		caCert, err := ioutil.ReadFile(caFile)
+		if err != nil {
+			return nil, err
+		}
+		caCertPool := x509.NewCertPool()
+		if !caCertPool.AppendCertsFromPEM(caCert) {
+			return nil, fmt.Errorf("no usable CA certificate found in %s", caFile)
+		}
+		tlsConfig.RootCAs = caCertPool
+	}
+
+	tlsConfig.BuildNameToCertificate()
+
+	tr := &http.Transport{
+		Dial: func(netw, addr string) (c net.Conn, err error) {
+			c, err = net.DialTimeout(netw, addr, time.Duration(conf.RequestTimeout))
+			return
+		},
+	}
+	tr.TLSClientConfig = tlsConfig
+	var rt http.RoundTripper
+	rt = tr
+
+	// The bearer token is optional: configurations that rely on basic auth
+	// or client certificates have no token file, and reading an empty path
+	// would fail.
+	if len(bearerTokenFile) > 0 {
+		bearerToken, err := ioutil.ReadFile(bearerTokenFile)
+		if err != nil {
+			return nil, err
+		}
+		if len(bearerToken) > 0 {
+			rt = httputil.NewBearerAuthRoundTripper(string(bearerToken), rt)
+		}
+	}
+	if len(conf.Username) > 0 && len(conf.Password) > 0 {
+		rt = httputil.NewBasicAuthRoundTripper(conf.Username, conf.Password, rt)
+	}
+
+	return &http.Client{
+		Transport: rt,
+	}, nil
+}
+
+// serviceSource returns the target group source name of a service, in the
+// form "services:<namespace>/<name>".
+func serviceSource(service *Service) string {
+	return sourceServicePrefix + ":" + service.ObjectMeta.Namespace + "/" + service.ObjectMeta.Name
+}
+
+// Until loops until stop channel is closed, running f every period.
+// f may not be invoked if stop channel is already closed.
+func until(f func(), period time.Duration, stopCh <-chan struct{}) {
+	// First run: happens immediately, unless a stop is already signalled.
+	select {
+	case <-stopCh:
+		return
+	default:
+		f()
+	}
+	// Subsequent runs: f is called again once period has elapsed after the
+	// previous call returned. A stop is only noticed between calls, never
+	// while f is running.
+	for {
+		select {
+		case <-stopCh:
+			return
+		case <-time.After(period):
+			f()
+		}
+	}
+}
diff --git a/retrieval/discovery/kubernetes/types.go b/retrieval/discovery/kubernetes/types.go
new file mode 100644
index 000000000..14b26ab39
--- /dev/null
+++ b/retrieval/discovery/kubernetes/types.go
@@ -0,0 +1,210 @@
+// Copyright 2015 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kubernetes
+
+// EventType is the kind of change reported by a watch event.
+type EventType string
+
+// The event types handled by this discovery, as they appear in the "type"
+// field of a watch event.
+const (
+	added    EventType = "ADDED"
+	modified EventType = "MODIFIED"
+	deleted  EventType = "DELETED"
+)
+
+// nodeEvent is a watch event whose object is a Node.
+type nodeEvent struct {
+	EventType EventType `json:"type"`
+	Node      *Node     `json:"object"`
+}
+
+// serviceEvent is a watch event whose object is a Service.
+type serviceEvent struct {
+	EventType EventType `json:"type"`
+	Service   *Service  `json:"object"`
+}
+
+// endpointsEvent is a watch event whose object is an Endpoints.
+type endpointsEvent struct {
+	EventType EventType  `json:"type"`
+	Endpoints *Endpoints `json:"object"`
+}
+
+// From here down types are copied from
+// https://github.com/GoogleCloudPlatform/kubernetes/blob/master/pkg/api/v1/types.go
+// with all currently irrelevant types/fields stripped out. This removes the
+// need for any kubernetes dependencies, with the drawback of having to keep
+// this file up to date.
+
+// ListMeta describes metadata that synthetic resources must have, including lists and
+// various status objects.
+type ListMeta struct {
+	// An opaque value that represents the version of this response for use with optimistic
+	// concurrency and change monitoring endpoints. Clients must treat these values as opaque
+	// and values may only be valid for a particular resource or set of resources. Only servers
+	// will generate resource versions.
+	ResourceVersion string `json:"resourceVersion,omitempty" description:"string that identifies the internal version of this object that can be used by clients to determine when objects have changed; populated by the system, read-only; value must be treated as opaque by clients and passed unmodified back to the server: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#concurrency-control-and-consistency"`
+}
+
+// ObjectMeta is metadata that all persisted resources must have, which includes all objects
+// users must create.
+type ObjectMeta struct {
+	// Name is unique within a namespace. Name is required when creating resources, although
+	// some resources may allow a client to request the generation of an appropriate name
+	// automatically. Name is primarily intended for creation idempotence and configuration
+	// definition.
+	Name string `json:"name,omitempty" description:"string that identifies an object. Must be unique within a namespace; cannot be updated; see http://releases.k8s.io/HEAD/docs/user-guide/identifiers.md#names"`
+
+	// Namespace defines the space within which name must be unique. An empty namespace is
+	// equivalent to the "default" namespace, but "default" is the canonical representation.
+	// Not all objects are required to be scoped to a namespace - the value of this field for
+	// those objects will be empty.
+	Namespace string `json:"namespace,omitempty" description:"namespace of the object; must be a DNS_LABEL; cannot be updated; see http://releases.k8s.io/HEAD/docs/user-guide/namespaces.md"`
+
+	// ResourceVersion is the opaque version of this object. The watchers in
+	// this package send it back as the resourceVersion of their watch requests.
+	ResourceVersion string `json:"resourceVersion,omitempty" description:"string that identifies the internal version of this object that can be used by clients to determine when objects have changed; populated by the system, read-only; value must be treated as opaque by clients and passed unmodified back to the server: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#concurrency-control-and-consistency"`
+
+	// TODO: replace map[string]string with labels.LabelSet type
+	Labels map[string]string `json:"labels,omitempty" description:"map of string keys and values that can be used to organize and categorize objects; may match selectors of replication controllers and services; see http://releases.k8s.io/HEAD/docs/user-guide/labels.md"`
+
+	// Annotations are unstructured key value data stored with a resource that may be set by
+	// external tooling. They are not queryable and should be preserved when modifying
+	// objects.
+	Annotations map[string]string `json:"annotations,omitempty" description:"map of string keys and values that can be used by external tooling to store and retrieve arbitrary metadata about objects; see http://releases.k8s.io/HEAD/docs/user-guide/annotations.md"`
+}
+
+// Protocol defines network protocols supported for things like container ports.
+type Protocol string
+
+const (
+	// ProtocolTCP is the TCP protocol.
+	ProtocolTCP Protocol = "TCP"
+	// ProtocolUDP is the UDP protocol.
+	ProtocolUDP Protocol = "UDP"
+)
+
+const (
+	// NamespaceAll is the default argument to specify on a context when you want to list or filter resources across all namespaces
+	NamespaceAll string = ""
+)
+
+// Container represents a single container that is expected to be run on the host.
+type Container struct {
+	// Required: This must be a DNS_LABEL. Each container in a pod must
+	// have a unique name.
+	Name string `json:"name" description:"name of the container; must be a DNS_LABEL and unique within the pod; cannot be updated"`
+	// Optional.
+	Image string `json:"image,omitempty" description:"Docker image name; see http://releases.k8s.io/HEAD/docs/user-guide/images.md"`
+}
+
+// Service is a named abstraction of software service (for example, mysql) consisting of local port
+// (for example 3306) that the proxy listens on, and the selector that determines which pods
+// will answer requests sent through the proxy.
+type Service struct {
+	ObjectMeta `json:"metadata,omitempty" description:"standard object metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
+}
+
+// ServiceList holds a list of services.
+type ServiceList struct {
+	ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
+
+	Items []Service `json:"items" description:"list of services"`
+}
+
+// Endpoints is a collection of endpoints that implement the actual service. Example:
+//   Name: "mysvc",
+//   Subsets: [
+//     {
+//       Addresses: [{"ip": "10.10.1.1"}, {"ip": "10.10.2.2"}],
+//       Ports: [{"name": "a", "port": 8675}, {"name": "b", "port": 309}]
+//     },
+//     {
+//       Addresses: [{"ip": "10.10.3.3"}],
+//       Ports: [{"name": "a", "port": 93}, {"name": "b", "port": 76}]
+//     },
+//   ]
+type Endpoints struct {
+	ObjectMeta `json:"metadata,omitempty" description:"standard object metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
+
+	// The set of all endpoints is the union of all subsets.
+	Subsets []EndpointSubset `json:"subsets" description:"sets of addresses and ports that comprise a service"`
+}
+
+// EndpointSubset is a group of addresses with a common set of ports. The
+// expanded set of endpoints is the Cartesian product of Addresses x Ports.
+// For example, given:
+//   {
+//     Addresses: [{"ip": "10.10.1.1"}, {"ip": "10.10.2.2"}],
+//     Ports: [{"name": "a", "port": 8675}, {"name": "b", "port": 309}]
+//   }
+// The resulting set of endpoints can be viewed as:
+//   a: [ 10.10.1.1:8675, 10.10.2.2:8675 ],
+//   b: [ 10.10.1.1:309, 10.10.2.2:309 ]
+type EndpointSubset struct {
+	Addresses []EndpointAddress `json:"addresses,omitempty" description:"IP addresses which offer the related ports"`
+	Ports     []EndpointPort    `json:"ports,omitempty" description:"port numbers available on the related IP addresses"`
+}
+
+// EndpointAddress is a tuple that describes single IP address.
+type EndpointAddress struct {
+	// The IP of this endpoint.
+	// TODO: This should allow hostname or IP, see #4447.
+	IP string `json:"ip" description:"IP address of the endpoint"`
+}
+
+// EndpointPort is a tuple that describes a single port.
+type EndpointPort struct {
+	// The port number.
+	Port int `json:"port" description:"port number of the endpoint"`
+
+	// The IP protocol for this port.
+	Protocol Protocol `json:"protocol,omitempty" description:"protocol for this port; must be UDP or TCP; TCP if unspecified"`
+}
+
+// EndpointsList is a list of endpoints.
+type EndpointsList struct {
+	ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
+
+	Items []Endpoints `json:"items" description:"list of endpoints"`
+}
+
+// NodeStatus is information about the current status of a node.
+type NodeStatus struct {
+	// Queried from cloud provider, if available.
+	Addresses []NodeAddress `json:"addresses,omitempty" description:"list of addresses reachable to the node; see http://releases.k8s.io/HEAD/docs/admin/node.md#node-addresses" patchStrategy:"merge" patchMergeKey:"type"`
+}
+
+// NodeAddressType is the kind of a node address.
+type NodeAddressType string
+
+// These are the valid address types of a node.
+const (
+	NodeHostName   NodeAddressType = "Hostname"
+	NodeExternalIP NodeAddressType = "ExternalIP"
+	NodeInternalIP NodeAddressType = "InternalIP"
+)
+
+// NodeAddress is a single address of a node together with its type.
+type NodeAddress struct {
+	Type    NodeAddressType `json:"type" description:"node address type, one of Hostname, ExternalIP or InternalIP"`
+	Address string          `json:"address" description:"the node address"`
+}
+
+// Node is a worker node in Kubernetes, formerly known as minion.
+// Each node will have a unique identifier in the cache (i.e. in etcd).
+type Node struct {
+	ObjectMeta `json:"metadata,omitempty" description:"standard object metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
+
+	// Status describes the current status of a Node
+	Status NodeStatus `json:"status,omitempty" description:"most recently observed status of the node; populated by the system, read-only; http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#spec-and-status"`
+}
+
+// NodeList is the whole list of all Nodes which have been registered with master.
+type NodeList struct { + ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"` + + Items []Node `json:"items" description:"list of nodes"` +} diff --git a/retrieval/discovery/serverset.go b/retrieval/discovery/serverset.go index 7cc38df6a..5a84186fc 100644 --- a/retrieval/discovery/serverset.go +++ b/retrieval/discovery/serverset.go @@ -17,7 +17,6 @@ import ( "bytes" "encoding/json" "fmt" - "regexp" "strings" "sync" "time" @@ -28,6 +27,7 @@ import ( clientmodel "github.com/prometheus/client_golang/model" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/util/strutil" ) const ( @@ -39,10 +39,6 @@ const ( serversetEndpointLabelPrefix = serversetLabelPrefix + "endpoint" ) -var ( - invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) -) - type serversetMember struct { ServiceEndpoint serversetEndpoint AdditionalEndpoints map[string]serversetEndpoint @@ -176,7 +172,7 @@ func parseServersetMember(data []byte, path string) (*clientmodel.LabelSet, erro labels[serversetEndpointLabelPrefix+"_port"] = clientmodel.LabelValue(fmt.Sprintf("%d", member.ServiceEndpoint.Port)) for name, endpoint := range member.AdditionalEndpoints { - cleanName := clientmodel.LabelName(invalidLabelCharRE.ReplaceAllString(name, "_")) + cleanName := clientmodel.LabelName(strutil.SanitizeLabelName(name)) labels[serversetEndpointLabelPrefix+"_host_"+cleanName] = clientmodel.LabelValue( endpoint.Host) labels[serversetEndpointLabelPrefix+"_port_"+cleanName] = clientmodel.LabelValue( diff --git a/retrieval/target.go b/retrieval/target.go index b87d1b197..daf78f423 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -223,9 +223,6 @@ func (t *Target) Update(cfg *config.ScrapeConfig, baseLabels, metaLabels clientm } } t.url.RawQuery = params.Encode() - if cfg.BasicAuth != nil { - t.url.User = url.UserPassword(cfg.BasicAuth.Username, cfg.BasicAuth.Password) - } 
t.scrapeInterval = time.Duration(cfg.ScrapeInterval) t.deadline = time.Duration(cfg.ScrapeTimeout) @@ -292,6 +289,10 @@ func newHTTPClient(cfg *config.ScrapeConfig) (*http.Client, error) { rt = httputil.NewBearerAuthRoundTripper(bearerToken, rt) } + if cfg.BasicAuth != nil { + rt = httputil.NewBasicAuthRoundTripper(cfg.BasicAuth.Username, cfg.BasicAuth.Password, rt) + } + // Return a new client with the configured round tripper. return httputil.NewClient(rt), nil } diff --git a/retrieval/target_test.go b/retrieval/target_test.go index 95f327a62..813dad8ee 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -31,7 +31,6 @@ import ( clientmodel "github.com/prometheus/client_golang/model" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/util/httputil" ) func TestBaseLabels(t *testing.T) { @@ -438,6 +437,10 @@ func TestURLParams(t *testing.T) { } func newTestTarget(targetURL string, deadline time.Duration, baseLabels clientmodel.LabelSet) *Target { + cfg := &config.ScrapeConfig{ + ScrapeTimeout: config.Duration(deadline), + } + c, _ := newHTTPClient(cfg) t := &Target{ url: &url.URL{ Scheme: "http", @@ -447,7 +450,7 @@ func newTestTarget(targetURL string, deadline time.Duration, baseLabels clientmo deadline: deadline, status: &TargetStatus{}, scrapeInterval: 1 * time.Millisecond, - httpClient: httputil.NewDeadlineClient(deadline, nil), + httpClient: c, scraperStopping: make(chan struct{}), scraperStopped: make(chan struct{}), } @@ -516,6 +519,36 @@ func TestNewHTTPBearerTokenFile(t *testing.T) { } } +func TestNewHTTPBasicAuth(t *testing.T) { + server := httptest.NewServer( + http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + username, password, ok := r.BasicAuth() + if !(ok && username == "user" && password == "password123") { + t.Fatalf("Basic authorization header was not set correctly: expected '%v:%v', got '%v:%v'", "user", "password123", username, password) + } + }, + ), + ) + defer server.Close() + + 
cfg := &config.ScrapeConfig{ + ScrapeTimeout: config.Duration(1 * time.Second), + BasicAuth: &config.BasicAuth{ + Username: "user", + Password: "password123", + }, + } + c, err := newHTTPClient(cfg) + if err != nil { + t.Fatal(err) + } + _, err = c.Get(server.URL) + if err != nil { + t.Fatal(err) + } +} + func TestNewHTTPCACert(t *testing.T) { server := httptest.NewUnstartedServer( http.HandlerFunc( diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 1643af2b7..61f103590 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -359,6 +359,14 @@ func providersFromConfig(cfg *config.ScrapeConfig) []TargetProvider { for i, c := range cfg.MarathonSDConfigs { app("marathon", i, discovery.NewMarathonDiscovery(c)) } + for i, c := range cfg.KubernetesSDConfigs { + k, err := discovery.NewKubernetesDiscovery(c) + if err != nil { + log.Errorf("Cannot create Kubernetes discovery: %s", err) + continue + } + app("kubernetes", i, k) + } for i, c := range cfg.ServersetSDConfigs { app("serverset", i, discovery.NewServersetDiscovery(c)) } diff --git a/util/httputil/client.go b/util/httputil/client.go index 7703e311f..e8c3a43fb 100644 --- a/util/httputil/client.go +++ b/util/httputil/client.go @@ -74,6 +74,27 @@ func (rt *bearerAuthRoundTripper) RoundTrip(req *http.Request) (*http.Response, return rt.rt.RoundTrip(req) } +type basicAuthRoundTripper struct { + username string + password string + rt http.RoundTripper +} + +// NewBasicAuthRoundTripper will apply a BASIC auth authorization header to a request unless it has +// already been set. 
+func NewBasicAuthRoundTripper(username, password string, rt http.RoundTripper) http.RoundTripper { + return &basicAuthRoundTripper{username, password, rt} +} + +func (rt *basicAuthRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + if len(req.Header.Get("Authorization")) != 0 { + return rt.rt.RoundTrip(req) + } + req = cloneRequest(req) + req.SetBasicAuth(rt.username, rt.password) + return rt.rt.RoundTrip(req) +} + // cloneRequest returns a clone of the provided *http.Request. // The clone is a shallow copy of the struct and its Header map. func cloneRequest(r *http.Request) *http.Request { diff --git a/util/strutil/strconv.go b/util/strutil/strconv.go index 43d42633e..1b7edf66b 100644 --- a/util/strutil/strconv.go +++ b/util/strutil/strconv.go @@ -22,7 +22,10 @@ import ( "time" ) -var durationRE = regexp.MustCompile("^([0-9]+)([ywdhms]+)$") +var ( + durationRE = regexp.MustCompile("^([0-9]+)([ywdhms]+)$") + invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) +) // DurationToString formats a time.Duration as a string with the assumption that // a year always has 365 days and a day always has 24h. (The former doesn't work @@ -103,3 +106,9 @@ func GraphLinkForExpression(expr string) string { urlData := url.QueryEscape(fmt.Sprintf(`[{"expr":%q,"tab":0}]`, expr)) return fmt.Sprintf("/graph#%s", strings.Replace(urlData, "+", "%20", -1)) } + +// SanitizeLabelName replaces anything that doesn't match +// client_label.LabelNameRE with an underscore. +func SanitizeLabelName(name string) string { + return invalidLabelCharRE.ReplaceAllString(name, "_") +}