From d7a7fd4589b7cf20daccf82c41aaadf47f10777f Mon Sep 17 00:00:00 2001 From: Jimmi Dyson Date: Thu, 3 Sep 2015 10:47:09 +0100 Subject: [PATCH] Kubernetes SD improvements * Support multiple masters with retries against each master as required. * Scrape masters' metrics. * Add role meta label for node/service/master to make it easier for relabeling. --- config/config.go | 10 +- config/config_test.go | 8 +- config/testdata/conf.good.yml | 3 +- .../examples/prometheus-kubernetes.yml | 10 +- retrieval/discovery/kubernetes/discovery.go | 110 +++++++++++++++--- 5 files changed, 116 insertions(+), 25 deletions(-) diff --git a/config/config.go b/config/config.go index ece179305..94efe33ca 100644 --- a/config/config.go +++ b/config/config.go @@ -605,7 +605,7 @@ type MarathonSDConfig struct { // KubernetesSDConfig is the configuration for Kubernetes service discovery. type KubernetesSDConfig struct { - Server string `yaml:"server"` + Masters []URL `yaml:"masters"` KubeletPort int `yaml:"kubelet_port,omitempty"` InCluster bool `yaml:"in_cluster,omitempty"` BearerTokenFile string `yaml:"bearer_token_file,omitempty"` @@ -641,12 +641,8 @@ func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) er if err != nil { return err } - if strings.TrimSpace(c.Server) == "" { - return fmt.Errorf("Kubernetes SD configuration requires a server address") - } - // Make sure server ends in trailing slash - simpler URL building later - if !strings.HasSuffix(c.Server, "/") { - c.Server += "/" + if len(c.Masters) == 0 { + return fmt.Errorf("Kubernetes SD configuration requires at least one Kubernetes master") } return checkOverflow(c.XXX, "kubernetes_sd_config") diff --git a/config/config_test.go b/config/config_test.go index cbd9034e7..14161637e 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -16,6 +16,7 @@ package config import ( "encoding/json" "io/ioutil" + "net/url" "reflect" "strings" "testing" @@ -196,7 +197,7 @@ var expectedConf = &Config{ 
KubernetesSDConfigs: []*KubernetesSDConfig{ { - Server: "https://localhost:1234/", + Masters: []URL{kubernetesSDHostURL()}, Username: "myusername", Password: "mypassword", KubeletPort: 10255, @@ -332,3 +333,8 @@ func TestEmptyGlobalBlock(t *testing.T) { t.Fatalf("want %v, got %v", exp, c) } } + +func kubernetesSDHostURL() URL { + tURL, _ := url.Parse("https://localhost:1234") + return URL{URL: tURL} +} diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml index 979c79bc3..7725950ee 100644 --- a/config/testdata/conf.good.yml +++ b/config/testdata/conf.good.yml @@ -103,6 +103,7 @@ scrape_configs: - job_name: service-kubernetes kubernetes_sd_configs: - - server: 'https://localhost:1234' + - masters: + - 'https://localhost:1234' username: 'myusername' password: 'mypassword' diff --git a/documentation/examples/prometheus-kubernetes.yml b/documentation/examples/prometheus-kubernetes.yml index c49083ebe..6a0b7cbd4 100644 --- a/documentation/examples/prometheus-kubernetes.yml +++ b/documentation/examples/prometheus-kubernetes.yml @@ -8,14 +8,18 @@ global: scrape_configs: - job_name: 'kubernetes' + ca_cert: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt + bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token + kubernetes_sd_configs: - - server: 'https://kubernetes.default.svc' + - masters: + - 'https://kubernetes.default.svc' in_cluster: true relabel_configs: - - source_labels: [__meta_kubernetes_node, __meta_kubernetes_service_annotation_prometheus_io_scrape] + - source_labels: [__meta_kubernetes_role, __meta_kubernetes_service_annotation_prometheus_io_scrape] action: keep - regex: ^(?:.+;|;true)$ + regex: ^(?:(?:master|node);.*|.*;true)$ - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scheme] action: replace target_label: __scheme__ diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go index 6c5ded626..19e336ced 100644 --- 
a/retrieval/discovery/kubernetes/discovery.go +++ b/retrieval/discovery/kubernetes/discovery.go @@ -53,12 +53,16 @@ const ( serviceAnnotationPrefix = metaLabelPrefix + "service_annotation_" // nodesTargetGroupName is the name given to the target group for nodes. nodesTargetGroupName = "nodes" + // mastersTargetGroupName is the name given to the target group for masters. + mastersTargetGroupName = "masters" + // roleLabel is the name for the label containing a target's role. + roleLabel = metaLabelPrefix + "role" serviceAccountToken = "/var/run/secrets/kubernetes.io/serviceaccount/token" serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" apiVersion = "v1" - apiPrefix = "api/" + apiVersion + apiPrefix = "/api/" + apiVersion nodesURL = apiPrefix + "/nodes" servicesURL = apiPrefix + "/services" endpointsURL = apiPrefix + "/endpoints" @@ -70,6 +74,8 @@ type Discovery struct { client *http.Client Conf *config.KubernetesSDConfig + masters []config.URL + mastersMu sync.RWMutex nodesResourceVersion string servicesResourceVersion string endpointsResourceVersion string @@ -88,6 +94,7 @@ func (kd *Discovery) Initialize() error { return err } + kd.masters = kd.Conf.Masters kd.client = client kd.nodes = map[string]*Node{} kd.services = map[string]map[string]*Service{} @@ -98,7 +105,12 @@ func (kd *Discovery) Initialize() error { // Sources implements the TargetProvider interface. func (kd *Discovery) Sources() []string { - res, err := kd.client.Get(kd.Conf.Server + nodesURL) + sourceNames := make([]string, 0, len(kd.masters)) + for _, master := range kd.masters { + sourceNames = append(sourceNames, mastersTargetGroupName+":"+master.Host) + } + + res, err := kd.queryMasterPath(nodesURL) if err != nil { // If we can't list nodes then we can't watch them. Assume this is a misconfiguration // & log & return empty. 
@@ -120,15 +132,13 @@ func (kd *Discovery) Sources() []string {
 	kd.nodesMu.Lock()
 	defer kd.nodesMu.Unlock()
 
-	sourceNames := make([]string, 0, len(nodes.Items))
-
 	kd.nodesResourceVersion = nodes.ResourceVersion
 	for idx, node := range nodes.Items {
 		sourceNames = append(sourceNames, nodesTargetGroupName+":"+node.ObjectMeta.Name)
 		kd.nodes[node.ObjectMeta.Name] = &nodes.Items[idx]
 	}
 
-	res, err = kd.client.Get(kd.Conf.Server + servicesURL)
+	res, err = kd.queryMasterPath(servicesURL)
 	if err != nil {
 		// If we can't list services then we can't watch them. Assume this is a misconfiguration
 		// & log & return empty.
@@ -166,6 +176,12 @@ func (kd *Discovery) Sources() []string {
 func (kd *Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
 	defer close(ch)
 
+	select {
+	case ch <- kd.updateMastersTargetGroup():
+	case <-done:
+		return
+	}
+
 	select {
 	case ch <- kd.updateNodesTargetGroup():
 	case <-done:
@@ -215,11 +231,99 @@ func (kd *Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
 	}
 }
 
+// queryMasterPath issues a GET for the given API path (e.g. nodesURL) against
+// the masters. The scheme and host are filled in by queryMasterReq.
+func (kd *Discovery) queryMasterPath(path string) (*http.Response, error) {
+	req, err := http.NewRequest("GET", path, nil)
+	if err != nil {
+		return nil, err
+	}
+	return kd.queryMasterReq(req)
+}
+
+// queryMasterReq sends req to the master at the head of kd.masters, overriding
+// the scheme and host of the request URL. If the request fails, the masters are
+// rotated and the next one is tried until each master has been tried once. The
+// error of the last failed attempt is included in the returned error.
+func (kd *Discovery) queryMasterReq(req *http.Request) (*http.Response, error) {
+	// Lock in case we need to rotate masters to request.
+	kd.mastersMu.Lock()
+	defer kd.mastersMu.Unlock()
+	var lastErr error
+	for i := 0; i < len(kd.masters); i++ {
+		// Copy the URL too: a plain struct copy of the request still shares
+		// the *url.URL, so setting Host/Scheme would rewrite the caller's req.
+		cloneReq := *req
+		cloneURL := *req.URL
+		cloneURL.Host = kd.masters[0].Host
+		cloneURL.Scheme = kd.masters[0].Scheme
+		cloneReq.URL = &cloneURL
+		res, err := kd.client.Do(&cloneReq)
+		if err == nil {
+			return res, nil
+		}
+		lastErr = err
+		kd.rotateMasters()
+	}
+	if lastErr == nil {
+		// No masters configured, so nothing was attempted.
+		return nil, fmt.Errorf("Unable to query any masters")
+	}
+	return nil, fmt.Errorf("Unable to query any masters: %s", lastErr)
+}
+
+// rotateMasters moves the master at the head of kd.masters to the back. It is
+// called from queryMasterReq with mastersMu held.
+func (kd *Discovery) rotateMasters() {
+	if len(kd.masters) > 1 {
+		kd.masters = append(kd.masters[1:], kd.masters[0])
+	}
+}
+
+// updateMastersTargetGroup builds the target group holding one target per
+// master, labelled with the master role. A master without an explicit port
+// gets the default port of its scheme (80 for http, 443 for https).
+func (kd *Discovery) updateMastersTargetGroup() *config.TargetGroup {
+	tg := &config.TargetGroup{
+		Source: mastersTargetGroupName,
+		Labels: model.LabelSet{
+			roleLabel: model.LabelValue("master"),
+		},
+	}
+
+	for _, master := range kd.masters {
+		masterAddress := master.Host
+		_, _, err := net.SplitHostPort(masterAddress)
+		// If error then no port is specified - use default for scheme.
+		if err != nil {
+			switch master.Scheme {
+			case "http":
+				masterAddress = net.JoinHostPort(masterAddress, "80")
+			case "https":
+				masterAddress = net.JoinHostPort(masterAddress, "443")
+			}
+		}
+
+		t := model.LabelSet{
+			model.AddressLabel: model.LabelValue(masterAddress),
+			model.SchemeLabel:  model.LabelValue(master.Scheme),
+		}
+		tg.Targets = append(tg.Targets, t)
+	}
+
+	return tg
+}
+
 func (kd *Discovery) updateNodesTargetGroup() *config.TargetGroup {
 	kd.nodesMu.Lock()
 	defer kd.nodesMu.Unlock()
 
-	tg := &config.TargetGroup{Source: nodesTargetGroupName}
+	tg := &config.TargetGroup{
+		Source: nodesTargetGroupName,
+		Labels: model.LabelSet{
+			roleLabel: model.LabelValue("node"),
+		},
+	}
 
 	// Now let's loop through the nodes & add them to the target group with appropriate labels.
 	for nodeName, node := range kd.nodes {
@@ -256,7 +360,7 @@ func (kd *Discovery) updateNode(node *Node, eventType EventType) {
 // watchNodes watches nodes as they come & go.
func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) { until(func() { - req, err := http.NewRequest("GET", kd.Conf.Server+nodesURL, nil) + req, err := http.NewRequest("GET", nodesURL, nil) if err != nil { log.Errorf("Failed to watch nodes: %s", err) return @@ -265,7 +348,7 @@ func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, r values.Add("watch", "true") values.Add("resourceVersion", kd.nodesResourceVersion) req.URL.RawQuery = values.Encode() - res, err := kd.client.Do(req) + res, err := kd.queryMasterReq(req) if err != nil { log.Errorf("Failed to watch nodes: %s", err) return @@ -296,7 +379,7 @@ func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, r // watchServices watches services as they come & go. func (kd *Discovery) watchServices(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) { until(func() { - req, err := http.NewRequest("GET", kd.Conf.Server+servicesURL, nil) + req, err := http.NewRequest("GET", servicesURL, nil) if err != nil { log.Errorf("Failed to watch services: %s", err) return @@ -306,7 +389,7 @@ func (kd *Discovery) watchServices(events chan interface{}, done <-chan struct{} values.Add("resourceVersion", kd.servicesResourceVersion) req.URL.RawQuery = values.Encode() - res, err := kd.client.Do(req) + res, err := kd.queryMasterReq(req) if err != nil { log.Errorf("Failed to watch services: %s", err) return @@ -376,7 +459,7 @@ func (kd *Discovery) addService(service *Service) *config.TargetGroup { namespace[service.ObjectMeta.Name] = service endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name) - res, err := kd.client.Get(kd.Conf.Server + endpointURL) + res, err := kd.queryMasterPath(endpointURL) if err != nil { log.Errorf("Error getting service endpoints: %s", err) return nil @@ -401,6 +484,7 @@ func (kd *Discovery) updateServiceTargetGroup(service *Service, eps 
*Endpoints) Labels: model.LabelSet{ serviceNamespaceLabel: model.LabelValue(service.ObjectMeta.Namespace), serviceNameLabel: model.LabelValue(service.ObjectMeta.Name), + roleLabel: model.LabelValue("service"), }, } @@ -437,7 +521,7 @@ func (kd *Discovery) updateServiceTargetGroup(service *Service, eps *Endpoints) // watchServiceEndpoints watches service endpoints as they come & go. func (kd *Discovery) watchServiceEndpoints(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) { until(func() { - req, err := http.NewRequest("GET", kd.Conf.Server+endpointsURL, nil) + req, err := http.NewRequest("GET", endpointsURL, nil) if err != nil { log.Errorf("Failed to watch service endpoints: %s", err) return @@ -447,7 +531,7 @@ func (kd *Discovery) watchServiceEndpoints(events chan interface{}, done <-chan values.Add("resourceVersion", kd.servicesResourceVersion) req.URL.RawQuery = values.Encode() - res, err := kd.client.Do(req) + res, err := kd.queryMasterReq(req) if err != nil { log.Errorf("Failed to watch service endpoints: %s", err) return