mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-26 22:19:40 -08:00
commit
dec56838fc
|
@ -114,3 +114,35 @@ scrape_configs:
|
||||||
target_label: kubernetes_namespace
|
target_label: kubernetes_namespace
|
||||||
- source_labels: [__meta_kubernetes_service_name]
|
- source_labels: [__meta_kubernetes_service_name]
|
||||||
target_label: kubernetes_name
|
target_label: kubernetes_name
|
||||||
|
|
||||||
|
# Example scrape config for pods
|
||||||
|
#
|
||||||
|
# The relabeling allows the actual pod scrape endpoint to be configured via the
|
||||||
|
# following annotations:
|
||||||
|
#
|
||||||
|
# * `prometheus.io/scrape`: Only scrape pods that have a value of `true`
|
||||||
|
# * `prometheus.io/port`: Scrape the pod on the indicated port instead of the default of `9102`.
|
||||||
|
- job_name: 'kubernetes-pods'
|
||||||
|
|
||||||
|
kubernetes_sd_configs:
|
||||||
|
- api_servers:
|
||||||
|
- 'https://kubernetes.default.svc'
|
||||||
|
in_cluster: true
|
||||||
|
|
||||||
|
relabel_configs:
|
||||||
|
- source_labels: [__meta_kubernetes_role, __meta_kubernetes_pod_annotation_prometheus_io_scrape]
|
||||||
|
action: keep
|
||||||
|
regex: pod;true
|
||||||
|
- source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
|
||||||
|
action: replace
|
||||||
|
regex: (.+):(?:\d+);(\d+)
|
||||||
|
replacement: ${1}:${2}
|
||||||
|
target_label: __address__
|
||||||
|
- action: labelmap
|
||||||
|
regex: __meta_kubernetes_pod_label_(.+)
|
||||||
|
- source_labels: [__meta_kubernetes_pod_namespace]
|
||||||
|
action: replace
|
||||||
|
target_label: kubernetes_namespace
|
||||||
|
- source_labels: [__meta_kubernetes_pod_name]
|
||||||
|
action: replace
|
||||||
|
target_label: kubernetes_pod_name
|
||||||
|
|
|
@ -14,12 +14,16 @@
|
||||||
package kubernetes
|
package kubernetes
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"sort"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -33,27 +37,55 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
sourceServicePrefix = "services"
|
|
||||||
|
|
||||||
// metaLabelPrefix is the meta prefix used for all meta labels
|
// metaLabelPrefix is the meta prefix used for all meta labels
|
||||||
// in this discovery.
|
// in this discovery.
|
||||||
metaLabelPrefix = model.MetaLabelPrefix + "kubernetes_"
|
metaLabelPrefix = model.MetaLabelPrefix + "kubernetes_"
|
||||||
|
|
||||||
|
// roleLabel is the name for the label containing a target's role.
|
||||||
|
roleLabel = metaLabelPrefix + "role"
|
||||||
|
|
||||||
|
sourcePodPrefix = "pods"
|
||||||
|
// podsTargetGroupName is the name given to the target group for pods
|
||||||
|
podsTargetGroupName = "pods"
|
||||||
|
// podNamespaceLabel is the name for the label containing a target pod's namespace
|
||||||
|
podNamespaceLabel = metaLabelPrefix + "pod_namespace"
|
||||||
|
// podNameLabel is the name for the label containing a target pod's name
|
||||||
|
podNameLabel = metaLabelPrefix + "pod_name"
|
||||||
|
// podAddressLabel is the name for the label containing a target pod's IP address (the PodIP)
|
||||||
|
podAddressLabel = metaLabelPrefix + "pod_address"
|
||||||
|
// podContainerNameLabel is the name for the label containing a target's container name
|
||||||
|
podContainerNameLabel = metaLabelPrefix + "pod_container_name"
|
||||||
|
// podContainerPortNameLabel is the name for the label containing the name of the port selected for a target
|
||||||
|
podContainerPortNameLabel = metaLabelPrefix + "pod_container_port_name"
|
||||||
|
// podContainerPortListLabel is the name for the label containing a list of all TCP ports on the target container
|
||||||
|
podContainerPortListLabel = metaLabelPrefix + "pod_container_port_list"
|
||||||
|
// podContainerPortMapPrefix is the prefix used to create the names of labels that associate container port names to port values
|
||||||
|
// Such labels will be named (podContainerPortMapPrefix)_(PortName) = (ContainerPort)
|
||||||
|
podContainerPortMapPrefix = metaLabelPrefix + "pod_container_port_map_"
|
||||||
|
// podReadyLabel is the name for the label containing the 'Ready' status (true/false/unknown) for a target
|
||||||
|
podReadyLabel = metaLabelPrefix + "pod_ready"
|
||||||
|
// podLabelPrefix is the prefix for prom label names corresponding to k8s labels for a target pod
|
||||||
|
podLabelPrefix = metaLabelPrefix + "pod_label_"
|
||||||
|
// podAnnotationPrefix is the prefix for prom label names corresponding to k8s annotations for a target pod
|
||||||
|
podAnnotationPrefix = metaLabelPrefix + "pod_annotation_"
|
||||||
|
|
||||||
|
sourceServicePrefix = "services"
|
||||||
// serviceNamespaceLabel is the name for the label containing a target's service namespace.
|
// serviceNamespaceLabel is the name for the label containing a target's service namespace.
|
||||||
serviceNamespaceLabel = metaLabelPrefix + "service_namespace"
|
serviceNamespaceLabel = metaLabelPrefix + "service_namespace"
|
||||||
// serviceNameLabel is the name for the label containing a target's service name.
|
// serviceNameLabel is the name for the label containing a target's service name.
|
||||||
serviceNameLabel = metaLabelPrefix + "service_name"
|
serviceNameLabel = metaLabelPrefix + "service_name"
|
||||||
// nodeLabelPrefix is the prefix for the node labels.
|
|
||||||
nodeLabelPrefix = metaLabelPrefix + "node_label_"
|
|
||||||
// serviceLabelPrefix is the prefix for the service labels.
|
// serviceLabelPrefix is the prefix for the service labels.
|
||||||
serviceLabelPrefix = metaLabelPrefix + "service_label_"
|
serviceLabelPrefix = metaLabelPrefix + "service_label_"
|
||||||
// serviceAnnotationPrefix is the prefix for the service annotations.
|
// serviceAnnotationPrefix is the prefix for the service annotations.
|
||||||
serviceAnnotationPrefix = metaLabelPrefix + "service_annotation_"
|
serviceAnnotationPrefix = metaLabelPrefix + "service_annotation_"
|
||||||
|
|
||||||
// nodesTargetGroupName is the name given to the target group for nodes.
|
// nodesTargetGroupName is the name given to the target group for nodes.
|
||||||
nodesTargetGroupName = "nodes"
|
nodesTargetGroupName = "nodes"
|
||||||
|
// nodeLabelPrefix is the prefix for the node labels.
|
||||||
|
nodeLabelPrefix = metaLabelPrefix + "node_label_"
|
||||||
|
|
||||||
// apiServersTargetGroupName is the name given to the target group for API servers.
|
// apiServersTargetGroupName is the name given to the target group for API servers.
|
||||||
apiServersTargetGroupName = "apiServers"
|
apiServersTargetGroupName = "apiServers"
|
||||||
// roleLabel is the name for the label containing a target's role.
|
|
||||||
roleLabel = metaLabelPrefix + "role"
|
|
||||||
|
|
||||||
serviceAccountToken = "/var/run/secrets/kubernetes.io/serviceaccount/token"
|
serviceAccountToken = "/var/run/secrets/kubernetes.io/serviceaccount/token"
|
||||||
serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
|
serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
|
||||||
|
@ -61,6 +93,7 @@ const (
|
||||||
apiVersion = "v1"
|
apiVersion = "v1"
|
||||||
apiPrefix = "/api/" + apiVersion
|
apiPrefix = "/api/" + apiVersion
|
||||||
nodesURL = apiPrefix + "/nodes"
|
nodesURL = apiPrefix + "/nodes"
|
||||||
|
podsURL = apiPrefix + "/pods"
|
||||||
servicesURL = apiPrefix + "/services"
|
servicesURL = apiPrefix + "/services"
|
||||||
endpointsURL = apiPrefix + "/endpoints"
|
endpointsURL = apiPrefix + "/endpoints"
|
||||||
serviceEndpointsURL = apiPrefix + "/namespaces/%s/endpoints/%s"
|
serviceEndpointsURL = apiPrefix + "/namespaces/%s/endpoints/%s"
|
||||||
|
@ -75,9 +108,12 @@ type Discovery struct {
|
||||||
apiServersMu sync.RWMutex
|
apiServersMu sync.RWMutex
|
||||||
nodes map[string]*Node
|
nodes map[string]*Node
|
||||||
services map[string]map[string]*Service
|
services map[string]map[string]*Service
|
||||||
nodesMu sync.RWMutex
|
// map of namespace to (map of pod name to pod)
|
||||||
servicesMu sync.RWMutex
|
pods map[string]map[string]*Pod
|
||||||
runDone chan struct{}
|
nodesMu sync.RWMutex
|
||||||
|
servicesMu sync.RWMutex
|
||||||
|
podsMu sync.RWMutex
|
||||||
|
runDone chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize sets up the discovery for usage.
|
// Initialize sets up the discovery for usage.
|
||||||
|
@ -97,6 +133,7 @@ func (kd *Discovery) Initialize() error {
|
||||||
|
|
||||||
// Run implements the TargetProvider interface.
|
// Run implements the TargetProvider interface.
|
||||||
func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
|
func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
|
||||||
|
log.Debugf("Kubernetes Discovery.Run beginning")
|
||||||
defer close(ch)
|
defer close(ch)
|
||||||
|
|
||||||
// Send an initial full view.
|
// Send an initial full view.
|
||||||
|
@ -106,6 +143,12 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
|
||||||
|
|
||||||
all = append(all, kd.updateAPIServersTargetGroup())
|
all = append(all, kd.updateAPIServersTargetGroup())
|
||||||
all = append(all, kd.updateNodesTargetGroup())
|
all = append(all, kd.updateNodesTargetGroup())
|
||||||
|
all = append(all, kd.updatePodsTargetGroup())
|
||||||
|
for _, ns := range kd.pods {
|
||||||
|
for _, pod := range ns {
|
||||||
|
all = append(all, kd.updatePodTargetGroup(pod))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case ch <- all:
|
case ch <- all:
|
||||||
|
@ -119,21 +162,32 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
|
||||||
|
|
||||||
go kd.watchNodes(update, ctx.Done(), retryInterval)
|
go kd.watchNodes(update, ctx.Done(), retryInterval)
|
||||||
go kd.startServiceWatch(update, ctx.Done(), retryInterval)
|
go kd.startServiceWatch(update, ctx.Done(), retryInterval)
|
||||||
|
go kd.watchPods(update, ctx.Done(), retryInterval)
|
||||||
|
|
||||||
var tg *config.TargetGroup
|
|
||||||
for {
|
for {
|
||||||
|
tg := []*config.TargetGroup{}
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case event := <-update:
|
case event := <-update:
|
||||||
switch obj := event.(type) {
|
switch obj := event.(type) {
|
||||||
case *nodeEvent:
|
case *nodeEvent:
|
||||||
|
log.Debugf("k8s discovery received node event (EventType=%s, Node Name=%s)", obj.EventType, obj.Node.ObjectMeta.Name)
|
||||||
kd.updateNode(obj.Node, obj.EventType)
|
kd.updateNode(obj.Node, obj.EventType)
|
||||||
tg = kd.updateNodesTargetGroup()
|
tg = append(tg, kd.updateNodesTargetGroup())
|
||||||
case *serviceEvent:
|
case *serviceEvent:
|
||||||
tg = kd.updateService(obj.Service, obj.EventType)
|
log.Debugf("k8s discovery received service event (EventType=%s, Service Name=%s)", obj.EventType, obj.Service.ObjectMeta.Name)
|
||||||
|
tg = append(tg, kd.updateService(obj.Service, obj.EventType))
|
||||||
case *endpointsEvent:
|
case *endpointsEvent:
|
||||||
tg = kd.updateServiceEndpoints(obj.Endpoints, obj.EventType)
|
log.Debugf("k8s discovery received endpoint event (EventType=%s, Endpoint Name=%s)", obj.EventType, obj.Endpoints.ObjectMeta.Name)
|
||||||
|
tg = append(tg, kd.updateServiceEndpoints(obj.Endpoints, obj.EventType))
|
||||||
|
case *podEvent:
|
||||||
|
log.Debugf("k8s discovery received pod event (EventType=%s, Pod Name=%s)", obj.EventType, obj.Pod.ObjectMeta.Name)
|
||||||
|
// Update the per-pod target group
|
||||||
|
kd.updatePod(obj.Pod, obj.EventType)
|
||||||
|
tg = append(tg, kd.updatePodTargetGroup(obj.Pod))
|
||||||
|
// ...and update the all pods target group
|
||||||
|
tg = append(tg, kd.updatePodsTargetGroup())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,10 +195,12 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
for _, t := range tg {
|
||||||
case ch <- []*config.TargetGroup{tg}:
|
select {
|
||||||
case <-ctx.Done():
|
case ch <- []*config.TargetGroup{t}:
|
||||||
return
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -173,7 +229,7 @@ func (kd *Discovery) queryAPIServerReq(req *http.Request) (*http.Response, error
|
||||||
lastErr = err
|
lastErr = err
|
||||||
kd.rotateAPIServers()
|
kd.rotateAPIServers()
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("Unable to query any API servers: %v", lastErr)
|
return nil, fmt.Errorf("unable to query any API servers: %v", lastErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kd *Discovery) rotateAPIServers() {
|
func (kd *Discovery) rotateAPIServers() {
|
||||||
|
@ -267,17 +323,17 @@ func (kd *Discovery) getNodes() (map[string]*Node, string, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If we can't list nodes then we can't watch them. Assume this is a misconfiguration
|
// If we can't list nodes then we can't watch them. Assume this is a misconfiguration
|
||||||
// & return error.
|
// & return error.
|
||||||
return nil, "", fmt.Errorf("Unable to list Kubernetes nodes: %s", err)
|
return nil, "", fmt.Errorf("unable to list Kubernetes nodes: %s", err)
|
||||||
}
|
}
|
||||||
defer res.Body.Close()
|
defer res.Body.Close()
|
||||||
if res.StatusCode != http.StatusOK {
|
if res.StatusCode != http.StatusOK {
|
||||||
return nil, "", fmt.Errorf("Unable to list Kubernetes nodes. Unexpected response: %d %s", res.StatusCode, res.Status)
|
return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response: %d %s", res.StatusCode, res.Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
var nodes NodeList
|
var nodes NodeList
|
||||||
if err := json.NewDecoder(res.Body).Decode(&nodes); err != nil {
|
if err := json.NewDecoder(res.Body).Decode(&nodes); err != nil {
|
||||||
body, _ := ioutil.ReadAll(res.Body)
|
body, _ := ioutil.ReadAll(res.Body)
|
||||||
return nil, "", fmt.Errorf("Unable to list Kubernetes nodes. Unexpected response body: %s", string(body))
|
return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response body: %s", string(body))
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeMap := map[string]*Node{}
|
nodeMap := map[string]*Node{}
|
||||||
|
@ -293,16 +349,16 @@ func (kd *Discovery) getServices() (map[string]map[string]*Service, string, erro
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If we can't list services then we can't watch them. Assume this is a misconfiguration
|
// If we can't list services then we can't watch them. Assume this is a misconfiguration
|
||||||
// & return error.
|
// & return error.
|
||||||
return nil, "", fmt.Errorf("Unable to list Kubernetes services: %s", err)
|
return nil, "", fmt.Errorf("unable to list Kubernetes services: %s", err)
|
||||||
}
|
}
|
||||||
defer res.Body.Close()
|
defer res.Body.Close()
|
||||||
if res.StatusCode != http.StatusOK {
|
if res.StatusCode != http.StatusOK {
|
||||||
return nil, "", fmt.Errorf("Unable to list Kubernetes services. Unexpected response: %d %s", res.StatusCode, res.Status)
|
return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response: %d %s", res.StatusCode, res.Status)
|
||||||
}
|
}
|
||||||
var services ServiceList
|
var services ServiceList
|
||||||
if err := json.NewDecoder(res.Body).Decode(&services); err != nil {
|
if err := json.NewDecoder(res.Body).Decode(&services); err != nil {
|
||||||
body, _ := ioutil.ReadAll(res.Body)
|
body, _ := ioutil.ReadAll(res.Body)
|
||||||
return nil, "", fmt.Errorf("Unable to list Kubernetes services. Unexpected response body: %s", string(body))
|
return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response body: %s", string(body))
|
||||||
}
|
}
|
||||||
|
|
||||||
serviceMap := map[string]map[string]*Service{}
|
serviceMap := map[string]map[string]*Service{}
|
||||||
|
@ -735,3 +791,243 @@ func nodeHostIP(node *Node) (net.IP, error) {
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses)
|
return nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////
|
||||||
|
// Here there be dragons. //
|
||||||
|
// Pod discovery code lies below. //
|
||||||
|
////////////////////////////////////
|
||||||
|
|
||||||
|
func (kd *Discovery) updatePod(pod *Pod, eventType EventType) {
|
||||||
|
kd.podsMu.Lock()
|
||||||
|
defer kd.podsMu.Unlock()
|
||||||
|
|
||||||
|
switch eventType {
|
||||||
|
case deleted:
|
||||||
|
if _, ok := kd.pods[pod.ObjectMeta.Namespace]; ok {
|
||||||
|
delete(kd.pods[pod.ObjectMeta.Namespace], pod.ObjectMeta.Name)
|
||||||
|
if len(kd.pods[pod.ObjectMeta.Namespace]) == 0 {
|
||||||
|
delete(kd.pods, pod.ObjectMeta.Namespace)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case added, modified:
|
||||||
|
if _, ok := kd.pods[pod.ObjectMeta.Namespace]; !ok {
|
||||||
|
kd.pods[pod.ObjectMeta.Namespace] = map[string]*Pod{}
|
||||||
|
}
|
||||||
|
kd.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = pod
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kd *Discovery) getPods() (map[string]map[string]*Pod, string, error) {
|
||||||
|
res, err := kd.queryAPIServerPath(podsURL)
|
||||||
|
if err != nil {
|
||||||
|
return nil, "", fmt.Errorf("unable to list Kubernetes pods: %s", err)
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
if res.StatusCode != http.StatusOK {
|
||||||
|
return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response: %d %s", res.StatusCode, res.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
var pods PodList
|
||||||
|
if err := json.NewDecoder(res.Body).Decode(&pods); err != nil {
|
||||||
|
body, _ := ioutil.ReadAll(res.Body)
|
||||||
|
return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response body: %s", string(body))
|
||||||
|
}
|
||||||
|
|
||||||
|
podMap := map[string]map[string]*Pod{}
|
||||||
|
for idx, pod := range pods.Items {
|
||||||
|
if _, ok := podMap[pod.ObjectMeta.Namespace]; !ok {
|
||||||
|
podMap[pod.ObjectMeta.Namespace] = map[string]*Pod{}
|
||||||
|
}
|
||||||
|
log.Debugf("Got pod %s in namespace %s", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace)
|
||||||
|
podMap[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = &pods.Items[idx]
|
||||||
|
}
|
||||||
|
|
||||||
|
return podMap, pods.ResourceVersion, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kd *Discovery) watchPods(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
|
||||||
|
until(func() {
|
||||||
|
pods, resourceVersion, err := kd.getPods()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Cannot initialize pods collection: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
kd.podsMu.Lock()
|
||||||
|
kd.pods = pods
|
||||||
|
kd.podsMu.Unlock()
|
||||||
|
|
||||||
|
req, err := http.NewRequest("GET", podsURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Cannot create pods request: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
values := req.URL.Query()
|
||||||
|
values.Add("watch", "true")
|
||||||
|
values.Add("resourceVersion", resourceVersion)
|
||||||
|
req.URL.RawQuery = values.Encode()
|
||||||
|
res, err := kd.queryAPIServerReq(req)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Failed to watch pods: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
if res.StatusCode != http.StatusOK {
|
||||||
|
log.Errorf("Failed to watch pods: %d", res.StatusCode)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
d := json.NewDecoder(res.Body)
|
||||||
|
|
||||||
|
for {
|
||||||
|
var event podEvent
|
||||||
|
if err := d.Decode(&event); err != nil {
|
||||||
|
log.Errorf("Watch pods unexpectedly closed: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case events <- &event:
|
||||||
|
case <-done:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, retryInterval, done)
|
||||||
|
}
|
||||||
|
|
||||||
|
func podSource(pod *Pod) string {
|
||||||
|
return sourcePodPrefix + ":" + pod.ObjectMeta.Namespace + ":" + pod.ObjectMeta.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
type ByContainerPort []ContainerPort
|
||||||
|
|
||||||
|
func (a ByContainerPort) Len() int { return len(a) }
|
||||||
|
func (a ByContainerPort) Less(i, j int) bool { return a[i].ContainerPort < a[j].ContainerPort }
|
||||||
|
func (a ByContainerPort) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||||
|
|
||||||
|
type ByContainerName []Container
|
||||||
|
|
||||||
|
func (a ByContainerName) Len() int { return len(a) }
|
||||||
|
func (a ByContainerName) Less(i, j int) bool { return a[i].Name < a[j].Name }
|
||||||
|
func (a ByContainerName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||||
|
|
||||||
|
func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet {
|
||||||
|
var targets []model.LabelSet = make([]model.LabelSet, 0, len(pod.PodSpec.Containers))
|
||||||
|
if pod.PodStatus.PodIP == "" {
|
||||||
|
log.Debugf("skipping pod %s -- PodStatus.PodIP is empty", pod.ObjectMeta.Name)
|
||||||
|
return targets
|
||||||
|
}
|
||||||
|
|
||||||
|
if pod.PodStatus.Phase != "Running" {
|
||||||
|
log.Debugf("skipping pod %s -- status is not `Running`", pod.ObjectMeta.Name)
|
||||||
|
return targets
|
||||||
|
}
|
||||||
|
|
||||||
|
ready := "unknown"
|
||||||
|
for _, cond := range pod.PodStatus.Conditions {
|
||||||
|
if strings.ToLower(cond.Type) == "ready" {
|
||||||
|
ready = strings.ToLower(cond.Status)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Sort(ByContainerName(pod.PodSpec.Containers))
|
||||||
|
|
||||||
|
for _, container := range pod.PodSpec.Containers {
|
||||||
|
// Collect a list of TCP ports
|
||||||
|
// Sort by port number, ascending
|
||||||
|
// Produce a target pointed at the first port
|
||||||
|
// Include a label containing all ports, comma-delimited with a leading and trailing comma (,portName=port,portName=port,...,)
|
||||||
|
var tcpPorts []ContainerPort
|
||||||
|
var portLabel *bytes.Buffer = bytes.NewBufferString(",")
|
||||||
|
|
||||||
|
for _, port := range container.Ports {
|
||||||
|
if port.Protocol == "TCP" {
|
||||||
|
tcpPorts = append(tcpPorts, port)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(tcpPorts) == 0 {
|
||||||
|
log.Debugf("skipping container %s with no TCP ports", container.Name)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Sort(ByContainerPort(tcpPorts))
|
||||||
|
|
||||||
|
t := model.LabelSet{
|
||||||
|
model.AddressLabel: model.LabelValue(net.JoinHostPort(pod.PodIP, strconv.FormatInt(int64(tcpPorts[0].ContainerPort), 10))),
|
||||||
|
podNameLabel: model.LabelValue(pod.ObjectMeta.Name),
|
||||||
|
podAddressLabel: model.LabelValue(pod.PodStatus.PodIP),
|
||||||
|
podNamespaceLabel: model.LabelValue(pod.ObjectMeta.Namespace),
|
||||||
|
podContainerNameLabel: model.LabelValue(container.Name),
|
||||||
|
podContainerPortNameLabel: model.LabelValue(tcpPorts[0].Name),
|
||||||
|
podReadyLabel: model.LabelValue(ready),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, port := range tcpPorts {
|
||||||
|
portLabel.WriteString(port.Name)
|
||||||
|
portLabel.WriteString("=")
|
||||||
|
portLabel.WriteString(strconv.FormatInt(int64(port.ContainerPort), 10))
|
||||||
|
portLabel.WriteString(",")
|
||||||
|
t[model.LabelName(podContainerPortMapPrefix+port.Name)] = model.LabelValue(strconv.FormatInt(int64(port.ContainerPort), 10))
|
||||||
|
}
|
||||||
|
|
||||||
|
t[model.LabelName(podContainerPortListLabel)] = model.LabelValue(portLabel.String())
|
||||||
|
|
||||||
|
for k, v := range pod.ObjectMeta.Labels {
|
||||||
|
labelName := strutil.SanitizeLabelName(podLabelPrefix + k)
|
||||||
|
t[model.LabelName(labelName)] = model.LabelValue(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := range pod.ObjectMeta.Annotations {
|
||||||
|
labelName := strutil.SanitizeLabelName(podAnnotationPrefix + k)
|
||||||
|
t[model.LabelName(labelName)] = model.LabelValue(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
targets = append(targets, t)
|
||||||
|
|
||||||
|
if !allContainers {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return targets
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kd *Discovery) updatePodTargetGroup(pod *Pod) *config.TargetGroup {
|
||||||
|
kd.podsMu.RLock()
|
||||||
|
defer kd.podsMu.RUnlock()
|
||||||
|
|
||||||
|
tg := &config.TargetGroup{
|
||||||
|
Source: podSource(pod),
|
||||||
|
}
|
||||||
|
|
||||||
|
// If this pod doesn't exist, return an empty target group
|
||||||
|
if _, ok := kd.pods[pod.ObjectMeta.Namespace]; !ok {
|
||||||
|
return tg
|
||||||
|
}
|
||||||
|
if _, ok := kd.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name]; !ok {
|
||||||
|
return tg
|
||||||
|
}
|
||||||
|
|
||||||
|
tg.Labels = model.LabelSet{
|
||||||
|
roleLabel: model.LabelValue("container"),
|
||||||
|
}
|
||||||
|
tg.Targets = updatePodTargets(pod, true)
|
||||||
|
|
||||||
|
return tg
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kd *Discovery) updatePodsTargetGroup() *config.TargetGroup {
|
||||||
|
tg := &config.TargetGroup{
|
||||||
|
Source: podsTargetGroupName,
|
||||||
|
Labels: model.LabelSet{
|
||||||
|
roleLabel: model.LabelValue("pod"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, namespace := range kd.pods {
|
||||||
|
for _, pod := range namespace {
|
||||||
|
tg.Targets = append(tg.Targets, updatePodTargets(pod, false)...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return tg
|
||||||
|
}
|
||||||
|
|
207
retrieval/discovery/kubernetes/discovery_test.go
Normal file
207
retrieval/discovery/kubernetes/discovery_test.go
Normal file
|
@ -0,0 +1,207 @@
|
||||||
|
// Copyright 2015 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package kubernetes
|
||||||
|
|
||||||
|
import (
|
||||||
|
"flag"
|
||||||
|
"math/rand"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
_ "github.com/prometheus/common/log"
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
flag.Parse()
|
||||||
|
os.Exit(m.Run())
|
||||||
|
}
|
||||||
|
|
||||||
|
var portsA = []ContainerPort{
|
||||||
|
ContainerPort{
|
||||||
|
Name: "http",
|
||||||
|
ContainerPort: 80,
|
||||||
|
Protocol: "TCP",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var portsB = []ContainerPort{
|
||||||
|
ContainerPort{
|
||||||
|
Name: "https",
|
||||||
|
ContainerPort: 443,
|
||||||
|
Protocol: "TCP",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var portsNoTcp = []ContainerPort{
|
||||||
|
ContainerPort{
|
||||||
|
Name: "dns",
|
||||||
|
ContainerPort: 53,
|
||||||
|
Protocol: "UDP",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var portsMultiA = []ContainerPort{
|
||||||
|
ContainerPort{
|
||||||
|
Name: "http",
|
||||||
|
ContainerPort: 80,
|
||||||
|
Protocol: "TCP",
|
||||||
|
},
|
||||||
|
ContainerPort{
|
||||||
|
Name: "ssh",
|
||||||
|
ContainerPort: 22,
|
||||||
|
Protocol: "TCP",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var portsMultiB = []ContainerPort{
|
||||||
|
ContainerPort{
|
||||||
|
Name: "http",
|
||||||
|
ContainerPort: 80,
|
||||||
|
Protocol: "TCP",
|
||||||
|
},
|
||||||
|
ContainerPort{
|
||||||
|
Name: "https",
|
||||||
|
ContainerPort: 443,
|
||||||
|
Protocol: "TCP",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
func container(name string, ports []ContainerPort) Container {
|
||||||
|
p := make([]ContainerPort, len(ports))
|
||||||
|
copy(p, ports)
|
||||||
|
|
||||||
|
// Shuffle order of ports to ensure code enforces determinism
|
||||||
|
for i := range p {
|
||||||
|
j := rand.Intn(i + 1)
|
||||||
|
p[i], p[j] = p[j], p[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
return Container{
|
||||||
|
Name: name,
|
||||||
|
Ports: p,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func pod(name string, containers []Container) *Pod {
|
||||||
|
c := make([]Container, len(containers))
|
||||||
|
copy(c, containers)
|
||||||
|
|
||||||
|
// Shuffle order of containers to ensure code enforces determinism
|
||||||
|
for i := range c {
|
||||||
|
j := rand.Intn(i + 1)
|
||||||
|
c[i], c[j] = c[j], c[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Pod{
|
||||||
|
ObjectMeta: ObjectMeta{
|
||||||
|
Name: name,
|
||||||
|
},
|
||||||
|
PodStatus: PodStatus{
|
||||||
|
PodIP: "1.1.1.1",
|
||||||
|
Phase: "Running",
|
||||||
|
Conditions: []PodCondition{
|
||||||
|
PodCondition{
|
||||||
|
Type: "Ready",
|
||||||
|
Status: "True",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
PodSpec: PodSpec{
|
||||||
|
Containers: c,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUpdatePodTargets(t *testing.T) {
|
||||||
|
var result []model.LabelSet
|
||||||
|
|
||||||
|
// Multiple iterations help ensure that we'll see different permutations via the various randomizations that occur
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
// Return no targets for a pod that isn't "Running"
|
||||||
|
result = updatePodTargets(&Pod{PodStatus: PodStatus{PodIP: "1.1.1.1"}}, true)
|
||||||
|
if len(result) > 0 {
|
||||||
|
t.Fatalf("expected 0 targets, received %d", len(result))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return no targets for a pod with no IP
|
||||||
|
result = updatePodTargets(&Pod{PodStatus: PodStatus{Phase: "Running"}}, true)
|
||||||
|
if len(result) > 0 {
|
||||||
|
t.Fatalf("expected 0 targets, received %d", len(result))
|
||||||
|
}
|
||||||
|
|
||||||
|
// A pod with no containers (?!) should not produce any targets
|
||||||
|
result = updatePodTargets(pod("empty", []Container{}), true)
|
||||||
|
if len(result) > 0 {
|
||||||
|
t.Fatalf("expected 0 targets, received %d", len(result))
|
||||||
|
}
|
||||||
|
|
||||||
|
// A pod with all valid containers should return one target per container with allContainers=true
|
||||||
|
result = updatePodTargets(pod("easy", []Container{container("a", portsA), container("b", portsB)}), true)
|
||||||
|
if len(result) != 2 {
|
||||||
|
t.Fatalf("expected 2 targets, received %d", len(result))
|
||||||
|
}
|
||||||
|
if result[0][podReadyLabel] != "true" {
|
||||||
|
t.Fatalf("expected result[0] podReadyLabel 'true', received '%s'", result[0][podReadyLabel])
|
||||||
|
}
|
||||||
|
if _, ok := result[0][podContainerPortMapPrefix+"http"]; !ok {
|
||||||
|
t.Fatalf("expected result[0][podContainerPortMapPrefix + 'http'] to be '80', but was missing")
|
||||||
|
}
|
||||||
|
if result[0][podContainerPortMapPrefix+"http"] != "80" {
|
||||||
|
t.Fatalf("expected result[0][podContainerPortMapPrefix + 'http'] to be '80', but was %s", result[0][podContainerPortMapPrefix+"http"])
|
||||||
|
}
|
||||||
|
if _, ok := result[1][podContainerPortMapPrefix+"https"]; !ok {
|
||||||
|
t.Fatalf("expected result[1][podContainerPortMapPrefix + 'https'] to be '443', but was missing")
|
||||||
|
}
|
||||||
|
if result[1][podContainerPortMapPrefix+"https"] != "443" {
|
||||||
|
t.Fatalf("expected result[1][podContainerPortMapPrefix + 'https'] to be '443', but was %s", result[1][podContainerPortMapPrefix+"https"])
|
||||||
|
}
|
||||||
|
|
||||||
|
// A pod with all valid containers should return one target with allContainers=false, and it should be the alphabetically first container
|
||||||
|
result = updatePodTargets(pod("easy", []Container{container("a", portsA), container("b", portsB)}), false)
|
||||||
|
if len(result) != 1 {
|
||||||
|
t.Fatalf("expected 1 targets, received %d", len(result))
|
||||||
|
}
|
||||||
|
if _, ok := result[0][podContainerNameLabel]; !ok {
|
||||||
|
t.Fatalf("expected result[0][podContainerNameLabel] to be 'a', but was missing")
|
||||||
|
}
|
||||||
|
if result[0][podContainerNameLabel] != "a" {
|
||||||
|
t.Fatalf("expected result[0][podContainerNameLabel] to be 'a', but was '%s'", result[0][podContainerNameLabel])
|
||||||
|
}
|
||||||
|
|
||||||
|
// A pod with some non-targetable containers should return one target per targetable container with allContainers=true
|
||||||
|
result = updatePodTargets(pod("mixed", []Container{container("a", portsA), container("no-tcp", portsNoTcp), container("b", portsB)}), true)
|
||||||
|
if len(result) != 2 {
|
||||||
|
t.Fatalf("expected 2 targets, received %d", len(result))
|
||||||
|
}
|
||||||
|
|
||||||
|
// A pod with a container with multiple ports should return the numerically smallest port
|
||||||
|
result = updatePodTargets(pod("hard", []Container{container("multiA", portsMultiA), container("multiB", portsMultiB)}), true)
|
||||||
|
if len(result) != 2 {
|
||||||
|
t.Fatalf("expected 2 targets, received %d", len(result))
|
||||||
|
}
|
||||||
|
if result[0][model.AddressLabel] != "1.1.1.1:22" {
|
||||||
|
t.Fatalf("expected result[0] address to be 1.1.1.1:22, received %s", result[0][model.AddressLabel])
|
||||||
|
}
|
||||||
|
if result[0][podContainerPortListLabel] != ",ssh=22,http=80," {
|
||||||
|
t.Fatalf("expected result[0] podContainerPortListLabel to be ',ssh=22,http=80,', received '%s'", result[0][podContainerPortListLabel])
|
||||||
|
}
|
||||||
|
if result[1][model.AddressLabel] != "1.1.1.1:80" {
|
||||||
|
t.Fatalf("expected result[1] address to be 1.1.1.1:80, received %s", result[1][model.AddressLabel])
|
||||||
|
}
|
||||||
|
if result[1][podContainerPortListLabel] != ",http=80,https=443," {
|
||||||
|
t.Fatalf("expected result[1] podContainerPortListLabel to be ',http=80,https=443,', received '%s'", result[1][podContainerPortListLabel])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -102,6 +102,14 @@ type Container struct {
|
||||||
Name string `json:"name" description:"name of the container; must be a DNS_LABEL and unique within the pod; cannot be updated"`
|
Name string `json:"name" description:"name of the container; must be a DNS_LABEL and unique within the pod; cannot be updated"`
|
||||||
// Optional.
|
// Optional.
|
||||||
Image string `json:"image,omitempty" description:"Docker image name; see http://releases.k8s.io/HEAD/docs/user-guide/images.md"`
|
Image string `json:"image,omitempty" description:"Docker image name; see http://releases.k8s.io/HEAD/docs/user-guide/images.md"`
|
||||||
|
|
||||||
|
Ports []ContainerPort `json:"ports"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ContainerPort is the JSON shape of one entry in a container's "ports" list.
type ContainerPort struct {
	// Name is decoded from the "name" key.
	Name string `json:"name"`
	// ContainerPort is the numeric port, decoded from the "containerPort" key.
	ContainerPort int32 `json:"containerPort"`
	// Protocol is decoded from the "protocol" key.
	// NOTE(review): presumably "TCP" or "UDP" — confirm against the Kubernetes
	// API reference.
	Protocol string `json:"protocol"`
}
|
||||||
|
|
||||||
// Service is a named abstraction of software service (for example, mysql) consisting of local port
|
// Service is a named abstraction of software service (for example, mysql) consisting of local port
|
||||||
|
@ -235,3 +243,35 @@ type NodeList struct {
|
||||||
|
|
||||||
Items []Node `json:"items" description:"list of nodes"`
|
Items []Node `json:"items" description:"list of nodes"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Pod struct {
|
||||||
|
ObjectMeta `json:"metadata,omitempty" description:"standard object metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
|
||||||
|
PodStatus `json:"status,omitempty" description:"pod status object; see http://kubernetes.io/v1.1/docs/api-reference/v1/definitions.html#_v1_podstatus"`
|
||||||
|
PodSpec `json:"spec,omitempty" description:"pod spec object; see http://kubernetes.io/v1.1/docs/api-reference/v1/definitions.html#_v1_podspec"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type podEvent struct {
|
||||||
|
EventType EventType `json:"type"`
|
||||||
|
Pod *Pod `json:"object"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type PodList struct {
|
||||||
|
ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
|
||||||
|
|
||||||
|
Items []Pod `json:"items" description:"list of pods"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type PodStatus struct {
|
||||||
|
Phase string `json:"phase" description:"Current condition of the pod. More info: http://kubernetes.io/v1.1/docs/user-guide/pod-states.html#pod-phase"`
|
||||||
|
PodIP string `json:"podIP" description:"IP address allocated to the pod. Routable at least within the cluster. Empty if not yet allocated."`
|
||||||
|
Conditions []PodCondition `json:"conditions" description:"Current service state of pod."`
|
||||||
|
}
|
||||||
|
|
||||||
|
type PodSpec struct {
|
||||||
|
Containers []Container `json:"containers" description:"list of containers, see http://kubernetes.io/v1.1/docs/api-reference/v1/definitions.html#_v1_container"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// PodCondition is the JSON shape of one entry in a pod status's "conditions"
// list.
type PodCondition struct {
	// Type is the type of the condition; currently only Ready.
	Type string `json:"type" description:"Type is the type of the condition. Currently only Ready."`
	// Status is the status of the condition: True, False, or Unknown.
	Status string `json:"status" description:"Status is the status of the condition. Can be True, False, Unknown."`
}
|
||||||
|
|
Loading…
Reference in a new issue