mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-27 14:39:40 -08:00
Merge pull request #1050 from fabric8io/kubernetes-discovery
Kubernetes SD improvements
This commit is contained in:
commit
b7b7b2e883
|
@ -605,7 +605,7 @@ type MarathonSDConfig struct {
|
||||||
|
|
||||||
// KubernetesSDConfig is the configuration for Kubernetes service discovery.
|
// KubernetesSDConfig is the configuration for Kubernetes service discovery.
|
||||||
type KubernetesSDConfig struct {
|
type KubernetesSDConfig struct {
|
||||||
Server string `yaml:"server"`
|
Masters []URL `yaml:"masters"`
|
||||||
KubeletPort int `yaml:"kubelet_port,omitempty"`
|
KubeletPort int `yaml:"kubelet_port,omitempty"`
|
||||||
InCluster bool `yaml:"in_cluster,omitempty"`
|
InCluster bool `yaml:"in_cluster,omitempty"`
|
||||||
BearerTokenFile string `yaml:"bearer_token_file,omitempty"`
|
BearerTokenFile string `yaml:"bearer_token_file,omitempty"`
|
||||||
|
@ -641,12 +641,8 @@ func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) er
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if strings.TrimSpace(c.Server) == "" {
|
if len(c.Masters) == 0 {
|
||||||
return fmt.Errorf("Kubernetes SD configuration requires a server address")
|
return fmt.Errorf("Kubernetes SD configuration requires at least one Kubernetes master")
|
||||||
}
|
|
||||||
// Make sure server ends in trailing slash - simpler URL building later
|
|
||||||
if !strings.HasSuffix(c.Server, "/") {
|
|
||||||
c.Server += "/"
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return checkOverflow(c.XXX, "kubernetes_sd_config")
|
return checkOverflow(c.XXX, "kubernetes_sd_config")
|
||||||
|
|
|
@ -16,6 +16,7 @@ package config
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"net/url"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -196,7 +197,7 @@ var expectedConf = &Config{
|
||||||
|
|
||||||
KubernetesSDConfigs: []*KubernetesSDConfig{
|
KubernetesSDConfigs: []*KubernetesSDConfig{
|
||||||
{
|
{
|
||||||
Server: "https://localhost:1234/",
|
Masters: []URL{kubernetesSDHostURL()},
|
||||||
Username: "myusername",
|
Username: "myusername",
|
||||||
Password: "mypassword",
|
Password: "mypassword",
|
||||||
KubeletPort: 10255,
|
KubeletPort: 10255,
|
||||||
|
@ -332,3 +333,8 @@ func TestEmptyGlobalBlock(t *testing.T) {
|
||||||
t.Fatalf("want %v, got %v", exp, c)
|
t.Fatalf("want %v, got %v", exp, c)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func kubernetesSDHostURL() URL {
|
||||||
|
tURL, _ := url.Parse("https://localhost:1234")
|
||||||
|
return URL{URL: tURL}
|
||||||
|
}
|
||||||
|
|
3
config/testdata/conf.good.yml
vendored
3
config/testdata/conf.good.yml
vendored
|
@ -103,6 +103,7 @@ scrape_configs:
|
||||||
- job_name: service-kubernetes
|
- job_name: service-kubernetes
|
||||||
|
|
||||||
kubernetes_sd_configs:
|
kubernetes_sd_configs:
|
||||||
- server: 'https://localhost:1234'
|
- masters:
|
||||||
|
- 'https://localhost:1234'
|
||||||
username: 'myusername'
|
username: 'myusername'
|
||||||
password: 'mypassword'
|
password: 'mypassword'
|
||||||
|
|
|
@ -8,14 +8,18 @@ global:
|
||||||
scrape_configs:
|
scrape_configs:
|
||||||
- job_name: 'kubernetes'
|
- job_name: 'kubernetes'
|
||||||
|
|
||||||
|
ca_cert: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
|
||||||
|
bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
|
||||||
|
|
||||||
kubernetes_sd_configs:
|
kubernetes_sd_configs:
|
||||||
- server: 'https://kubernetes.default.svc'
|
- masters:
|
||||||
|
- 'https://kubernetes.default.svc'
|
||||||
in_cluster: true
|
in_cluster: true
|
||||||
|
|
||||||
relabel_configs:
|
relabel_configs:
|
||||||
- source_labels: [__meta_kubernetes_node, __meta_kubernetes_service_annotation_prometheus_io_scrape]
|
- source_labels: [__meta_kubernetes_role, __meta_kubernetes_service_annotation_prometheus_io_scrape]
|
||||||
action: keep
|
action: keep
|
||||||
regex: ^(?:.+;|;true)$
|
regex: ^(?:(?:master|node);.*|.*;true)$
|
||||||
- source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scheme]
|
- source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scheme]
|
||||||
action: replace
|
action: replace
|
||||||
target_label: __scheme__
|
target_label: __scheme__
|
||||||
|
|
|
@ -53,12 +53,16 @@ const (
|
||||||
serviceAnnotationPrefix = metaLabelPrefix + "service_annotation_"
|
serviceAnnotationPrefix = metaLabelPrefix + "service_annotation_"
|
||||||
// nodesTargetGroupName is the name given to the target group for nodes.
|
// nodesTargetGroupName is the name given to the target group for nodes.
|
||||||
nodesTargetGroupName = "nodes"
|
nodesTargetGroupName = "nodes"
|
||||||
|
// mastersTargetGroupName is the name given to the target group for masters.
|
||||||
|
mastersTargetGroupName = "masters"
|
||||||
|
// roleLabel is the name for the label containing a target's role.
|
||||||
|
roleLabel = metaLabelPrefix + "role"
|
||||||
|
|
||||||
serviceAccountToken = "/var/run/secrets/kubernetes.io/serviceaccount/token"
|
serviceAccountToken = "/var/run/secrets/kubernetes.io/serviceaccount/token"
|
||||||
serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
|
serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
|
||||||
|
|
||||||
apiVersion = "v1"
|
apiVersion = "v1"
|
||||||
apiPrefix = "api/" + apiVersion
|
apiPrefix = "/api/" + apiVersion
|
||||||
nodesURL = apiPrefix + "/nodes"
|
nodesURL = apiPrefix + "/nodes"
|
||||||
servicesURL = apiPrefix + "/services"
|
servicesURL = apiPrefix + "/services"
|
||||||
endpointsURL = apiPrefix + "/endpoints"
|
endpointsURL = apiPrefix + "/endpoints"
|
||||||
|
@ -70,6 +74,8 @@ type Discovery struct {
|
||||||
client *http.Client
|
client *http.Client
|
||||||
Conf *config.KubernetesSDConfig
|
Conf *config.KubernetesSDConfig
|
||||||
|
|
||||||
|
masters []config.URL
|
||||||
|
mastersMu sync.RWMutex
|
||||||
nodesResourceVersion string
|
nodesResourceVersion string
|
||||||
servicesResourceVersion string
|
servicesResourceVersion string
|
||||||
endpointsResourceVersion string
|
endpointsResourceVersion string
|
||||||
|
@ -88,6 +94,7 @@ func (kd *Discovery) Initialize() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
kd.masters = kd.Conf.Masters
|
||||||
kd.client = client
|
kd.client = client
|
||||||
kd.nodes = map[string]*Node{}
|
kd.nodes = map[string]*Node{}
|
||||||
kd.services = map[string]map[string]*Service{}
|
kd.services = map[string]map[string]*Service{}
|
||||||
|
@ -98,7 +105,12 @@ func (kd *Discovery) Initialize() error {
|
||||||
|
|
||||||
// Sources implements the TargetProvider interface.
|
// Sources implements the TargetProvider interface.
|
||||||
func (kd *Discovery) Sources() []string {
|
func (kd *Discovery) Sources() []string {
|
||||||
res, err := kd.client.Get(kd.Conf.Server + nodesURL)
|
sourceNames := make([]string, 0, len(kd.masters))
|
||||||
|
for _, master := range kd.masters {
|
||||||
|
sourceNames = append(sourceNames, mastersTargetGroupName+":"+master.Host)
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := kd.queryMasterPath(nodesURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If we can't list nodes then we can't watch them. Assume this is a misconfiguration
|
// If we can't list nodes then we can't watch them. Assume this is a misconfiguration
|
||||||
// & log & return empty.
|
// & log & return empty.
|
||||||
|
@ -120,15 +132,13 @@ func (kd *Discovery) Sources() []string {
|
||||||
kd.nodesMu.Lock()
|
kd.nodesMu.Lock()
|
||||||
defer kd.nodesMu.Unlock()
|
defer kd.nodesMu.Unlock()
|
||||||
|
|
||||||
sourceNames := make([]string, 0, len(nodes.Items))
|
|
||||||
|
|
||||||
kd.nodesResourceVersion = nodes.ResourceVersion
|
kd.nodesResourceVersion = nodes.ResourceVersion
|
||||||
for idx, node := range nodes.Items {
|
for idx, node := range nodes.Items {
|
||||||
sourceNames = append(sourceNames, nodesTargetGroupName+":"+node.ObjectMeta.Name)
|
sourceNames = append(sourceNames, nodesTargetGroupName+":"+node.ObjectMeta.Name)
|
||||||
kd.nodes[node.ObjectMeta.Name] = &nodes.Items[idx]
|
kd.nodes[node.ObjectMeta.Name] = &nodes.Items[idx]
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err = kd.client.Get(kd.Conf.Server + servicesURL)
|
res, err = kd.queryMasterPath(servicesURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If we can't list services then we can't watch them. Assume this is a misconfiguration
|
// If we can't list services then we can't watch them. Assume this is a misconfiguration
|
||||||
// & log & return empty.
|
// & log & return empty.
|
||||||
|
@ -166,6 +176,12 @@ func (kd *Discovery) Sources() []string {
|
||||||
func (kd *Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
|
func (kd *Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
|
||||||
defer close(ch)
|
defer close(ch)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case ch <- kd.updateMastersTargetGroup():
|
||||||
|
case <-done:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case ch <- kd.updateNodesTargetGroup():
|
case ch <- kd.updateNodesTargetGroup():
|
||||||
case <-done:
|
case <-done:
|
||||||
|
@ -215,11 +231,78 @@ func (kd *Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (kd *Discovery) queryMasterPath(path string) (*http.Response, error) {
|
||||||
|
req, err := http.NewRequest("GET", path, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return kd.queryMasterReq(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kd *Discovery) queryMasterReq(req *http.Request) (*http.Response, error) {
|
||||||
|
// Lock in case we need to rotate masters to request.
|
||||||
|
kd.mastersMu.Lock()
|
||||||
|
defer kd.mastersMu.Unlock()
|
||||||
|
for i := 0; i < len(kd.masters); i++ {
|
||||||
|
cloneReq := *req
|
||||||
|
cloneReq.URL.Host = kd.masters[0].Host
|
||||||
|
cloneReq.URL.Scheme = kd.masters[0].Scheme
|
||||||
|
res, err := kd.client.Do(&cloneReq)
|
||||||
|
if err == nil {
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
kd.rotateMasters()
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("Unable to query any masters")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kd *Discovery) rotateMasters() {
|
||||||
|
if len(kd.masters) > 1 {
|
||||||
|
kd.masters = append(kd.masters[1:], kd.masters[0])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kd *Discovery) updateMastersTargetGroup() *config.TargetGroup {
|
||||||
|
tg := &config.TargetGroup{
|
||||||
|
Source: mastersTargetGroupName,
|
||||||
|
Labels: model.LabelSet{
|
||||||
|
roleLabel: model.LabelValue("master"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, master := range kd.masters {
|
||||||
|
masterAddress := master.Host
|
||||||
|
_, _, err := net.SplitHostPort(masterAddress)
|
||||||
|
// If error then no port is specified - use default for scheme.
|
||||||
|
if err != nil {
|
||||||
|
switch master.Scheme {
|
||||||
|
case "http":
|
||||||
|
masterAddress = net.JoinHostPort(masterAddress, "80")
|
||||||
|
case "https":
|
||||||
|
masterAddress = net.JoinHostPort(masterAddress, "443")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t := model.LabelSet{
|
||||||
|
model.AddressLabel: model.LabelValue(masterAddress),
|
||||||
|
model.SchemeLabel: model.LabelValue(master.Scheme),
|
||||||
|
}
|
||||||
|
tg.Targets = append(tg.Targets, t)
|
||||||
|
}
|
||||||
|
|
||||||
|
return tg
|
||||||
|
}
|
||||||
|
|
||||||
func (kd *Discovery) updateNodesTargetGroup() *config.TargetGroup {
|
func (kd *Discovery) updateNodesTargetGroup() *config.TargetGroup {
|
||||||
kd.nodesMu.Lock()
|
kd.nodesMu.Lock()
|
||||||
defer kd.nodesMu.Unlock()
|
defer kd.nodesMu.Unlock()
|
||||||
|
|
||||||
tg := &config.TargetGroup{Source: nodesTargetGroupName}
|
tg := &config.TargetGroup{
|
||||||
|
Source: nodesTargetGroupName,
|
||||||
|
Labels: model.LabelSet{
|
||||||
|
roleLabel: model.LabelValue("node"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
// Now let's loop through the nodes & add them to the target group with appropriate labels.
|
// Now let's loop through the nodes & add them to the target group with appropriate labels.
|
||||||
for nodeName, node := range kd.nodes {
|
for nodeName, node := range kd.nodes {
|
||||||
|
@ -256,7 +339,7 @@ func (kd *Discovery) updateNode(node *Node, eventType EventType) {
|
||||||
// watchNodes watches nodes as they come & go.
|
// watchNodes watches nodes as they come & go.
|
||||||
func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
|
func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
|
||||||
until(func() {
|
until(func() {
|
||||||
req, err := http.NewRequest("GET", kd.Conf.Server+nodesURL, nil)
|
req, err := http.NewRequest("GET", nodesURL, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Failed to watch nodes: %s", err)
|
log.Errorf("Failed to watch nodes: %s", err)
|
||||||
return
|
return
|
||||||
|
@ -265,7 +348,7 @@ func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, r
|
||||||
values.Add("watch", "true")
|
values.Add("watch", "true")
|
||||||
values.Add("resourceVersion", kd.nodesResourceVersion)
|
values.Add("resourceVersion", kd.nodesResourceVersion)
|
||||||
req.URL.RawQuery = values.Encode()
|
req.URL.RawQuery = values.Encode()
|
||||||
res, err := kd.client.Do(req)
|
res, err := kd.queryMasterReq(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Failed to watch nodes: %s", err)
|
log.Errorf("Failed to watch nodes: %s", err)
|
||||||
return
|
return
|
||||||
|
@ -296,7 +379,7 @@ func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, r
|
||||||
// watchServices watches services as they come & go.
|
// watchServices watches services as they come & go.
|
||||||
func (kd *Discovery) watchServices(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
|
func (kd *Discovery) watchServices(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
|
||||||
until(func() {
|
until(func() {
|
||||||
req, err := http.NewRequest("GET", kd.Conf.Server+servicesURL, nil)
|
req, err := http.NewRequest("GET", servicesURL, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Failed to watch services: %s", err)
|
log.Errorf("Failed to watch services: %s", err)
|
||||||
return
|
return
|
||||||
|
@ -306,7 +389,7 @@ func (kd *Discovery) watchServices(events chan interface{}, done <-chan struct{}
|
||||||
values.Add("resourceVersion", kd.servicesResourceVersion)
|
values.Add("resourceVersion", kd.servicesResourceVersion)
|
||||||
req.URL.RawQuery = values.Encode()
|
req.URL.RawQuery = values.Encode()
|
||||||
|
|
||||||
res, err := kd.client.Do(req)
|
res, err := kd.queryMasterReq(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Failed to watch services: %s", err)
|
log.Errorf("Failed to watch services: %s", err)
|
||||||
return
|
return
|
||||||
|
@ -376,7 +459,7 @@ func (kd *Discovery) addService(service *Service) *config.TargetGroup {
|
||||||
namespace[service.ObjectMeta.Name] = service
|
namespace[service.ObjectMeta.Name] = service
|
||||||
endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name)
|
endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name)
|
||||||
|
|
||||||
res, err := kd.client.Get(kd.Conf.Server + endpointURL)
|
res, err := kd.queryMasterPath(endpointURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error getting service endpoints: %s", err)
|
log.Errorf("Error getting service endpoints: %s", err)
|
||||||
return nil
|
return nil
|
||||||
|
@ -401,6 +484,7 @@ func (kd *Discovery) updateServiceTargetGroup(service *Service, eps *Endpoints)
|
||||||
Labels: model.LabelSet{
|
Labels: model.LabelSet{
|
||||||
serviceNamespaceLabel: model.LabelValue(service.ObjectMeta.Namespace),
|
serviceNamespaceLabel: model.LabelValue(service.ObjectMeta.Namespace),
|
||||||
serviceNameLabel: model.LabelValue(service.ObjectMeta.Name),
|
serviceNameLabel: model.LabelValue(service.ObjectMeta.Name),
|
||||||
|
roleLabel: model.LabelValue("service"),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -437,7 +521,7 @@ func (kd *Discovery) updateServiceTargetGroup(service *Service, eps *Endpoints)
|
||||||
// watchServiceEndpoints watches service endpoints as they come & go.
|
// watchServiceEndpoints watches service endpoints as they come & go.
|
||||||
func (kd *Discovery) watchServiceEndpoints(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
|
func (kd *Discovery) watchServiceEndpoints(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
|
||||||
until(func() {
|
until(func() {
|
||||||
req, err := http.NewRequest("GET", kd.Conf.Server+endpointsURL, nil)
|
req, err := http.NewRequest("GET", endpointsURL, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Failed to watch service endpoints: %s", err)
|
log.Errorf("Failed to watch service endpoints: %s", err)
|
||||||
return
|
return
|
||||||
|
@ -447,7 +531,7 @@ func (kd *Discovery) watchServiceEndpoints(events chan interface{}, done <-chan
|
||||||
values.Add("resourceVersion", kd.servicesResourceVersion)
|
values.Add("resourceVersion", kd.servicesResourceVersion)
|
||||||
req.URL.RawQuery = values.Encode()
|
req.URL.RawQuery = values.Encode()
|
||||||
|
|
||||||
res, err := kd.client.Do(req)
|
res, err := kd.queryMasterReq(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Failed to watch service endpoints: %s", err)
|
log.Errorf("Failed to watch service endpoints: %s", err)
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in a new issue