diff --git a/config/config.go b/config/config.go index 454d10bd10..62d3db1ef3 100644 --- a/config/config.go +++ b/config/config.go @@ -127,10 +127,7 @@ var ( } // DefaultKubernetesSDConfig is the default Kubernetes SD configuration - DefaultKubernetesSDConfig = KubernetesSDConfig{ - RequestTimeout: model.Duration(10 * time.Second), - RetryInterval: model.Duration(1 * time.Second), - } + DefaultKubernetesSDConfig = KubernetesSDConfig{} // DefaultGCESDConfig is the default EC2 SD configuration. DefaultGCESDConfig = GCESDConfig{ @@ -442,8 +439,6 @@ type ScrapeConfig struct { MarathonSDConfigs []*MarathonSDConfig `yaml:"marathon_sd_configs,omitempty"` // List of Kubernetes service discovery configurations. KubernetesSDConfigs []*KubernetesSDConfig `yaml:"kubernetes_sd_configs,omitempty"` - // List of Kubernetes service discovery configurations. - KubernetesV2SDConfigs []*KubernetesV2SDConfig `yaml:"kubernetes_v2_sd_configs,omitempty"` // List of GCE service discovery configurations. GCESDConfigs []*GCESDConfig `yaml:"gce_sd_configs,omitempty"` // List of EC2 service discovery configurations. @@ -798,31 +793,13 @@ func (c *MarathonSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) erro return nil } -// KubernetesSDConfig is the configuration for Kubernetes service discovery. -type KubernetesSDConfig struct { - APIServers []URL `yaml:"api_servers"` - Role KubernetesRole `yaml:"role"` - InCluster bool `yaml:"in_cluster,omitempty"` - BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"` - BearerToken string `yaml:"bearer_token,omitempty"` - BearerTokenFile string `yaml:"bearer_token_file,omitempty"` - RetryInterval model.Duration `yaml:"retry_interval,omitempty"` - RequestTimeout model.Duration `yaml:"request_timeout,omitempty"` - TLSConfig TLSConfig `yaml:"tls_config,omitempty"` - - // Catches all undefined fields and must be empty after parsing. - XXX map[string]interface{} `yaml:",inline"` -} - type KubernetesRole string const ( - KubernetesRoleNode = "node" - KubernetesRolePod = "pod" - KubernetesRoleContainer = "container" - KubernetesRoleService = "service" - KubernetesRoleEndpoint = "endpoint" - KubernetesRoleAPIServer = "apiserver" + KubernetesRoleNode = "node" + KubernetesRolePod = "pod" + KubernetesRoleService = "service" + KubernetesRoleEndpoint = "endpoint" ) func (c *KubernetesRole) UnmarshalYAML(unmarshal func(interface{}) error) error { @@ -830,41 +807,15 @@ func (c *KubernetesRole) UnmarshalYAML(unmarshal func(interface{}) error) error return err } switch *c { - case KubernetesRoleNode, KubernetesRolePod, KubernetesRoleContainer, KubernetesRoleService, KubernetesRoleEndpoint, KubernetesRoleAPIServer: + case KubernetesRoleNode, KubernetesRolePod, KubernetesRoleService, KubernetesRoleEndpoint: return nil default: return fmt.Errorf("Unknown Kubernetes SD role %q", *c) } } -// UnmarshalYAML implements the yaml.Unmarshaler interface. -func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { - *c = DefaultKubernetesSDConfig - type plain KubernetesSDConfig - err := unmarshal((*plain)(c)) - if err != nil { - return err - } - if err := checkOverflow(c.XXX, "kubernetes_sd_config"); err != nil { - return err - } - if c.Role == "" { - return fmt.Errorf("role missing (one of: container, pod, service, endpoint, node, apiserver)") - } - if len(c.APIServers) == 0 { - return fmt.Errorf("Kubernetes SD configuration requires at least one Kubernetes API server") - } - if len(c.BearerToken) > 0 && len(c.BearerTokenFile) > 0 { - return fmt.Errorf("at most one of bearer_token & bearer_token_file must be configured") - } - if c.BasicAuth != nil && (len(c.BearerToken) > 0 || len(c.BearerTokenFile) > 0) { - return fmt.Errorf("at most one of basic_auth, bearer_token & bearer_token_file must be configured") - } - return nil -} - -// KubernetesV2SDConfig is the configuration for Kubernetes service discovery. -type KubernetesV2SDConfig struct { +// KubernetesSDConfig is the configuration for Kubernetes service discovery. +type KubernetesSDConfig struct { APIServer URL `yaml:"api_server"` Role KubernetesRole `yaml:"role"` BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"` @@ -877,9 +828,9 @@ type KubernetesV2SDConfig struct { } // UnmarshalYAML implements the yaml.Unmarshaler interface. -func (c *KubernetesV2SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { - *c = KubernetesV2SDConfig{} - type plain KubernetesV2SDConfig +func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + *c = KubernetesSDConfig{} + type plain KubernetesSDConfig err := unmarshal((*plain)(c)) if err != nil { return err @@ -888,7 +839,7 @@ func (c *KubernetesV2SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) return err } if c.Role == "" { - return fmt.Errorf("role missing (one of: container, pod, service, endpoint, node, apiserver)") + return fmt.Errorf("role missing (one of: pod, service, endpoint, node)") } if len(c.BearerToken) > 0 && len(c.BearerTokenFile) > 0 { return fmt.Errorf("at most one of bearer_token & bearer_token_file must be configured") diff --git a/retrieval/discovery/discovery.go b/retrieval/discovery/discovery.go index 6f22320b62..88796f6fd0 100644 --- a/retrieval/discovery/discovery.go +++ b/retrieval/discovery/discovery.go @@ -21,7 +21,6 @@ import ( "github.com/prometheus/prometheus/retrieval/discovery/consul" "github.com/prometheus/prometheus/retrieval/discovery/dns" "github.com/prometheus/prometheus/retrieval/discovery/kubernetes" - "github.com/prometheus/prometheus/retrieval/discovery/kubernetes_v2" "github.com/prometheus/prometheus/retrieval/discovery/marathon" ) @@ -31,20 +30,8 @@ func NewConsul(cfg *config.ConsulSDConfig) (*consul.Discovery, error) { } // NewKubernetesDiscovery creates a Kubernetes service discovery based on the passed-in configuration. -func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.Discovery, error) { - kd := &kubernetes.Discovery{ - Conf: conf, - } - err := kd.Initialize() - if err != nil { - return nil, err - } - return kd, nil -} - -// NewKubernetesV2Discovery creates a Kubernetes service discovery based on the passed-in configuration. -func NewKubernetesV2Discovery(conf *config.KubernetesV2SDConfig) (*kubernetesv2.Kubernetes, error) { - return kubernetesv2.New(log.Base(), conf) +func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.Kubernetes, error) { + return kubernetes.New(log.Base(), conf) } // NewMarathon creates a new Marathon based discovery. diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go deleted file mode 100644 index b707f84821..0000000000 --- a/retrieval/discovery/kubernetes/discovery.go +++ /dev/null @@ -1,296 +0,0 @@ -// Copyright 2015 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kubernetes - -import ( - "fmt" - "io/ioutil" - "net" - "net/http" - "os" - "sync" - "time" - - "github.com/prometheus/common/log" - "github.com/prometheus/common/model" - "golang.org/x/net/context" - - "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/util/httputil" -) - -const ( - // kubernetesMetaLabelPrefix is the meta prefix used for all meta labels. - // in this discovery. - metaLabelPrefix = model.MetaLabelPrefix + "kubernetes_" - - // roleLabel is the name for the label containing a target's role. - roleLabel = metaLabelPrefix + "role" - - sourcePodPrefix = "pods" - // podsTargetGroupNAme is the name given to the target group for pods - podsTargetGroupName = "pods" - // podNamespaceLabel is the name for the label containing a target pod's namespace - podNamespaceLabel = metaLabelPrefix + "pod_namespace" - // podNameLabel is the name for the label containing a target pod's name - podNameLabel = metaLabelPrefix + "pod_name" - // podAddressLabel is the name for the label containing a target pod's IP address (the PodIP) - podAddressLabel = metaLabelPrefix + "pod_address" - // podContainerNameLabel is the name for the label containing a target's container name - podContainerNameLabel = metaLabelPrefix + "pod_container_name" - // podContainerPortNameLabel is the name for the label containing the name of the port selected for a target - podContainerPortNameLabel = metaLabelPrefix + "pod_container_port_name" - // PodContainerPortListLabel is the name for the label containing a list of all TCP ports on the target container - podContainerPortListLabel = metaLabelPrefix + "pod_container_port_list" - // PodContainerPortMapPrefix is the prefix used to create the names of labels that associate container port names to port values - // Such labels will be named (podContainerPortMapPrefix)_(PortName) = (ContainerPort) - podContainerPortMapPrefix = metaLabelPrefix + "pod_container_port_map_" - // podReadyLabel is the name for the label containing the 'Ready' status (true/false/unknown) for a target - podReadyLabel = metaLabelPrefix + "pod_ready" - // podLabelPrefix is the prefix for prom label names corresponding to k8s labels for a target pod - podLabelPrefix = metaLabelPrefix + "pod_label_" - // podAnnotationPrefix is the prefix for prom label names corresponding to k8s annotations for a target pod - podAnnotationPrefix = metaLabelPrefix + "pod_annotation_" - // podNodeLabel is the name for the label containing the name of the node that a pod is scheduled on to - podNodeNameLabel = metaLabelPrefix + "pod_node_name" - // podHostIPLabel is the name for the label containing the IP of the node that a pod is scheduled on to - podHostIPLabel = metaLabelPrefix + "pod_host_ip" - - sourceServicePrefix = "services" - // serviceNamespaceLabel is the name for the label containing a target's service namespace. - serviceNamespaceLabel = metaLabelPrefix + "service_namespace" - // serviceNameLabel is the name for the label containing a target's service name. - serviceNameLabel = metaLabelPrefix + "service_name" - // serviceLabelPrefix is the prefix for the service labels. - serviceLabelPrefix = metaLabelPrefix + "service_label_" - // serviceAnnotationPrefix is the prefix for the service annotations. - serviceAnnotationPrefix = metaLabelPrefix + "service_annotation_" - - // nodesTargetGroupName is the name given to the target group for nodes. - nodesTargetGroupName = "nodes" - // nodeLabelPrefix is the prefix for the node labels. - nodeLabelPrefix = metaLabelPrefix + "node_label_" - // nodeAddressPrefix is the prefix for the node addresses. - nodeAddressPrefix = metaLabelPrefix + "node_address_" - // nodePortLabel is the name of the label for the node port. - nodePortLabel = metaLabelPrefix + "node_port" - - // apiServersTargetGroupName is the name given to the target group for API servers. - apiServersTargetGroupName = "apiServers" - - serviceAccountToken = "/var/run/secrets/kubernetes.io/serviceaccount/token" - serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" - - apiVersion = "v1" - apiPrefix = "/api/" + apiVersion - nodesURL = apiPrefix + "/nodes" - podsURL = apiPrefix + "/pods" - servicesURL = apiPrefix + "/services" - endpointsURL = apiPrefix + "/endpoints" - serviceEndpointsURL = apiPrefix + "/namespaces/%s/endpoints/%s" -) - -// Discovery implements a TargetProvider for Kubernetes services. -type Discovery struct { - client *http.Client - Conf *config.KubernetesSDConfig - - apiServers []config.URL - apiServersMu sync.RWMutex -} - -// Initialize sets up the discovery for usage. -func (kd *Discovery) Initialize() error { - client, err := newKubernetesHTTPClient(kd.Conf) - - if err != nil { - return err - } - - kd.apiServers = kd.Conf.APIServers - kd.client = client - - return nil -} - -// Run implements the TargetProvider interface. -func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - log.Debugf("Start Kubernetes service discovery") - defer close(ch) - - switch kd.Conf.Role { - case config.KubernetesRolePod, config.KubernetesRoleContainer: - pd := &podDiscovery{ - retryInterval: time.Duration(kd.Conf.RetryInterval), - kd: kd, - } - pd.run(ctx, ch) - case config.KubernetesRoleNode: - nd := &nodeDiscovery{ - retryInterval: time.Duration(kd.Conf.RetryInterval), - kd: kd, - } - nd.run(ctx, ch) - case config.KubernetesRoleService, config.KubernetesRoleEndpoint: - sd := &serviceDiscovery{ - retryInterval: time.Duration(kd.Conf.RetryInterval), - kd: kd, - } - sd.run(ctx, ch) - case config.KubernetesRoleAPIServer: - select { - case ch <- []*config.TargetGroup{kd.updateAPIServersTargetGroup()}: - case <-ctx.Done(): - return - } - default: - log.Errorf("unknown Kubernetes discovery kind %q", kd.Conf.Role) - return - } -} - -func (kd *Discovery) queryAPIServerPath(path string) (*http.Response, error) { - req, err := http.NewRequest("GET", path, nil) - if err != nil { - return nil, err - } - return kd.queryAPIServerReq(req) -} - -func (kd *Discovery) queryAPIServerReq(req *http.Request) (*http.Response, error) { - // Lock in case we need to rotate API servers to request. - kd.apiServersMu.Lock() - defer kd.apiServersMu.Unlock() - var lastErr error - for i := 0; i < len(kd.apiServers); i++ { - cloneReq := *req - cloneReq.URL.Host = kd.apiServers[0].Host - cloneReq.URL.Scheme = kd.apiServers[0].Scheme - res, err := kd.client.Do(&cloneReq) - if err == nil { - return res, nil - } - lastErr = err - kd.rotateAPIServers() - } - return nil, fmt.Errorf("unable to query any API servers: %v", lastErr) -} - -func (kd *Discovery) rotateAPIServers() { - if len(kd.apiServers) > 1 { - kd.apiServers = append(kd.apiServers[1:], kd.apiServers[0]) - } -} - -func (kd *Discovery) updateAPIServersTargetGroup() *config.TargetGroup { - tg := &config.TargetGroup{ - Source: apiServersTargetGroupName, - Labels: model.LabelSet{ - roleLabel: model.LabelValue("apiserver"), - }, - } - - for _, apiServer := range kd.apiServers { - apiServerAddress := apiServer.Host - _, _, err := net.SplitHostPort(apiServerAddress) - // If error then no port is specified - use default for scheme. - if err != nil { - switch apiServer.Scheme { - case "http": - apiServerAddress = net.JoinHostPort(apiServerAddress, "80") - case "https": - apiServerAddress = net.JoinHostPort(apiServerAddress, "443") - } - } - - t := model.LabelSet{ - model.AddressLabel: model.LabelValue(apiServerAddress), - model.SchemeLabel: model.LabelValue(apiServer.Scheme), - } - tg.Targets = append(tg.Targets, t) - } - - return tg -} - -func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, error) { - bearerTokenFile := conf.BearerTokenFile - tlsConfig := conf.TLSConfig - if conf.InCluster { - if len(bearerTokenFile) == 0 { - bearerTokenFile = serviceAccountToken - } - if len(tlsConfig.CAFile) == 0 { - // With recent versions, the CA certificate is mounted as a secret - // but we need to handle older versions too. In this case, don't - // set the CAFile & the configuration will have to use InsecureSkipVerify. - if _, err := os.Stat(serviceAccountCACert); err == nil { - tlsConfig.CAFile = serviceAccountCACert - } - } - } - tls, err := httputil.NewTLSConfig(tlsConfig) - if err != nil { - return nil, err - } - - var rt http.RoundTripper = &http.Transport{ - Dial: func(netw, addr string) (c net.Conn, err error) { - c, err = net.DialTimeout(netw, addr, time.Duration(conf.RequestTimeout)) - return - }, - TLSClientConfig: tls, - } - - // If a bearer token is provided, create a round tripper that will set the - // Authorization header correctly on each request. - bearerToken := conf.BearerToken - if len(bearerToken) == 0 && len(bearerTokenFile) > 0 { - b, err := ioutil.ReadFile(bearerTokenFile) - if err != nil { - return nil, fmt.Errorf("unable to read bearer token file %s: %s", bearerTokenFile, err) - } - bearerToken = string(b) - } - if len(bearerToken) > 0 { - rt = httputil.NewBearerAuthRoundTripper(bearerToken, rt) - } - - if conf.BasicAuth != nil { - rt = httputil.NewBasicAuthRoundTripper(conf.BasicAuth.Username, conf.BasicAuth.Password, rt) - } - - return &http.Client{ - Transport: rt, - }, nil -} - -// Until loops until stop channel is closed, running f every period. -// f may not be invoked if stop channel is already closed. -func until(f func(), period time.Duration, stopCh <-chan struct{}) { - select { - case <-stopCh: - return - default: - f() - } - for { - select { - case <-stopCh: - return - case <-time.After(period): - f() - } - } -} diff --git a/retrieval/discovery/kubernetes/discovery_test.go b/retrieval/discovery/kubernetes/discovery_test.go deleted file mode 100644 index 89ea76d69c..0000000000 --- a/retrieval/discovery/kubernetes/discovery_test.go +++ /dev/null @@ -1,221 +0,0 @@ -// Copyright 2015 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kubernetes - -import ( - "flag" - "math/rand" - "os" - "testing" - - _ "github.com/prometheus/common/log" - "github.com/prometheus/common/model" -) - -func TestMain(m *testing.M) { - flag.Parse() - os.Exit(m.Run()) -} - -var portsA = []ContainerPort{ - { - Name: "http", - ContainerPort: 80, - Protocol: "TCP", - }, -} - -var portsB = []ContainerPort{ - { - Name: "https", - ContainerPort: 443, - Protocol: "TCP", - }, -} - -var portsNoTcp = []ContainerPort{ - { - Name: "dns", - ContainerPort: 53, - Protocol: "UDP", - }, -} - -var portsMultiA = []ContainerPort{ - { - Name: "http", - ContainerPort: 80, - Protocol: "TCP", - }, - { - Name: "ssh", - ContainerPort: 22, - Protocol: "TCP", - }, -} - -var portsMultiB = []ContainerPort{ - { - Name: "http", - ContainerPort: 80, - Protocol: "TCP", - }, - { - Name: "https", - ContainerPort: 443, - Protocol: "TCP", - }, -} - -func container(name string, ports []ContainerPort) Container { - p := make([]ContainerPort, len(ports)) - copy(p, ports) - - // Shuffle order of ports to ensure code enforces determinism - for i := range p { - j := rand.Intn(i + 1) - p[i], p[j] = p[j], p[i] - } - - return Container{ - Name: name, - Ports: p, - } -} - -func pod(name string, containers []Container) *Pod { - c := make([]Container, len(containers)) - copy(c, containers) - - // Shuffle order of containers to ensure code enforces determinism - for i := range c { - j := rand.Intn(i + 1) - c[i], c[j] = c[j], c[i] - } - - return &Pod{ - ObjectMeta: ObjectMeta{ - Name: name, - }, - PodStatus: PodStatus{ - PodIP: "1.1.1.1", - Phase: "Running", - Conditions: []PodCondition{ - { - Type: "Ready", - Status: "True", - }, - }, - HostIP: "2.2.2.2", - }, - PodSpec: PodSpec{ - Containers: c, - NodeName: "test-node", - }, - } -} - -func TestUpdatePodTargets(t *testing.T) { - var result []model.LabelSet - - // Multiple iterations help ensure that we'll see different permutations via the various randomizations that occur - for i := 0; i < 100; i++ { - // Return no targets for a pod that isn't "Running" - result = updatePodTargets(&Pod{PodStatus: PodStatus{PodIP: "1.1.1.1"}}, true) - if len(result) > 0 { - t.Fatalf("expected 0 targets, received %d", len(result)) - } - - // Return no targets for a pod with no IP - result = updatePodTargets(&Pod{PodStatus: PodStatus{Phase: "Running"}}, true) - if len(result) > 0 { - t.Fatalf("expected 0 targets, received %d", len(result)) - } - - // A pod with no containers (?!) should not produce any targets - result = updatePodTargets(pod("empty", []Container{}), true) - if len(result) > 0 { - t.Fatalf("expected 0 targets, received %d", len(result)) - } - - // Return no targets for a pod that has no HostIP - result = updatePodTargets(&Pod{PodStatus: PodStatus{PodIP: "1.1.1.1", Phase: "Running"}}, true) - if len(result) > 0 { - t.Fatalf("expected 0 targets, received %d", len(result)) - } - - // A pod with all valid containers should return one target per container with allContainers=true - result = updatePodTargets(pod("easy", []Container{container("a", portsA), container("b", portsB)}), true) - if len(result) != 2 { - t.Fatalf("expected 2 targets, received %d", len(result)) - } - if result[0][podReadyLabel] != "true" { - t.Fatalf("expected result[0] podReadyLabel 'true', received '%s'", result[0][podReadyLabel]) - } - if _, ok := result[0][podContainerPortMapPrefix+"http"]; !ok { - t.Fatalf("expected result[0][podContainerPortMapPrefix + 'http'] to be '80', but was missing") - } - if result[0][podContainerPortMapPrefix+"http"] != "80" { - t.Fatalf("expected result[0][podContainerPortMapPrefix + 'http'] to be '80', but was %s", result[0][podContainerPortMapPrefix+"http"]) - } - if _, ok := result[1][podContainerPortMapPrefix+"https"]; !ok { - t.Fatalf("expected result[1][podContainerPortMapPrefix + 'https'] to be '443', but was missing") - } - if result[1][podContainerPortMapPrefix+"https"] != "443" { - t.Fatalf("expected result[1][podContainerPortMapPrefix + 'https'] to be '443', but was %s", result[1][podContainerPortMapPrefix+"https"]) - } - if result[0][podNodeNameLabel] != "test-node" { - t.Fatalf("expected result[0] podNodeNameLabel 'test-node', received '%s'", result[0][podNodeNameLabel]) - } - if result[0][podHostIPLabel] != "2.2.2.2" { - t.Fatalf("expected result[0] podHostIPLabel '2.2.2.2', received '%s'", result[0][podHostIPLabel]) - } - - // A pod with all valid containers should return one target with allContainers=false, and it should be the alphabetically first container - result = updatePodTargets(pod("easy", []Container{container("a", portsA), container("b", portsB)}), false) - if len(result) != 1 { - t.Fatalf("expected 1 targets, received %d", len(result)) - } - if _, ok := result[0][podContainerNameLabel]; !ok { - t.Fatalf("expected result[0][podContainerNameLabel] to be 'a', but was missing") - } - if result[0][podContainerNameLabel] != "a" { - t.Fatalf("expected result[0][podContainerNameLabel] to be 'a', but was '%s'", result[0][podContainerNameLabel]) - } - - // A pod with some non-targetable containers should return one target per targetable container with allContainers=true - result = updatePodTargets(pod("mixed", []Container{container("a", portsA), container("no-tcp", portsNoTcp), container("b", portsB)}), true) - if len(result) != 2 { - t.Fatalf("expected 2 targets, received %d", len(result)) - } - - // A pod with a container with multiple ports should return the numerically smallest port - result = updatePodTargets(pod("hard", []Container{container("multiA", portsMultiA), container("multiB", portsMultiB)}), true) - if len(result) != 2 { - t.Fatalf("expected 2 targets, received %d", len(result)) - } - if result[0][model.AddressLabel] != "1.1.1.1:22" { - t.Fatalf("expected result[0] address to be 1.1.1.1:22, received %s", result[0][model.AddressLabel]) - } - if result[0][podContainerPortListLabel] != ",ssh=22,http=80," { - t.Fatalf("expected result[0] podContainerPortListLabel to be ',ssh=22,http=80,', received '%s'", result[0][podContainerPortListLabel]) - } - if result[1][model.AddressLabel] != "1.1.1.1:80" { - t.Fatalf("expected result[1] address to be 1.1.1.1:80, received %s", result[1][model.AddressLabel]) - } - if result[1][podContainerPortListLabel] != ",http=80,https=443," { - t.Fatalf("expected result[1] podContainerPortListLabel to be ',http=80,https=443,', received '%s'", result[1][podContainerPortListLabel]) - } - } -} diff --git a/retrieval/discovery/kubernetes_v2/endpoint.go b/retrieval/discovery/kubernetes/endpoint.go similarity index 99% rename from retrieval/discovery/kubernetes_v2/endpoint.go rename to retrieval/discovery/kubernetes/endpoint.go index 7434bde78e..5756c68d7c 100644 --- a/retrieval/discovery/kubernetes_v2/endpoint.go +++ b/retrieval/discovery/kubernetes/endpoint.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kubernetesv2 +package kubernetes import ( "fmt" diff --git a/retrieval/discovery/kubernetes_v2/kubernetes.go b/retrieval/discovery/kubernetes/kubernetes.go similarity index 97% rename from retrieval/discovery/kubernetes_v2/kubernetes.go rename to retrieval/discovery/kubernetes/kubernetes.go index f756802b4c..2a1f4b6872 100644 --- a/retrieval/discovery/kubernetes_v2/kubernetes.go +++ b/retrieval/discovery/kubernetes/kubernetes.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kubernetesv2 +package kubernetes import ( "io/ioutil" @@ -52,12 +52,12 @@ func init() { } // New creates a new Kubernetes discovery for the given role. -func New(l log.Logger, conf *config.KubernetesV2SDConfig) (*Kubernetes, error) { +func New(l log.Logger, conf *config.KubernetesSDConfig) (*Kubernetes, error) { var ( kcfg *rest.Config err error ) - if conf.APIServer.String() == "" { + if conf.APIServer.URL == nil { kcfg, err = rest.InClusterConfig() if err != nil { return nil, err diff --git a/retrieval/discovery/kubernetes/node.go b/retrieval/discovery/kubernetes/node.go index 26b173b656..8fb0f20a2e 100644 --- a/retrieval/discovery/kubernetes/node.go +++ b/retrieval/discovery/kubernetes/node.go @@ -14,229 +14,148 @@ package kubernetes import ( - "encoding/json" "fmt" - "io/ioutil" "net" - "net/http" "strconv" - "sync" - "time" "github.com/prometheus/common/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/util/strutil" "golang.org/x/net/context" + "k8s.io/client-go/1.5/pkg/api" + apiv1 "k8s.io/client-go/1.5/pkg/api/v1" + "k8s.io/client-go/1.5/tools/cache" ) -type nodeDiscovery struct { - mtx sync.RWMutex - nodes map[string]*Node - retryInterval time.Duration - kd *Discovery +// Node discovers Kubernetes nodes. +type Node struct { + logger log.Logger + informer cache.SharedInformer + store cache.Store } -func (d *nodeDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) { +// NewNode returns a new node discovery. +func NewNode(l log.Logger, inf cache.SharedInformer) *Node { + return &Node{logger: l, informer: inf, store: inf.GetStore()} +} + +// Run implements the TargetProvider interface. +func (n *Node) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + // Send full initial set of pod targets. + var initial []*config.TargetGroup + for _, o := range n.store.List() { + tg := n.buildNode(o.(*apiv1.Node)) + initial = append(initial, tg) + } select { - case ch <- []*config.TargetGroup{d.updateNodesTargetGroup()}: case <-ctx.Done(): return + case ch <- initial: } - update := make(chan *nodeEvent, 10) - go d.watchNodes(update, ctx.Done(), d.retryInterval) - - for { - tgs := []*config.TargetGroup{} + // Send target groups for service updates. + send := func(tg *config.TargetGroup) { select { case <-ctx.Done(): - return - case e := <-update: - log.Debugf("k8s discovery received node event (EventType=%s, Node Name=%s)", e.EventType, e.Node.ObjectMeta.Name) - d.updateNode(e.Node, e.EventType) - tgs = append(tgs, d.updateNodesTargetGroup()) - } - if tgs == nil { - continue - } - - for _, tg := range tgs { - select { - case ch <- []*config.TargetGroup{tg}: - case <-ctx.Done(): - return - } + case ch <- []*config.TargetGroup{tg}: } } + n.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(o interface{}) { + send(n.buildNode(o.(*apiv1.Node))) + }, + DeleteFunc: func(o interface{}) { + send(&config.TargetGroup{Source: nodeSource(o.(*apiv1.Node))}) + }, + UpdateFunc: func(_, o interface{}) { + send(n.buildNode(o.(*apiv1.Node))) + }, + }) + + // Block until the target provider is explicitly canceled. + <-ctx.Done() } -func (d *nodeDiscovery) updateNodesTargetGroup() *config.TargetGroup { - d.mtx.RLock() - defer d.mtx.RUnlock() +func nodeSource(n *apiv1.Node) string { + return "node/" + n.Namespace + "/" + n.Name +} +const ( + nodeNameLabel = metaLabelPrefix + "node_name" + nodeLabelPrefix = metaLabelPrefix + "node_label_" + nodeAnnotationPrefix = metaLabelPrefix + "node_annotation_" + nodeAddressPrefix = metaLabelPrefix + "node_address_" +) + +func nodeLabels(n *apiv1.Node) model.LabelSet { + ls := make(model.LabelSet, len(n.Labels)+len(n.Annotations)+2) + + ls[nodeNameLabel] = lv(n.Name) + + for k, v := range n.Labels { + ln := strutil.SanitizeLabelName(nodeLabelPrefix + k) + ls[model.LabelName(ln)] = lv(v) + } + + for k, v := range n.Annotations { + ln := strutil.SanitizeLabelName(nodeAnnotationPrefix + k) + ls[model.LabelName(ln)] = lv(v) + } + return ls +} + +func (n *Node) buildNode(node *apiv1.Node) *config.TargetGroup { tg := &config.TargetGroup{ - Source: nodesTargetGroupName, - Labels: model.LabelSet{ - roleLabel: model.LabelValue("node"), - }, + Source: nodeSource(node), + } + tg.Labels = nodeLabels(node) + + addr, addrMap, err := nodeAddress(node) + if err != nil { + n.logger.With("err", err).Debugf("No node address found") + return nil + } + addr = net.JoinHostPort(addr, strconv.FormatInt(int64(node.Status.DaemonEndpoints.KubeletEndpoint.Port), 10)) + + t := model.LabelSet{ + model.AddressLabel: lv(addr), + model.InstanceLabel: lv(node.Name), } - // Now let's loop through the nodes & add them to the target group with appropriate labels. - for nodeName, node := range d.nodes { - defaultNodeAddress, nodeAddressMap, err := nodeAddresses(node) - if err != nil { - log.Debugf("Skipping node %s: %s", node.Name, err) - continue - } - - kubeletPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port) - - address := net.JoinHostPort(defaultNodeAddress.String(), fmt.Sprintf("%d", kubeletPort)) - - t := model.LabelSet{ - model.AddressLabel: model.LabelValue(address), - model.InstanceLabel: model.LabelValue(nodeName), - } - - for addrType, ip := range nodeAddressMap { - labelName := strutil.SanitizeLabelName(nodeAddressPrefix + string(addrType)) - t[model.LabelName(labelName)] = model.LabelValue(ip[0].String()) - } - - t[model.LabelName(nodePortLabel)] = model.LabelValue(strconv.Itoa(kubeletPort)) - - for k, v := range node.ObjectMeta.Labels { - labelName := strutil.SanitizeLabelName(nodeLabelPrefix + k) - t[model.LabelName(labelName)] = model.LabelValue(v) - } - tg.Targets = append(tg.Targets, t) + for ty, a := range addrMap { + ln := strutil.SanitizeLabelName(nodeAddressPrefix + string(ty)) + t[model.LabelName(ln)] = lv(a[0]) } + tg.Targets = append(tg.Targets, t) return tg } -// watchNodes watches nodes as they come & go. -func (d *nodeDiscovery) watchNodes(events chan *nodeEvent, done <-chan struct{}, retryInterval time.Duration) { - until(func() { - nodes, resourceVersion, err := d.getNodes() - if err != nil { - log.Errorf("Cannot initialize nodes collection: %s", err) - return - } - - // Reset the known nodes. - d.mtx.Lock() - d.nodes = map[string]*Node{} - d.mtx.Unlock() - - for _, node := range nodes { - events <- &nodeEvent{Added, node} - } - - req, err := http.NewRequest("GET", nodesURL, nil) - if err != nil { - log.Errorf("Cannot create nodes request: %s", err) - return - } - values := req.URL.Query() - values.Add("watch", "true") - values.Add("resourceVersion", resourceVersion) - req.URL.RawQuery = values.Encode() - res, err := d.kd.queryAPIServerReq(req) - if err != nil { - log.Errorf("Failed to watch nodes: %s", err) - return - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - log.Errorf("Failed to watch nodes: %d", res.StatusCode) - return - } - - d := json.NewDecoder(res.Body) - - for { - var event nodeEvent - if err := d.Decode(&event); err != nil { - log.Errorf("Watch nodes unexpectedly closed: %s", err) - return - } - - select { - case events <- &event: - case <-done: - } - } - }, retryInterval, done) -} - -func (d *nodeDiscovery) updateNode(node *Node, eventType EventType) { - d.mtx.Lock() - defer d.mtx.Unlock() - - updatedNodeName := node.ObjectMeta.Name - switch eventType { - case Deleted: - // Deleted - remove from nodes map. - delete(d.nodes, updatedNodeName) - case Added, Modified: - // Added/Modified - update the node in the nodes map. - d.nodes[updatedNodeName] = node - } -} - -func (d *nodeDiscovery) getNodes() (map[string]*Node, string, error) { - res, err := d.kd.queryAPIServerPath(nodesURL) - if err != nil { - // If we can't list nodes then we can't watch them. Assume this is a misconfiguration - // & return error. - return nil, "", fmt.Errorf("unable to list Kubernetes nodes: %s", err) - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response: %d %s", res.StatusCode, res.Status) - } - - var nodes NodeList - if err := json.NewDecoder(res.Body).Decode(&nodes); err != nil { - body, _ := ioutil.ReadAll(res.Body) - return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response body: %s", string(body)) - } - - nodeMap := map[string]*Node{} - for idx, node := range nodes.Items { - nodeMap[node.ObjectMeta.Name] = &nodes.Items[idx] - } - - return nodeMap, nodes.ResourceVersion, nil -} - // nodeAddresses returns the provided node's address, based on the priority: // 1. NodeInternalIP // 2. NodeExternalIP // 3. NodeLegacyHostIP +// 3. NodeHostName // -// Copied from k8s.io/kubernetes/pkg/util/node/node.go -func nodeAddresses(node *Node) (net.IP, map[NodeAddressType][]net.IP, error) { - addresses := node.Status.Addresses - addressMap := map[NodeAddressType][]net.IP{} - for _, addr := range addresses { - ip := net.ParseIP(addr.Address) - // All addresses should be valid IPs. - if ip == nil { - continue - } - addressMap[addr.Type] = append(addressMap[addr.Type], ip) +// Derived from k8s.io/kubernetes/pkg/util/node/node.go +func nodeAddress(node *apiv1.Node) (string, map[apiv1.NodeAddressType][]string, error) { + m := map[apiv1.NodeAddressType][]string{} + for _, a := range node.Status.Addresses { + m[a.Type] = append(m[a.Type], a.Address) } - if addresses, ok := addressMap[NodeInternalIP]; ok { - return addresses[0], addressMap, nil + + if addresses, ok := m[apiv1.NodeInternalIP]; ok { + return addresses[0], m, nil } - if addresses, ok := addressMap[NodeExternalIP]; ok { - return addresses[0], addressMap, nil + if addresses, ok := m[apiv1.NodeExternalIP]; ok { + return addresses[0], m, nil } - if addresses, ok := addressMap[NodeLegacyHostIP]; ok { - return addresses[0], addressMap, nil + if addresses, ok := m[apiv1.NodeAddressType(api.NodeLegacyHostIP)]; ok { + return addresses[0], m, nil } - return nil, nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses) + if addresses, ok := m[apiv1.NodeHostName]; ok { + return addresses[0], m, nil + } + return "", m, fmt.Errorf("host address unknown") } diff --git a/retrieval/discovery/kubernetes/pod.go b/retrieval/discovery/kubernetes/pod.go index 082553726a..234ea9a4c2 100644 --- a/retrieval/discovery/kubernetes/pod.go +++ b/retrieval/discovery/kubernetes/pod.go @@ -14,337 +14,164 @@ package kubernetes import ( - "bytes" - "encoding/json" "fmt" - "io/ioutil" "net" - "net/http" - "sort" "strconv" "strings" - "sync" - "time" "github.com/prometheus/common/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/util/strutil" "golang.org/x/net/context" + "k8s.io/client-go/1.5/pkg/api" + apiv1 "k8s.io/client-go/1.5/pkg/api/v1" + "k8s.io/client-go/1.5/tools/cache" ) -type podDiscovery struct { - mtx sync.RWMutex - pods map[string]map[string]*Pod - retryInterval time.Duration - kd *Discovery +// Pod discovers new pod targets. +type Pod struct { + informer cache.SharedInformer + store cache.Store + logger log.Logger } -func (d *podDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) { - pods, _, err := d.getPods() - if err != nil { - log.Errorf("Cannot initialize pods collection: %s", err) - return +// NewPods creates a new pod discovery. +func NewPods(l log.Logger, pods cache.SharedInformer) *Pod { + return &Pod{ + informer: pods, + store: pods.GetStore(), + logger: l, } - d.pods = pods +} - initial := []*config.TargetGroup{} - switch d.kd.Conf.Role { - case config.KubernetesRolePod: - initial = append(initial, d.updatePodsTargetGroup()) - case config.KubernetesRoleContainer: - for _, ns := range d.pods { - for _, pod := range ns { - initial = append(initial, d.updateContainerTargetGroup(pod)) - } - } +// Run implements the TargetProvider interface. +func (p *Pod) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + // Send full initial set of pod targets. + var initial []*config.TargetGroup + for _, o := range p.store.List() { + tg := p.buildPod(o.(*apiv1.Pod)) + initial = append(initial, tg) + + p.logger.With("tg", fmt.Sprintf("%#v", tg)).Debugln("initial pod") } - select { - case ch <- initial: case <-ctx.Done(): return + case ch <- initial: } - update := make(chan *podEvent, 10) - go d.watchPods(update, ctx.Done(), d.retryInterval) - - for { - tgs := []*config.TargetGroup{} + // Send target groups for pod updates. + send := func(tg *config.TargetGroup) { + p.logger.With("tg", fmt.Sprintf("%#v", tg)).Debugln("pod update") select { case <-ctx.Done(): - return - case e := <-update: - log.Debugf("k8s discovery received pod event (EventType=%s, Pod Name=%s)", e.EventType, e.Pod.ObjectMeta.Name) - d.updatePod(e.Pod, e.EventType) - - switch d.kd.Conf.Role { - case config.KubernetesRoleContainer: - // Update the per-pod target group - tgs = append(tgs, d.updateContainerTargetGroup(e.Pod)) - case config.KubernetesRolePod: - // Update the all pods target group - tgs = append(tgs, d.updatePodsTargetGroup()) - } - } - if tgs == nil { - continue - } - - for _, tg := range tgs { - select { - case ch <- []*config.TargetGroup{tg}: - case <-ctx.Done(): - return - } + case ch <- []*config.TargetGroup{tg}: } } + p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(o interface{}) { + send(p.buildPod(o.(*apiv1.Pod))) + }, + DeleteFunc: func(o interface{}) { + send(&config.TargetGroup{Source: podSource(o.(*apiv1.Pod))}) + }, + UpdateFunc: func(_, o interface{}) { + send(p.buildPod(o.(*apiv1.Pod))) + }, + }) + + // Block until the target provider is explicitly canceled. + <-ctx.Done() } -func (d *podDiscovery) getPods() (map[string]map[string]*Pod, string, error) { - res, err := d.kd.queryAPIServerPath(podsURL) - if err != nil { - return nil, "", fmt.Errorf("unable to list Kubernetes pods: %s", err) - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response: %d %s", res.StatusCode, res.Status) +const ( + podNameLabel = metaLabelPrefix + "pod_name" + podIPLabel = metaLabelPrefix + "pod_ip" + podContainerNameLabel = metaLabelPrefix + "pod_container_name" + podContainerPortNameLabel = metaLabelPrefix + "pod_container_port_name" + podContainerPortNumberLabel = metaLabelPrefix + "pod_container_port_number" + podContainerPortProtocolLabel = metaLabelPrefix + "pod_container_port_protocol" + podReadyLabel = metaLabelPrefix + "pod_ready" + podLabelPrefix = metaLabelPrefix + "pod_label_" + podAnnotationPrefix = metaLabelPrefix + "pod_annotation_" + podNodeNameLabel = metaLabelPrefix + "pod_node_name" + podHostIPLabel = metaLabelPrefix + "pod_host_ip" +) + +func podLabels(pod *apiv1.Pod) model.LabelSet { + ls := model.LabelSet{ + podNameLabel: lv(pod.ObjectMeta.Name), + podIPLabel: lv(pod.Status.PodIP), + podReadyLabel: podReady(pod), + podNodeNameLabel: lv(pod.Spec.NodeName), + podHostIPLabel: lv(pod.Status.HostIP), } - var pods PodList - if err := json.NewDecoder(res.Body).Decode(&pods); err != nil { - body, _ := ioutil.ReadAll(res.Body) - return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response body: %s", string(body)) + for k, v := range pod.Labels { + ln := strutil.SanitizeLabelName(serviceLabelPrefix + k) + ls[model.LabelName(ln)] = lv(v) } - podMap := map[string]map[string]*Pod{} - for idx, pod := range pods.Items { - if _, ok := podMap[pod.ObjectMeta.Namespace]; !ok { - podMap[pod.ObjectMeta.Namespace] = map[string]*Pod{} - } - log.Debugf("Got pod %s in namespace %s", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace) - podMap[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = &pods.Items[idx] + for k, v := range pod.Annotations { + ln := strutil.SanitizeLabelName(serviceAnnotationPrefix + k) + ls[model.LabelName(ln)] = lv(v) } - return podMap, pods.ResourceVersion, nil + return ls } -func (d *podDiscovery) watchPods(events chan *podEvent, done <-chan struct{}, retryInterval time.Duration) { - until(func() { - pods, resourceVersion, err := d.getPods() - if err != nil { - log.Errorf("Cannot initialize pods collection: %s", err) - return - } - d.mtx.Lock() - d.pods = pods - d.mtx.Unlock() - - req, err := http.NewRequest("GET", podsURL, nil) - if err != nil { - log.Errorf("Cannot create pods request: %s", err) - return - } - - values := req.URL.Query() - values.Add("watch", "true") - values.Add("resourceVersion", resourceVersion) - req.URL.RawQuery = values.Encode() - res, err := d.kd.queryAPIServerReq(req) - if err != nil { - log.Errorf("Failed to watch pods: %s", err) - return - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - log.Errorf("Failed to watch pods: %d", res.StatusCode) - return - } - - d := json.NewDecoder(res.Body) - - for { - var event podEvent - if err := d.Decode(&event); err != nil { - log.Errorf("Watch pods unexpectedly closed: %s", err) - return - } - - select { - case events <- &event: - case <-done: - } - } - }, retryInterval, done) -} - -func (d *podDiscovery) updatePod(pod *Pod, eventType EventType) { - d.mtx.Lock() - defer d.mtx.Unlock() - - switch eventType { - case Deleted: - if _, ok := d.pods[pod.ObjectMeta.Namespace]; ok { - delete(d.pods[pod.ObjectMeta.Namespace], pod.ObjectMeta.Name) - if len(d.pods[pod.ObjectMeta.Namespace]) == 0 { - delete(d.pods, pod.ObjectMeta.Namespace) - } - } - case Added, Modified: - if _, ok := d.pods[pod.ObjectMeta.Namespace]; !ok { - d.pods[pod.ObjectMeta.Namespace] = map[string]*Pod{} - } - d.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = pod +func (p *Pod) buildPod(pod *apiv1.Pod) *config.TargetGroup { + // During startup the pod may not have an IP yet. This does not even allow + // for an up metric, so we skip the target. + if len(pod.Status.PodIP) == 0 { + return nil } -} - -func (d *podDiscovery) updateContainerTargetGroup(pod *Pod) *config.TargetGroup { - d.mtx.RLock() - defer d.mtx.RUnlock() - tg := &config.TargetGroup{ Source: podSource(pod), } + tg.Labels = podLabels(pod) + tg.Labels[namespaceLabel] = lv(pod.Namespace) - // If this pod doesn't exist, return an empty target group - if _, ok := d.pods[pod.ObjectMeta.Namespace]; !ok { - return tg - } - if _, ok := d.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name]; !ok { - return tg - } - - tg.Labels = model.LabelSet{ - roleLabel: model.LabelValue("container"), - } - tg.Targets = updatePodTargets(pod, true) - - return tg -} - -func (d *podDiscovery) updatePodsTargetGroup() *config.TargetGroup { - tg := &config.TargetGroup{ - Source: podsTargetGroupName, - Labels: model.LabelSet{ - roleLabel: model.LabelValue("pod"), - }, - } - - for _, namespace := range d.pods { - for _, pod := range namespace { - tg.Targets = append(tg.Targets, updatePodTargets(pod, false)...) - } - } - - return tg -} - -func podSource(pod *Pod) string { - return sourcePodPrefix + ":" + pod.ObjectMeta.Namespace + ":" + pod.ObjectMeta.Name -} - -func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet { - var targets []model.LabelSet = make([]model.LabelSet, 0, len(pod.PodSpec.Containers)) - if pod.PodStatus.PodIP == "" { - log.Debugf("skipping pod %s -- PodStatus.PodIP is empty", pod.ObjectMeta.Name) - return targets - } - - if pod.PodStatus.Phase != "Running" { - log.Debugf("skipping pod %s -- status is not `Running`", pod.ObjectMeta.Name) - return targets - } - - // Should never hit this (running pods should always have this set), but better to be defensive. - if pod.PodStatus.HostIP == "" { - log.Debugf("skipping pod %s -- PodStatus.HostIP is empty", pod.ObjectMeta.Name) - return targets - } - - ready := "unknown" - for _, cond := range pod.PodStatus.Conditions { - if strings.ToLower(cond.Type) == "ready" { - ready = strings.ToLower(cond.Status) - } - } - - sort.Sort(ByContainerName(pod.PodSpec.Containers)) - - for _, container := range pod.PodSpec.Containers { - // Collect a list of TCP ports - // Sort by port number, ascending - // Product a target pointed at the first port - // Include a label containing all ports (portName=port,PortName=port,...,) - var tcpPorts []ContainerPort - var portLabel *bytes.Buffer = bytes.NewBufferString(",") - - for _, port := range container.Ports { - if port.Protocol == "TCP" { - tcpPorts = append(tcpPorts, port) - } - } - - if len(tcpPorts) == 0 { - log.Debugf("skipping container %s with no TCP ports", container.Name) + for _, c := range pod.Spec.Containers { + // If no ports are defined for the container, create an anonymous + // target per container. + if len(c.Ports) == 0 { + // We don't have a port so we just set the address label to the pod IP. + // The user has to add a port manually. + tg.Targets = append(tg.Targets, model.LabelSet{ + model.AddressLabel: lv(pod.Status.PodIP), + podContainerNameLabel: lv(c.Name), + }) continue } + // Otherwise create one target for each container/port combination. + for _, port := range c.Ports { + ports := strconv.FormatInt(int64(port.ContainerPort), 10) + addr := net.JoinHostPort(pod.Status.PodIP, ports) - sort.Sort(ByContainerPort(tcpPorts)) - - t := model.LabelSet{ - model.AddressLabel: model.LabelValue(net.JoinHostPort(pod.PodIP, strconv.FormatInt(int64(tcpPorts[0].ContainerPort), 10))), - podNameLabel: model.LabelValue(pod.ObjectMeta.Name), - podAddressLabel: model.LabelValue(pod.PodStatus.PodIP), - podNamespaceLabel: model.LabelValue(pod.ObjectMeta.Namespace), - podContainerNameLabel: model.LabelValue(container.Name), - podContainerPortNameLabel: model.LabelValue(tcpPorts[0].Name), - podReadyLabel: model.LabelValue(ready), - podNodeNameLabel: model.LabelValue(pod.PodSpec.NodeName), - podHostIPLabel: model.LabelValue(pod.PodStatus.HostIP), - } - - for _, port := range tcpPorts { - portLabel.WriteString(port.Name) - portLabel.WriteString("=") - portLabel.WriteString(strconv.FormatInt(int64(port.ContainerPort), 10)) - portLabel.WriteString(",") - t[model.LabelName(podContainerPortMapPrefix+port.Name)] = model.LabelValue(strconv.FormatInt(int64(port.ContainerPort), 10)) - } - - t[model.LabelName(podContainerPortListLabel)] = model.LabelValue(portLabel.String()) - - for k, v := range pod.ObjectMeta.Labels { - labelName := strutil.SanitizeLabelName(podLabelPrefix + k) - t[model.LabelName(labelName)] = model.LabelValue(v) - } - - for k, v := range pod.ObjectMeta.Annotations { - labelName := strutil.SanitizeLabelName(podAnnotationPrefix + k) - t[model.LabelName(labelName)] = model.LabelValue(v) - } - - targets = append(targets, t) - - if !allContainers { - break + tg.Targets = append(tg.Targets, model.LabelSet{ + model.AddressLabel: lv(addr), + podContainerNameLabel: lv(c.Name), + podContainerPortNumberLabel: lv(ports), + podContainerPortNameLabel: lv(port.Name), + podContainerPortProtocolLabel: lv(string(port.Protocol)), + }) } } - if len(targets) == 0 { - log.Debugf("no targets for pod %s", pod.ObjectMeta.Name) - } - - return targets + return tg } -type ByContainerPort []ContainerPort +func podSource(pod *apiv1.Pod) string { + return "pod/" + pod.Namespace + "/" + pod.Name +} -func (a ByContainerPort) Len() int { return len(a) } -func (a ByContainerPort) Less(i, j int) bool { return a[i].ContainerPort < a[j].ContainerPort } -func (a ByContainerPort) Swap(i, j int) { a[i], a[j] = a[j], a[i] } - -type ByContainerName []Container - -func (a ByContainerName) Len() int { return len(a) } -func (a ByContainerName) Less(i, j int) bool { return a[i].Name < a[j].Name } -func (a ByContainerName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func podReady(pod *apiv1.Pod) model.LabelValue { + for _, cond := range pod.Status.Conditions { + if cond.Type == apiv1.PodReady { + return lv(strings.ToLower(string(cond.Status))) + } + } + return lv(strings.ToLower(string(api.ConditionUnknown))) +} diff --git a/retrieval/discovery/kubernetes/service.go b/retrieval/discovery/kubernetes/service.go index ac3bac33c8..bb6cb6631a 100644 --- a/retrieval/discovery/kubernetes/service.go +++ b/retrieval/discovery/kubernetes/service.go @@ -14,357 +14,112 @@ package kubernetes import ( - "encoding/json" - "fmt" - "io/ioutil" "net" - "net/http" - "sync" - "time" + "strconv" "github.com/prometheus/common/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/util/strutil" "golang.org/x/net/context" + apiv1 "k8s.io/client-go/1.5/pkg/api/v1" + "k8s.io/client-go/1.5/tools/cache" ) -type serviceDiscovery struct { - mtx sync.RWMutex - services map[string]map[string]*Service - retryInterval time.Duration - kd *Discovery +// Service implements discovery of Kubernetes services. +type Service struct { + logger log.Logger + informer cache.SharedInformer + store cache.Store } -func (d *serviceDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) { - update := make(chan interface{}, 10) - go d.startServiceWatch(update, ctx.Done(), d.retryInterval) +// NewService returns a new service discovery. +func NewService(l log.Logger, inf cache.SharedInformer) *Service { + return &Service{logger: l, informer: inf, store: inf.GetStore()} +} - for { - tgs := []*config.TargetGroup{} +// Run implements the TargetProvider interface. +func (s *Service) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + // Send full initial set of pod targets. + var initial []*config.TargetGroup + for _, o := range s.store.List() { + tg := s.buildService(o.(*apiv1.Service)) + initial = append(initial, tg) + } + select { + case <-ctx.Done(): + return + case ch <- initial: + } + + // Send target groups for service updates. + send := func(tg *config.TargetGroup) { select { case <-ctx.Done(): - return - case event := <-update: - switch e := event.(type) { - case *endpointsEvent: - log.Debugf("k8s discovery received endpoint event (EventType=%s, Endpoint Name=%s)", e.EventType, e.Endpoints.ObjectMeta.Name) - tgs = append(tgs, d.updateServiceEndpoints(e.Endpoints, e.EventType)) - case *serviceEvent: - log.Debugf("k8s discovery received service event (EventType=%s, Service Name=%s)", e.EventType, e.Service.ObjectMeta.Name) - tgs = append(tgs, d.updateService(e.Service, e.EventType)) - } - } - if tgs == nil { - continue - } - - for _, tg := range tgs { - select { - case ch <- []*config.TargetGroup{tg}: - case <-ctx.Done(): - return - } + case ch <- []*config.TargetGroup{tg}: } } - -} - -func (d *serviceDiscovery) getServices() (map[string]map[string]*Service, string, error) { - res, err := d.kd.queryAPIServerPath(servicesURL) - if err != nil { - // If we can't list services then we can't watch them. Assume this is a misconfiguration - // & return error. - return nil, "", fmt.Errorf("unable to list Kubernetes services: %s", err) - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response: %d %s", res.StatusCode, res.Status) - } - var services ServiceList - if err := json.NewDecoder(res.Body).Decode(&services); err != nil { - body, _ := ioutil.ReadAll(res.Body) - return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response body: %s", string(body)) - } - - serviceMap := map[string]map[string]*Service{} - for idx, service := range services.Items { - namespace, ok := serviceMap[service.ObjectMeta.Namespace] - if !ok { - namespace = map[string]*Service{} - serviceMap[service.ObjectMeta.Namespace] = namespace - } - namespace[service.ObjectMeta.Name] = &services.Items[idx] - } - - return serviceMap, services.ResourceVersion, nil -} - -// watchServices watches services as they come & go. -func (d *serviceDiscovery) startServiceWatch(events chan<- interface{}, done <-chan struct{}, retryInterval time.Duration) { - until(func() { - // We use separate target groups for each discovered service so we'll need to clean up any if they've been deleted - // in Kubernetes while we couldn't connect - small chance of this, but worth dealing with. - d.mtx.Lock() - existingServices := d.services - - // Reset the known services. - d.services = map[string]map[string]*Service{} - d.mtx.Unlock() - - services, resourceVersion, err := d.getServices() - if err != nil { - log.Errorf("Cannot initialize services collection: %s", err) - return - } - - // Now let's loop through the old services & see if they still exist in here - for oldNSName, oldNS := range existingServices { - if ns, ok := services[oldNSName]; !ok { - for _, service := range existingServices[oldNSName] { - events <- &serviceEvent{Deleted, service} - } - } else { - for oldServiceName, oldService := range oldNS { - if _, ok := ns[oldServiceName]; !ok { - events <- &serviceEvent{Deleted, oldService} - } - } - } - } - - // Discard the existing services map for GC. - existingServices = nil - - for _, ns := range services { - for _, service := range ns { - events <- &serviceEvent{Added, service} - } - } - - var wg sync.WaitGroup - wg.Add(2) - - go func() { - d.watchServices(resourceVersion, events, done) - wg.Done() - }() - go func() { - d.watchServiceEndpoints(resourceVersion, events, done) - wg.Done() - }() - - wg.Wait() - }, retryInterval, done) -} - -func (d *serviceDiscovery) watchServices(resourceVersion string, events chan<- interface{}, done <-chan struct{}) { - req, err := http.NewRequest("GET", servicesURL, nil) - if err != nil { - log.Errorf("Failed to create services request: %s", err) - return - } - values := req.URL.Query() - values.Add("watch", "true") - values.Add("resourceVersion", resourceVersion) - req.URL.RawQuery = values.Encode() - - res, err := d.kd.queryAPIServerReq(req) - if err != nil { - log.Errorf("Failed to watch services: %s", err) - return - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - log.Errorf("Failed to watch services: %d", res.StatusCode) - return - } - - dec := json.NewDecoder(res.Body) - - for { - var event serviceEvent - if err := dec.Decode(&event); err != nil { - log.Errorf("Watch services unexpectedly closed: %s", err) - return - } - - select { - case events <- &event: - case <-done: - return - } - } -} - -// watchServiceEndpoints watches service endpoints as they come & go. -func (d *serviceDiscovery) watchServiceEndpoints(resourceVersion string, events chan<- interface{}, done <-chan struct{}) { - req, err := http.NewRequest("GET", endpointsURL, nil) - if err != nil { - log.Errorf("Failed to create service endpoints request: %s", err) - return - } - values := req.URL.Query() - values.Add("watch", "true") - values.Add("resourceVersion", resourceVersion) - req.URL.RawQuery = values.Encode() - - res, err := d.kd.queryAPIServerReq(req) - if err != nil { - log.Errorf("Failed to watch service endpoints: %s", err) - return - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - log.Errorf("Failed to watch service endpoints: %d", res.StatusCode) - return - } - - dec := json.NewDecoder(res.Body) - - for { - var event endpointsEvent - if err := dec.Decode(&event); err != nil { - log.Errorf("Watch service endpoints unexpectedly closed: %s", err) - return - } - - select { - case events <- &event: - case <-done: - } - } -} - -func (d *serviceDiscovery) updateService(service *Service, eventType EventType) *config.TargetGroup { - d.mtx.Lock() - defer d.mtx.Unlock() - - switch eventType { - case Deleted: - return d.deleteService(service) - case Added, Modified: - return d.addService(service) - } - return nil -} - -func (d *serviceDiscovery) deleteService(service *Service) *config.TargetGroup { - tg := &config.TargetGroup{Source: serviceSource(service)} - - delete(d.services[service.ObjectMeta.Namespace], service.ObjectMeta.Name) - if len(d.services[service.ObjectMeta.Namespace]) == 0 { - delete(d.services, service.ObjectMeta.Namespace) - } - - return tg -} - -func (d *serviceDiscovery) addService(service *Service) *config.TargetGroup { - namespace, ok := d.services[service.ObjectMeta.Namespace] - if !ok { - namespace = map[string]*Service{} - d.services[service.ObjectMeta.Namespace] = namespace - } - - namespace[service.ObjectMeta.Name] = service - endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name) - - res, err := d.kd.queryAPIServerPath(endpointURL) - if err != nil { - log.Errorf("Error getting service endpoints: %s", err) - return nil - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - log.Errorf("Failed to get service endpoints: %d", res.StatusCode) - return nil - } - - var eps Endpoints - if err := json.NewDecoder(res.Body).Decode(&eps); err != nil { - log.Errorf("Error getting service endpoints: %s", err) - return nil - } - - return d.updateServiceTargetGroup(service, &eps) -} - -func (d *serviceDiscovery) updateServiceTargetGroup(service *Service, eps *Endpoints) *config.TargetGroup { - tg := &config.TargetGroup{ - Source: serviceSource(service), - Labels: model.LabelSet{ - serviceNamespaceLabel: model.LabelValue(service.ObjectMeta.Namespace), - serviceNameLabel: model.LabelValue(service.ObjectMeta.Name), + s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(o interface{}) { + send(s.buildService(o.(*apiv1.Service))) }, + DeleteFunc: func(o interface{}) { + send(&config.TargetGroup{Source: serviceSource(o.(*apiv1.Service))}) + }, + UpdateFunc: func(_, o interface{}) { + send(s.buildService(o.(*apiv1.Service))) + }, + }) + + // Block until the target provider is explicitly canceled. + <-ctx.Done() +} + +func serviceSource(s *apiv1.Service) string { + return "svc/" + s.Namespace + "/" + s.Name +} + +const ( + serviceNameLabel = metaLabelPrefix + "service_name" + serviceLabelPrefix = metaLabelPrefix + "service_label_" + serviceAnnotationPrefix = metaLabelPrefix + "service_annotation_" + servicePortNameLabel = metaLabelPrefix + "service_port_name" + servicePortProtocolLabel = metaLabelPrefix + "service_port_protocol" +) + +func serviceLabels(svc *apiv1.Service) model.LabelSet { + ls := make(model.LabelSet, len(svc.Labels)+len(svc.Annotations)+2) + + ls[serviceNameLabel] = lv(svc.Name) + + for k, v := range svc.Labels { + ln := strutil.SanitizeLabelName(serviceLabelPrefix + k) + ls[model.LabelName(ln)] = lv(v) } - for k, v := range service.ObjectMeta.Labels { - labelName := strutil.SanitizeLabelName(serviceLabelPrefix + k) - tg.Labels[model.LabelName(labelName)] = model.LabelValue(v) + for k, v := range svc.Annotations { + ln := strutil.SanitizeLabelName(serviceAnnotationPrefix + k) + ls[model.LabelName(ln)] = lv(v) } + return ls +} - for k, v := range service.ObjectMeta.Annotations { - labelName := strutil.SanitizeLabelName(serviceAnnotationPrefix + k) - tg.Labels[model.LabelName(labelName)] = model.LabelValue(v) +func (s *Service) buildService(svc *apiv1.Service) *config.TargetGroup { + tg := &config.TargetGroup{ + Source: serviceSource(svc), } + tg.Labels = serviceLabels(svc) + tg.Labels[namespaceLabel] = lv(svc.Namespace) - serviceAddress := service.ObjectMeta.Name + "." + service.ObjectMeta.Namespace + ".svc" + for _, port := range svc.Spec.Ports { + addr := net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", strconv.FormatInt(int64(port.Port), 10)) - // Append the first TCP service port if one exists. - for _, port := range service.Spec.Ports { - if port.Protocol == ProtocolTCP { - serviceAddress += fmt.Sprintf(":%d", port.Port) - break - } - } - switch d.kd.Conf.Role { - case config.KubernetesRoleService: - t := model.LabelSet{ - model.AddressLabel: model.LabelValue(serviceAddress), - roleLabel: model.LabelValue("service"), - } - tg.Targets = append(tg.Targets, t) - - case config.KubernetesRoleEndpoint: - // Now let's loop through the endpoints & add them to the target group - // with appropriate labels. - for _, ss := range eps.Subsets { - epPort := ss.Ports[0].Port - - for _, addr := range ss.Addresses { - ipAddr := addr.IP - if len(ipAddr) == net.IPv6len { - ipAddr = "[" + ipAddr + "]" - } - address := net.JoinHostPort(ipAddr, fmt.Sprintf("%d", epPort)) - - t := model.LabelSet{ - model.AddressLabel: model.LabelValue(address), - roleLabel: model.LabelValue("endpoint"), - } - - tg.Targets = append(tg.Targets, t) - } - } + tg.Targets = append(tg.Targets, model.LabelSet{ + model.AddressLabel: lv(addr), + servicePortNameLabel: lv(port.Name), + servicePortProtocolLabel: lv(string(port.Protocol)), + }) } return tg } - -func (d *serviceDiscovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType) *config.TargetGroup { - d.mtx.Lock() - defer d.mtx.Unlock() - - serviceNamespace := endpoints.ObjectMeta.Namespace - serviceName := endpoints.ObjectMeta.Name - - if service, ok := d.services[serviceNamespace][serviceName]; ok { - return d.updateServiceTargetGroup(service, endpoints) - } - return nil -} - -func serviceSource(service *Service) string { - return sourceServicePrefix + ":" + service.ObjectMeta.Namespace + "/" + service.ObjectMeta.Name -} diff --git a/retrieval/discovery/kubernetes/types.go b/retrieval/discovery/kubernetes/types.go deleted file mode 100644 index e77076d974..0000000000 --- a/retrieval/discovery/kubernetes/types.go +++ /dev/null @@ -1,299 +0,0 @@ -// Copyright 2015 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kubernetes - -// EventType can legally only have the values defined as constants below. -type EventType string - -// Possible values for EventType. -const ( - Added EventType = "ADDED" - Modified EventType = "MODIFIED" - Deleted EventType = "DELETED" -) - -type nodeEvent struct { - EventType EventType `json:"type"` - Node *Node `json:"object"` -} - -type serviceEvent struct { - EventType EventType `json:"type"` - Service *Service `json:"object"` -} - -type endpointsEvent struct { - EventType EventType `json:"type"` - Endpoints *Endpoints `json:"object"` -} - -// From here down types are copied from -// https://github.com/GoogleCloudPlatform/kubernetes/blob/master/pkg/api/v1/types.go -// with all currently irrelevant types/fields stripped out. This removes the -// need for any kubernetes dependencies, with the drawback of having to keep -// this file up to date. - -// ListMeta describes metadata that synthetic resources must have, including lists and -// various status objects. -type ListMeta struct { - // An opaque value that represents the version of this response for use with optimistic - // concurrency and change monitoring endpoints. Clients must treat these values as opaque - // and values may only be valid for a particular resource or set of resources. Only servers - // will generate resource versions. - ResourceVersion string `json:"resourceVersion,omitempty" description:"string that identifies the internal version of this object that can be used by clients to determine when objects have changed; populated by the system, read-only; value must be treated as opaque by clients and passed unmodified back to the server: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#concurrency-control-and-consistency"` -} - -// ObjectMeta is metadata that all persisted resources must have, which includes all objects -// users must create. -type ObjectMeta struct { - // Name is unique within a namespace. Name is required when creating resources, although - // some resources may allow a client to request the generation of an appropriate name - // automatically. Name is primarily intended for creation idempotence and configuration - // definition. - Name string `json:"name,omitempty" description:"string that identifies an object. Must be unique within a namespace; cannot be updated; see http://releases.k8s.io/HEAD/docs/user-guide/identifiers.md#names"` - - // Namespace defines the space within which name must be unique. An empty namespace is - // equivalent to the "default" namespace, but "default" is the canonical representation. - // Not all objects are required to be scoped to a namespace - the value of this field for - // those objects will be empty. - Namespace string `json:"namespace,omitempty" description:"namespace of the object; must be a DNS_LABEL; cannot be updated; see http://releases.k8s.io/HEAD/docs/user-guide/namespaces.md"` - - ResourceVersion string `json:"resourceVersion,omitempty" description:"string that identifies the internal version of this object that can be used by clients to determine when objects have changed; populated by the system, read-only; value must be treated as opaque by clients and passed unmodified back to the server: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#concurrency-control-and-consistency"` - - // TODO: replace map[string]string with labels.LabelSet type - Labels map[string]string `json:"labels,omitempty" description:"map of string keys and values that can be used to organize and categorize objects; may match selectors of replication controllers and services; see http://releases.k8s.io/HEAD/docs/user-guide/labels.md"` - - // Annotations are unstructured key value data stored with a resource that may be set by - // external tooling. They are not queryable and should be preserved when modifying - // objects. - Annotations map[string]string `json:"annotations,omitempty" description:"map of string keys and values that can be used by external tooling to store and retrieve arbitrary metadata about objects; see http://releases.k8s.io/HEAD/docs/user-guide/annotations.md"` -} - -// Protocol defines network protocols supported for things like container ports. -type Protocol string - -const ( - // ProtocolTCP is the TCP protocol. - ProtocolTCP Protocol = "TCP" - // ProtocolUDP is the UDP protocol. - ProtocolUDP Protocol = "UDP" -) - -const ( - // NamespaceAll is the default argument to specify on a context when you want to list or filter resources across all namespaces - NamespaceAll string = "" -) - -// Container represents a single container that is expected to be run on the host. -type Container struct { - // Required: This must be a DNS_LABEL. Each container in a pod must - // have a unique name. - Name string `json:"name" description:"name of the container; must be a DNS_LABEL and unique within the pod; cannot be updated"` - // Optional. - Image string `json:"image,omitempty" description:"Docker image name; see http://releases.k8s.io/HEAD/docs/user-guide/images.md"` - - Ports []ContainerPort `json:"ports"` -} - -type ContainerPort struct { - Name string `json:"name"` - ContainerPort int32 `json:"containerPort"` - Protocol string `json:"protocol"` -} - -// Service is a named abstraction of software service (for example, mysql) consisting of local port -// (for example 3306) that the proxy listens on, and the selector that determines which pods -// will answer requests sent through the proxy. -type Service struct { - ObjectMeta `json:"metadata,omitempty" description:"standard object metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"` - - // Spec defines the behavior of a service. - // http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#spec-and-status - Spec ServiceSpec `json:"spec,omitempty"` -} - -// ServiceSpec describes the attributes that a user creates on a service. -type ServiceSpec struct { - // The list of ports that are exposed by this service. - // More info: http://releases.k8s.io/HEAD/docs/user-guide/services.md#virtual-ips-and-service-proxies - Ports []ServicePort `json:"ports"` -} - -// ServicePort contains information on service's port. -type ServicePort struct { - // The IP protocol for this port. Supports "TCP" and "UDP". - // Default is TCP. - Protocol Protocol `json:"protocol,omitempty"` - - // The port that will be exposed by this service. - Port int32 `json:"port"` -} - -// ServiceList holds a list of services. -type ServiceList struct { - ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"` - - Items []Service `json:"items" description:"list of services"` -} - -// Endpoints is a collection of endpoints that implement the actual service. Example: -// Name: "mysvc", -// Subsets: [ -// { -// Addresses: [{"ip": "10.10.1.1"}, {"ip": "10.10.2.2"}], -// Ports: [{"name": "a", "port": 8675}, {"name": "b", "port": 309}] -// }, -// { -// Addresses: [{"ip": "10.10.3.3"}], -// Ports: [{"name": "a", "port": 93}, {"name": "b", "port": 76}] -// }, -// ] -type Endpoints struct { - ObjectMeta `json:"metadata,omitempty" description:"standard object metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"` - - // The set of all endpoints is the union of all subsets. - Subsets []EndpointSubset `json:"subsets" description:"sets of addresses and ports that comprise a service"` -} - -// EndpointSubset is a group of addresses with a common set of ports. The -// expanded set of endpoints is the Cartesian product of Addresses x Ports. -// For example, given: -// { -// Addresses: [{"ip": "10.10.1.1"}, {"ip": "10.10.2.2"}], -// Ports: [{"name": "a", "port": 8675}, {"name": "b", "port": 309}] -// } -// The resulting set of endpoints can be viewed as: -// a: [ 10.10.1.1:8675, 10.10.2.2:8675 ], -// b: [ 10.10.1.1:309, 10.10.2.2:309 ] -type EndpointSubset struct { - Addresses []EndpointAddress `json:"addresses,omitempty" description:"IP addresses which offer the related ports"` - Ports []EndpointPort `json:"ports,omitempty" description:"port numbers available on the related IP addresses"` -} - -// EndpointAddress is a tuple that describes single IP address. -type EndpointAddress struct { - // The IP of this endpoint. - // TODO: This should allow hostname or IP, see #4447. - IP string `json:"ip" description:"IP address of the endpoint"` -} - -// EndpointPort is a tuple that describes a single port. -type EndpointPort struct { - // The port number. - Port int `json:"port" description:"port number of the endpoint"` - - // The IP protocol for this port. - Protocol Protocol `json:"protocol,omitempty" description:"protocol for this port; must be UDP or TCP; TCP if unspecified"` -} - -// EndpointsList is a list of endpoints. -type EndpointsList struct { - ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"` - - Items []Endpoints `json:"items" description:"list of endpoints"` -} - -// NodeStatus is information about the current status of a node. -type NodeStatus struct { - // Queried from cloud provider, if available. - Addresses []NodeAddress `json:"addresses,omitempty" description:"list of addresses reachable to the node; see http://releases.k8s.io/HEAD/docs/admin/node.md#node-addresses" patchStrategy:"merge" patchMergeKey:"type"` - // Endpoints of daemons running on the Node. - DaemonEndpoints NodeDaemonEndpoints `json:"daemonEndpoints,omitempty"` -} - -// NodeDaemonEndpoints lists ports opened by daemons running on the Node. -type NodeDaemonEndpoints struct { - // Endpoint on which Kubelet is listening. - KubeletEndpoint DaemonEndpoint `json:"kubeletEndpoint,omitempty"` -} - -// DaemonEndpoint contains information about a single Daemon endpoint. -type DaemonEndpoint struct { - /* - The port tag was not properly in quotes in earlier releases, so it must be - uppercased for backwards compat (since it was falling back to var name of - 'Port'). - */ - - // Port number of the given endpoint. - Port int32 `json:"Port"` -} - -// NodeAddressType can legally only have the values defined as constants below. -type NodeAddressType string - -// These are valid address types of node. NodeLegacyHostIP is used to transit -// from out-dated HostIP field to NodeAddress. -const ( - NodeLegacyHostIP NodeAddressType = "LegacyHostIP" - NodeHostName NodeAddressType = "Hostname" - NodeExternalIP NodeAddressType = "ExternalIP" - NodeInternalIP NodeAddressType = "InternalIP" -) - -// NodeAddress defines the address of a node. -type NodeAddress struct { - Type NodeAddressType `json:"type" description:"node address type, one of Hostname, ExternalIP or InternalIP"` - Address string `json:"address" description:"the node address"` -} - -// Node is a worker node in Kubernetes, formerly known as minion. -// Each node will have a unique identifier in the cache (i.e. in etcd). -type Node struct { - ObjectMeta `json:"metadata,omitempty" description:"standard object metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"` - - // Status describes the current status of a Node - Status NodeStatus `json:"status,omitempty" description:"most recently observed status of the node; populated by the system, read-only; http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#spec-and-status"` -} - -// NodeList is the whole list of all Nodes which have been registered with master. -type NodeList struct { - ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"` - - Items []Node `json:"items" description:"list of nodes"` -} - -type Pod struct { - ObjectMeta `json:"metadata,omitempty" description:"standard object metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"` - PodStatus `json:"status,omitempty" description:"pod status object; see http://kubernetes.io/v1.1/docs/api-reference/v1/definitions.html#_v1_podstatus"` - PodSpec `json:"spec,omitempty" description:"pod spec object; see http://kubernetes.io/v1.1/docs/api-reference/v1/definitions.html#_v1_podspec"` -} - -type podEvent struct { - EventType EventType `json:"type"` - Pod *Pod `json:"object"` -} - -type PodList struct { - ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"` - - Items []Pod `json:"items" description:"list of pods"` -} - -type PodStatus struct { - Phase string `json:"phase" description:"Current condition of the pod. More info: http://kubernetes.io/v1.1/docs/user-guide/pod-states.html#pod-phase"` - PodIP string `json:"podIP" description:"IP address allocated to the pod. Routable at least within the cluster. Empty if not yet allocated."` - Conditions []PodCondition `json:"conditions" description:"Current service state of pod."` - HostIP string `json:"hostIP,omitempty" description:"IP address of the host to which the pod is assigned. Empty if not yet scheduled."` -} - -type PodSpec struct { - Containers []Container `json:"containers" description:"list of containers, see http://kubernetes.io/v1.1/docs/api-reference/v1/definitions.html#_v1_container"` - NodeName string `json:"nodeName,omitempty" description:"NodeName is a request to schedule this pod onto a specific node. If it is non-empty, the scheduler simply schedules this pod onto that node, assuming that it fits resource requirements."` -} - -type PodCondition struct { - Type string `json:"type" description:"Type is the type of the condition. Currently only Ready."` - Status string `json:"status" description:"Status is the status of the condition. Can be True, False, Unknown."` -} diff --git a/retrieval/discovery/kubernetes_v2/node.go b/retrieval/discovery/kubernetes_v2/node.go deleted file mode 100644 index 4e2dbabcae..0000000000 --- a/retrieval/discovery/kubernetes_v2/node.go +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright 2016 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kubernetesv2 - -import ( - "fmt" - "net" - "strconv" - - "github.com/prometheus/common/log" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/util/strutil" - "golang.org/x/net/context" - "k8s.io/client-go/1.5/pkg/api" - apiv1 "k8s.io/client-go/1.5/pkg/api/v1" - "k8s.io/client-go/1.5/tools/cache" -) - -// Node discovers Kubernetes nodes. -type Node struct { - logger log.Logger - informer cache.SharedInformer - store cache.Store -} - -// NewNode returns a new node discovery. -func NewNode(l log.Logger, inf cache.SharedInformer) *Node { - return &Node{logger: l, informer: inf, store: inf.GetStore()} -} - -// Run implements the TargetProvider interface. -func (n *Node) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - // Send full initial set of pod targets. - var initial []*config.TargetGroup - for _, o := range n.store.List() { - tg := n.buildNode(o.(*apiv1.Node)) - initial = append(initial, tg) - } - select { - case <-ctx.Done(): - return - case ch <- initial: - } - - // Send target groups for service updates. - send := func(tg *config.TargetGroup) { - select { - case <-ctx.Done(): - case ch <- []*config.TargetGroup{tg}: - } - } - n.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(o interface{}) { - send(n.buildNode(o.(*apiv1.Node))) - }, - DeleteFunc: func(o interface{}) { - send(&config.TargetGroup{Source: nodeSource(o.(*apiv1.Node))}) - }, - UpdateFunc: func(_, o interface{}) { - send(n.buildNode(o.(*apiv1.Node))) - }, - }) - - // Block until the target provider is explicitly canceled. - <-ctx.Done() -} - -func nodeSource(n *apiv1.Node) string { - return "node/" + n.Namespace + "/" + n.Name -} - -const ( - nodeNameLabel = metaLabelPrefix + "node_name" - nodeLabelPrefix = metaLabelPrefix + "node_label_" - nodeAnnotationPrefix = metaLabelPrefix + "node_annotation_" - nodeAddressPrefix = metaLabelPrefix + "node_address_" -) - -func nodeLabels(n *apiv1.Node) model.LabelSet { - ls := make(model.LabelSet, len(n.Labels)+len(n.Annotations)+2) - - ls[nodeNameLabel] = lv(n.Name) - - for k, v := range n.Labels { - ln := strutil.SanitizeLabelName(nodeLabelPrefix + k) - ls[model.LabelName(ln)] = lv(v) - } - - for k, v := range n.Annotations { - ln := strutil.SanitizeLabelName(nodeAnnotationPrefix + k) - ls[model.LabelName(ln)] = lv(v) - } - return ls -} - -func (n *Node) buildNode(node *apiv1.Node) *config.TargetGroup { - tg := &config.TargetGroup{ - Source: nodeSource(node), - } - tg.Labels = nodeLabels(node) - - addr, addrMap, err := nodeAddress(node) - if err != nil { - n.logger.With("err", err).Debugf("No node address found") - return nil - } - addr = net.JoinHostPort(addr, strconv.FormatInt(int64(node.Status.DaemonEndpoints.KubeletEndpoint.Port), 10)) - - t := model.LabelSet{ - model.AddressLabel: lv(addr), - model.InstanceLabel: lv(node.Name), - } - - for ty, a := range addrMap { - ln := strutil.SanitizeLabelName(nodeAddressPrefix + string(ty)) - t[model.LabelName(ln)] = lv(a[0]) - } - tg.Targets = append(tg.Targets, t) - - return tg -} - -// nodeAddresses returns the provided node's address, based on the priority: -// 1. NodeInternalIP -// 2. NodeExternalIP -// 3. NodeLegacyHostIP -// 3. NodeHostName -// -// Derived from k8s.io/kubernetes/pkg/util/node/node.go -func nodeAddress(node *apiv1.Node) (string, map[apiv1.NodeAddressType][]string, error) { - m := map[apiv1.NodeAddressType][]string{} - for _, a := range node.Status.Addresses { - m[a.Type] = append(m[a.Type], a.Address) - } - - if addresses, ok := m[apiv1.NodeInternalIP]; ok { - return addresses[0], m, nil - } - if addresses, ok := m[apiv1.NodeExternalIP]; ok { - return addresses[0], m, nil - } - if addresses, ok := m[apiv1.NodeAddressType(api.NodeLegacyHostIP)]; ok { - return addresses[0], m, nil - } - if addresses, ok := m[apiv1.NodeHostName]; ok { - return addresses[0], m, nil - } - return "", m, fmt.Errorf("host address unknown") -} diff --git a/retrieval/discovery/kubernetes_v2/pod.go b/retrieval/discovery/kubernetes_v2/pod.go deleted file mode 100644 index 8da6868115..0000000000 --- a/retrieval/discovery/kubernetes_v2/pod.go +++ /dev/null @@ -1,177 +0,0 @@ -// Copyright 2016 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kubernetesv2 - -import ( - "fmt" - "net" - "strconv" - "strings" - - "github.com/prometheus/common/log" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/util/strutil" - "golang.org/x/net/context" - "k8s.io/client-go/1.5/pkg/api" - apiv1 "k8s.io/client-go/1.5/pkg/api/v1" - "k8s.io/client-go/1.5/tools/cache" -) - -// Pod discovers new pod targets. -type Pod struct { - informer cache.SharedInformer - store cache.Store - logger log.Logger -} - -// NewPods creates a new pod discovery. -func NewPods(l log.Logger, pods cache.SharedInformer) *Pod { - return &Pod{ - informer: pods, - store: pods.GetStore(), - logger: l, - } -} - -// Run implements the TargetProvider interface. -func (p *Pod) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - // Send full initial set of pod targets. - var initial []*config.TargetGroup - for _, o := range p.store.List() { - tg := p.buildPod(o.(*apiv1.Pod)) - initial = append(initial, tg) - - p.logger.With("tg", fmt.Sprintf("%#v", tg)).Debugln("initial pod") - } - select { - case <-ctx.Done(): - return - case ch <- initial: - } - - // Send target groups for pod updates. - send := func(tg *config.TargetGroup) { - p.logger.With("tg", fmt.Sprintf("%#v", tg)).Debugln("pod update") - select { - case <-ctx.Done(): - case ch <- []*config.TargetGroup{tg}: - } - } - p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(o interface{}) { - send(p.buildPod(o.(*apiv1.Pod))) - }, - DeleteFunc: func(o interface{}) { - send(&config.TargetGroup{Source: podSource(o.(*apiv1.Pod))}) - }, - UpdateFunc: func(_, o interface{}) { - send(p.buildPod(o.(*apiv1.Pod))) - }, - }) - - // Block until the target provider is explicitly canceled. - <-ctx.Done() -} - -const ( - podNameLabel = metaLabelPrefix + "pod_name" - podIPLabel = metaLabelPrefix + "pod_ip" - podContainerNameLabel = metaLabelPrefix + "pod_container_name" - podContainerPortNameLabel = metaLabelPrefix + "pod_container_port_name" - podContainerPortNumberLabel = metaLabelPrefix + "pod_container_port_number" - podContainerPortProtocolLabel = metaLabelPrefix + "pod_container_port_protocol" - podReadyLabel = metaLabelPrefix + "pod_ready" - podLabelPrefix = metaLabelPrefix + "pod_label_" - podAnnotationPrefix = metaLabelPrefix + "pod_annotation_" - podNodeNameLabel = metaLabelPrefix + "pod_node_name" - podHostIPLabel = metaLabelPrefix + "pod_host_ip" -) - -func podLabels(pod *apiv1.Pod) model.LabelSet { - ls := model.LabelSet{ - podNameLabel: lv(pod.ObjectMeta.Name), - podIPLabel: lv(pod.Status.PodIP), - podReadyLabel: podReady(pod), - podNodeNameLabel: lv(pod.Spec.NodeName), - podHostIPLabel: lv(pod.Status.HostIP), - } - - for k, v := range pod.Labels { - ln := strutil.SanitizeLabelName(serviceLabelPrefix + k) - ls[model.LabelName(ln)] = lv(v) - } - - for k, v := range pod.Annotations { - ln := strutil.SanitizeLabelName(serviceAnnotationPrefix + k) - ls[model.LabelName(ln)] = lv(v) - } - - return ls -} - -func (p *Pod) buildPod(pod *apiv1.Pod) *config.TargetGroup { - // During startup the pod may not have an IP yet. This does not even allow - // for an up metric, so we skip the target. - if len(pod.Status.PodIP) == 0 { - return nil - } - tg := &config.TargetGroup{ - Source: podSource(pod), - } - tg.Labels = podLabels(pod) - tg.Labels[namespaceLabel] = lv(pod.Namespace) - - for _, c := range pod.Spec.Containers { - // If no ports are defined for the container, create an anonymous - // target per container. - if len(c.Ports) == 0 { - // We don't have a port so we just set the address label to the pod IP. - // The user has to add a port manually. - tg.Targets = append(tg.Targets, model.LabelSet{ - model.AddressLabel: lv(pod.Status.PodIP), - podContainerNameLabel: lv(c.Name), - }) - continue - } - // Otherwise create one target for each container/port combination. - for _, port := range c.Ports { - ports := strconv.FormatInt(int64(port.ContainerPort), 10) - addr := net.JoinHostPort(pod.Status.PodIP, ports) - - tg.Targets = append(tg.Targets, model.LabelSet{ - model.AddressLabel: lv(addr), - podContainerNameLabel: lv(c.Name), - podContainerPortNumberLabel: lv(ports), - podContainerPortNameLabel: lv(port.Name), - podContainerPortProtocolLabel: lv(string(port.Protocol)), - }) - } - } - - return tg -} - -func podSource(pod *apiv1.Pod) string { - return "pod/" + pod.Namespace + "/" + pod.Name -} - -func podReady(pod *apiv1.Pod) model.LabelValue { - for _, cond := range pod.Status.Conditions { - if cond.Type == apiv1.PodReady { - return lv(strings.ToLower(string(cond.Status))) - } - } - return lv(strings.ToLower(string(api.ConditionUnknown))) -} diff --git a/retrieval/discovery/kubernetes_v2/service.go b/retrieval/discovery/kubernetes_v2/service.go deleted file mode 100644 index 7b46e36c17..0000000000 --- a/retrieval/discovery/kubernetes_v2/service.go +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright 2016 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kubernetesv2 - -import ( - "net" - "strconv" - - "github.com/prometheus/common/log" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/util/strutil" - "golang.org/x/net/context" - apiv1 "k8s.io/client-go/1.5/pkg/api/v1" - "k8s.io/client-go/1.5/tools/cache" -) - -// Service implements discovery of Kubernetes services. -type Service struct { - logger log.Logger - informer cache.SharedInformer - store cache.Store -} - -// NewService returns a new service discovery. -func NewService(l log.Logger, inf cache.SharedInformer) *Service { - return &Service{logger: l, informer: inf, store: inf.GetStore()} -} - -// Run implements the TargetProvider interface. -func (s *Service) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - // Send full initial set of pod targets. - var initial []*config.TargetGroup - for _, o := range s.store.List() { - tg := s.buildService(o.(*apiv1.Service)) - initial = append(initial, tg) - } - select { - case <-ctx.Done(): - return - case ch <- initial: - } - - // Send target groups for service updates. - send := func(tg *config.TargetGroup) { - select { - case <-ctx.Done(): - case ch <- []*config.TargetGroup{tg}: - } - } - s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(o interface{}) { - send(s.buildService(o.(*apiv1.Service))) - }, - DeleteFunc: func(o interface{}) { - send(&config.TargetGroup{Source: serviceSource(o.(*apiv1.Service))}) - }, - UpdateFunc: func(_, o interface{}) { - send(s.buildService(o.(*apiv1.Service))) - }, - }) - - // Block until the target provider is explicitly canceled. - <-ctx.Done() -} - -func serviceSource(s *apiv1.Service) string { - return "svc/" + s.Namespace + "/" + s.Name -} - -const ( - serviceNameLabel = metaLabelPrefix + "service_name" - serviceLabelPrefix = metaLabelPrefix + "service_label_" - serviceAnnotationPrefix = metaLabelPrefix + "service_annotation_" - servicePortNameLabel = metaLabelPrefix + "service_port_name" - servicePortProtocolLabel = metaLabelPrefix + "service_port_protocol" -) - -func serviceLabels(svc *apiv1.Service) model.LabelSet { - ls := make(model.LabelSet, len(svc.Labels)+len(svc.Annotations)+2) - - ls[serviceNameLabel] = lv(svc.Name) - - for k, v := range svc.Labels { - ln := strutil.SanitizeLabelName(serviceLabelPrefix + k) - ls[model.LabelName(ln)] = lv(v) - } - - for k, v := range svc.Annotations { - ln := strutil.SanitizeLabelName(serviceAnnotationPrefix + k) - ls[model.LabelName(ln)] = lv(v) - } - return ls -} - -func (s *Service) buildService(svc *apiv1.Service) *config.TargetGroup { - tg := &config.TargetGroup{ - Source: serviceSource(svc), - } - tg.Labels = serviceLabels(svc) - tg.Labels[namespaceLabel] = lv(svc.Namespace) - - for _, port := range svc.Spec.Ports { - addr := net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", strconv.FormatInt(int64(port.Port), 10)) - - tg.Targets = append(tg.Targets, model.LabelSet{ - model.AddressLabel: lv(addr), - servicePortNameLabel: lv(port.Name), - servicePortProtocolLabel: lv(string(port.Protocol)), - }) - } - - return tg -} diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 590e8f6853..30dfbe8ab1 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -399,14 +399,6 @@ func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider { } app("kubernetes", i, k) } - for i, c := range cfg.KubernetesV2SDConfigs { - k, err := discovery.NewKubernetesV2Discovery(c) - if err != nil { - log.Errorf("Cannot create Kubernetes discovery: %s", err) - continue - } - app("kubernetes", i, k) - } for i, c := range cfg.ServersetSDConfigs { app("serverset", i, discovery.NewServersetDiscovery(c)) }