Merge pull request #1781 from prometheus/fabxc-k8s-sd

Select Kubernetes SD type in configuration
This commit is contained in:
Fabian Reinartz 2016-07-05 14:29:46 +02:00 committed by GitHub
commit 6f19e418e1
13 changed files with 1037 additions and 875 deletions

View file

@ -792,6 +792,7 @@ func (c *MarathonSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) erro
// KubernetesSDConfig is the configuration for Kubernetes service discovery.
type KubernetesSDConfig struct {
APIServers []URL `yaml:"api_servers"`
Role string `yaml:"role"`
InCluster bool `yaml:"in_cluster,omitempty"`
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
BearerToken string `yaml:"bearer_token,omitempty"`
@ -804,6 +805,29 @@ type KubernetesSDConfig struct {
XXX map[string]interface{} `yaml:",inline"`
}
type KubernetesRole string
const (
KubernetesRoleNode = "node"
KubernetesRolePod = "pod"
KubernetesRoleContainer = "container"
KubernetesRoleService = "service"
KubernetesRoleEndpoint = "endpoint"
KubernetesRoleAPIServer = "apiserver"
)
func (c *KubernetesRole) UnmarshalYAML(unmarshal func(interface{}) error) error {
if err := unmarshal((*string)(c)); err != nil {
return err
}
switch *c {
case KubernetesRoleNode, KubernetesRolePod, KubernetesRoleContainer, KubernetesRoleService, KubernetesRoleEndpoint, KubernetesRoleAPIServer:
return nil
default:
return fmt.Errorf("Unknown Kubernetes SD role %q", c)
}
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultKubernetesSDConfig
@ -815,6 +839,9 @@ func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) er
if err := checkOverflow(c.XXX, "kubernetes_sd_config"); err != nil {
return err
}
if c.Role == "" {
return fmt.Errorf("role missing (one of: container, pod, service, endpoint, node, apiserver)")
}
if len(c.APIServers) == 0 {
return fmt.Errorf("Kubernetes SD configuration requires at least one Kubernetes API server")
}
@ -824,7 +851,6 @@ func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) er
if c.BasicAuth != nil && (len(c.BearerToken) > 0 || len(c.BearerTokenFile) > 0) {
return fmt.Errorf("at most one of basic_auth, bearer_token & bearer_token_file must be configured")
}
return nil
}

View file

@ -223,6 +223,7 @@ var expectedConf = &Config{
KubernetesSDConfigs: []*KubernetesSDConfig{
{
APIServers: []URL{kubernetesSDHostURL()},
Role: KubernetesRoleEndpoint,
BasicAuth: &BasicAuth{
Username: "myusername",
Password: "mypassword",

View file

@ -109,7 +109,8 @@ scrape_configs:
- job_name: service-kubernetes
kubernetes_sd_configs:
- api_servers:
- role: endpoint
api_servers:
- 'https://localhost:1234'
basic_auth:

View file

@ -2,7 +2,8 @@ scrape_configs:
- job_name: prometheus
kubernetes_sd_configs:
- api_servers:
- role: node
api_servers:
- 'https://localhost:1234'
bearer_token: 1234

View file

@ -2,7 +2,8 @@ scrape_configs:
- job_name: prometheus
kubernetes_sd_configs:
- api_servers:
- role: pod
api_servers:
- 'https://localhost:1234'
bearer_token: 1234

View file

@ -1,24 +0,0 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discovery
import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/retrieval/discovery/consul"
)
// NewConsul creates a new Consul based Discovery.
func NewConsul(cfg *config.ConsulSDConfig) (*consul.Discovery, error) {
return consul.NewDiscovery(cfg)
}

View file

@ -1,4 +1,4 @@
// Copyright 2015 The Prometheus Authors
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -14,10 +14,20 @@
package discovery
import (
"time"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/retrieval/discovery/consul"
"github.com/prometheus/prometheus/retrieval/discovery/dns"
"github.com/prometheus/prometheus/retrieval/discovery/kubernetes"
"github.com/prometheus/prometheus/retrieval/discovery/marathon"
)
// NewConsul creates a new Consul based Discovery.
func NewConsul(cfg *config.ConsulSDConfig) (*consul.Discovery, error) {
return consul.NewDiscovery(cfg)
}
// NewKubernetesDiscovery creates a Kubernetes service discovery based on the passed-in configuration.
func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.Discovery, error) {
kd := &kubernetes.Discovery{
@ -29,3 +39,17 @@ func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.Discov
}
return kd, nil
}
// NewMarathon creates a new Marathon based discovery.
func NewMarathon(conf *config.MarathonSDConfig) *marathon.Discovery {
return &marathon.Discovery{
Servers: conf.Servers,
RefreshInterval: time.Duration(conf.RefreshInterval),
Client: marathon.FetchApps,
}
}
// NewDNS creates a new DNS based discovery.
func NewDNS(conf *config.DNSSDConfig) *dns.Discovery {
return dns.NewDiscovery(conf)
}

View file

@ -1,25 +0,0 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discovery
import (
"github.com/prometheus/prometheus/retrieval/discovery/dns"
"github.com/prometheus/prometheus/config"
)
// NewDNS creates a new DNS based discovery.
func NewDNS(conf *config.DNSSDConfig) *dns.Discovery {
return dns.NewDiscovery(conf)
}

View file

@ -14,16 +14,11 @@
package kubernetes
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"sort"
"strconv"
"strings"
"sync"
"time"
@ -33,7 +28,6 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/httputil"
"github.com/prometheus/prometheus/util/strutil"
)
const (
@ -110,14 +104,6 @@ type Discovery struct {
apiServers []config.URL
apiServersMu sync.RWMutex
nodes map[string]*Node
services map[string]map[string]*Service
// map of namespace to (map of pod name to pod)
pods map[string]map[string]*Pod
nodesMu sync.RWMutex
servicesMu sync.RWMutex
podsMu sync.RWMutex
runDone chan struct{}
}
// Initialize sets up the discovery for usage.
@ -130,92 +116,43 @@ func (kd *Discovery) Initialize() error {
kd.apiServers = kd.Conf.APIServers
kd.client = client
kd.runDone = make(chan struct{})
return nil
}
// Run implements the TargetProvider interface.
func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
log.Debugf("Kubernetes Discovery.Run beginning")
log.Debugf("Start Kubernetes service discovery")
defer close(ch)
// Send an initial full view.
// TODO(fabxc): this does not include all available services and service
// endpoints yet. Service endpoints were also missing in the previous Sources() method.
var all []*config.TargetGroup
all = append(all, kd.updateAPIServersTargetGroup())
all = append(all, kd.updateNodesTargetGroup())
pods, _, err := kd.getPods()
if err != nil {
log.Errorf("Cannot initialize pods collection: %s", err)
return
}
kd.podsMu.Lock()
kd.pods = pods
kd.podsMu.Unlock()
all = append(all, kd.updatePodsTargetGroup())
for _, ns := range kd.pods {
for _, pod := range ns {
all = append(all, kd.updatePodTargetGroup(pod))
switch kd.Conf.Role {
case config.KubernetesRolePod, config.KubernetesRoleContainer:
pd := &podDiscovery{
retryInterval: time.Duration(kd.Conf.RetryInterval),
kd: kd,
}
}
select {
case ch <- all:
case <-ctx.Done():
return
}
retryInterval := time.Duration(kd.Conf.RetryInterval)
update := make(chan interface{}, 10)
go kd.watchNodes(update, ctx.Done(), retryInterval)
go kd.startServiceWatch(update, ctx.Done(), retryInterval)
go kd.watchPods(update, ctx.Done(), retryInterval)
for {
tg := []*config.TargetGroup{}
pd.run(ctx, ch)
case config.KubernetesRoleNode:
nd := &nodeDiscovery{
retryInterval: time.Duration(kd.Conf.RetryInterval),
kd: kd,
}
nd.run(ctx, ch)
case config.KubernetesRoleService, config.KubernetesRoleEndpoint:
sd := &serviceDiscovery{
retryInterval: time.Duration(kd.Conf.RetryInterval),
kd: kd,
}
sd.run(ctx, ch)
case config.KubernetesRoleAPIServer:
select {
case ch <- []*config.TargetGroup{kd.updateAPIServersTargetGroup()}:
case <-ctx.Done():
return
case event := <-update:
switch obj := event.(type) {
case *nodeEvent:
log.Debugf("k8s discovery received node event (EventType=%s, Node Name=%s)", obj.EventType, obj.Node.ObjectMeta.Name)
kd.updateNode(obj.Node, obj.EventType)
tg = append(tg, kd.updateNodesTargetGroup())
case *serviceEvent:
log.Debugf("k8s discovery received service event (EventType=%s, Service Name=%s)", obj.EventType, obj.Service.ObjectMeta.Name)
tg = append(tg, kd.updateService(obj.Service, obj.EventType))
case *endpointsEvent:
log.Debugf("k8s discovery received endpoint event (EventType=%s, Endpoint Name=%s)", obj.EventType, obj.Endpoints.ObjectMeta.Name)
tg = append(tg, kd.updateServiceEndpoints(obj.Endpoints, obj.EventType))
case *podEvent:
log.Debugf("k8s discovery received pod event (EventType=%s, Pod Name=%s)", obj.EventType, obj.Pod.ObjectMeta.Name)
// Update the per-pod target group
kd.updatePod(obj.Pod, obj.EventType)
tg = append(tg, kd.updatePodTargetGroup(obj.Pod))
// ...and update the all pods target group
tg = append(tg, kd.updatePodsTargetGroup())
}
}
if tg == nil {
continue
}
for _, t := range tg {
select {
case ch <- []*config.TargetGroup{t}:
case <-ctx.Done():
return
}
}
default:
log.Errorf("unknown Kubernetes discovery kind %q", kd.Conf.Role)
return
}
}
@ -283,437 +220,6 @@ func (kd *Discovery) updateAPIServersTargetGroup() *config.TargetGroup {
return tg
}
func (kd *Discovery) updateNodesTargetGroup() *config.TargetGroup {
kd.nodesMu.RLock()
defer kd.nodesMu.RUnlock()
tg := &config.TargetGroup{
Source: nodesTargetGroupName,
Labels: model.LabelSet{
roleLabel: model.LabelValue("node"),
},
}
// Now let's loop through the nodes & add them to the target group with appropriate labels.
for nodeName, node := range kd.nodes {
defaultNodeAddress, nodeAddressMap, err := nodeAddresses(node)
if err != nil {
log.Debugf("Skipping node %s: %s", node.Name, err)
continue
}
kubeletPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
address := fmt.Sprintf("%s:%d", defaultNodeAddress.String(), kubeletPort)
t := model.LabelSet{
model.AddressLabel: model.LabelValue(address),
model.InstanceLabel: model.LabelValue(nodeName),
}
for addrType, ip := range nodeAddressMap {
labelName := strutil.SanitizeLabelName(nodeAddressPrefix + string(addrType))
t[model.LabelName(labelName)] = model.LabelValue(ip[0].String())
}
t[model.LabelName(nodePortLabel)] = model.LabelValue(strconv.Itoa(kubeletPort))
for k, v := range node.ObjectMeta.Labels {
labelName := strutil.SanitizeLabelName(nodeLabelPrefix + k)
t[model.LabelName(labelName)] = model.LabelValue(v)
}
tg.Targets = append(tg.Targets, t)
}
return tg
}
func (kd *Discovery) updateNode(node *Node, eventType EventType) {
kd.nodesMu.Lock()
defer kd.nodesMu.Unlock()
updatedNodeName := node.ObjectMeta.Name
switch eventType {
case Deleted:
// Deleted - remove from nodes map.
delete(kd.nodes, updatedNodeName)
case Added, Modified:
// Added/Modified - update the node in the nodes map.
kd.nodes[updatedNodeName] = node
}
}
func (kd *Discovery) getNodes() (map[string]*Node, string, error) {
res, err := kd.queryAPIServerPath(nodesURL)
if err != nil {
// If we can't list nodes then we can't watch them. Assume this is a misconfiguration
// & return error.
return nil, "", fmt.Errorf("unable to list Kubernetes nodes: %s", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response: %d %s", res.StatusCode, res.Status)
}
var nodes NodeList
if err := json.NewDecoder(res.Body).Decode(&nodes); err != nil {
body, _ := ioutil.ReadAll(res.Body)
return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response body: %s", string(body))
}
nodeMap := map[string]*Node{}
for idx, node := range nodes.Items {
nodeMap[node.ObjectMeta.Name] = &nodes.Items[idx]
}
return nodeMap, nodes.ResourceVersion, nil
}
func (kd *Discovery) getServices() (map[string]map[string]*Service, string, error) {
res, err := kd.queryAPIServerPath(servicesURL)
if err != nil {
// If we can't list services then we can't watch them. Assume this is a misconfiguration
// & return error.
return nil, "", fmt.Errorf("unable to list Kubernetes services: %s", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response: %d %s", res.StatusCode, res.Status)
}
var services ServiceList
if err := json.NewDecoder(res.Body).Decode(&services); err != nil {
body, _ := ioutil.ReadAll(res.Body)
return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response body: %s", string(body))
}
serviceMap := map[string]map[string]*Service{}
for idx, service := range services.Items {
namespace, ok := serviceMap[service.ObjectMeta.Namespace]
if !ok {
namespace = map[string]*Service{}
serviceMap[service.ObjectMeta.Namespace] = namespace
}
namespace[service.ObjectMeta.Name] = &services.Items[idx]
}
return serviceMap, services.ResourceVersion, nil
}
// watchNodes watches nodes as they come & go.
func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
until(func() {
nodes, resourceVersion, err := kd.getNodes()
if err != nil {
log.Errorf("Cannot initialize nodes collection: %s", err)
return
}
// Reset the known nodes.
kd.nodesMu.Lock()
kd.nodes = map[string]*Node{}
kd.nodesMu.Unlock()
for _, node := range nodes {
events <- &nodeEvent{Added, node}
}
req, err := http.NewRequest("GET", nodesURL, nil)
if err != nil {
log.Errorf("Cannot create nodes request: %s", err)
return
}
values := req.URL.Query()
values.Add("watch", "true")
values.Add("resourceVersion", resourceVersion)
req.URL.RawQuery = values.Encode()
res, err := kd.queryAPIServerReq(req)
if err != nil {
log.Errorf("Failed to watch nodes: %s", err)
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch nodes: %d", res.StatusCode)
return
}
d := json.NewDecoder(res.Body)
for {
var event nodeEvent
if err := d.Decode(&event); err != nil {
log.Errorf("Watch nodes unexpectedly closed: %s", err)
return
}
select {
case events <- &event:
case <-done:
}
}
}, retryInterval, done)
}
// watchServices watches services as they come & go.
func (kd *Discovery) startServiceWatch(events chan<- interface{}, done <-chan struct{}, retryInterval time.Duration) {
until(func() {
// We use separate target groups for each discovered service so we'll need to clean up any if they've been deleted
// in Kubernetes while we couldn't connect - small chance of this, but worth dealing with.
existingServices := kd.services
// Reset the known services.
kd.servicesMu.Lock()
kd.services = map[string]map[string]*Service{}
kd.servicesMu.Unlock()
services, resourceVersion, err := kd.getServices()
if err != nil {
log.Errorf("Cannot initialize services collection: %s", err)
return
}
// Now let's loop through the old services & see if they still exist in here
for oldNSName, oldNS := range existingServices {
if ns, ok := services[oldNSName]; !ok {
for _, service := range existingServices[oldNSName] {
events <- &serviceEvent{Deleted, service}
}
} else {
for oldServiceName, oldService := range oldNS {
if _, ok := ns[oldServiceName]; !ok {
events <- &serviceEvent{Deleted, oldService}
}
}
}
}
// Discard the existing services map for GC.
existingServices = nil
for _, ns := range services {
for _, service := range ns {
events <- &serviceEvent{Added, service}
}
}
var wg sync.WaitGroup
wg.Add(2)
go func() {
kd.watchServices(resourceVersion, events, done)
wg.Done()
}()
go func() {
kd.watchServiceEndpoints(resourceVersion, events, done)
wg.Done()
}()
wg.Wait()
}, retryInterval, done)
}
func (kd *Discovery) watchServices(resourceVersion string, events chan<- interface{}, done <-chan struct{}) {
req, err := http.NewRequest("GET", servicesURL, nil)
if err != nil {
log.Errorf("Failed to create services request: %s", err)
return
}
values := req.URL.Query()
values.Add("watch", "true")
values.Add("resourceVersion", resourceVersion)
req.URL.RawQuery = values.Encode()
res, err := kd.queryAPIServerReq(req)
if err != nil {
log.Errorf("Failed to watch services: %s", err)
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch services: %d", res.StatusCode)
return
}
d := json.NewDecoder(res.Body)
for {
var event serviceEvent
if err := d.Decode(&event); err != nil {
log.Errorf("Watch services unexpectedly closed: %s", err)
return
}
select {
case events <- &event:
case <-done:
return
}
}
}
// watchServiceEndpoints watches service endpoints as they come & go.
func (kd *Discovery) watchServiceEndpoints(resourceVersion string, events chan<- interface{}, done <-chan struct{}) {
req, err := http.NewRequest("GET", endpointsURL, nil)
if err != nil {
log.Errorf("Failed to create service endpoints request: %s", err)
return
}
values := req.URL.Query()
values.Add("watch", "true")
values.Add("resourceVersion", resourceVersion)
req.URL.RawQuery = values.Encode()
res, err := kd.queryAPIServerReq(req)
if err != nil {
log.Errorf("Failed to watch service endpoints: %s", err)
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch service endpoints: %d", res.StatusCode)
return
}
d := json.NewDecoder(res.Body)
for {
var event endpointsEvent
if err := d.Decode(&event); err != nil {
log.Errorf("Watch service endpoints unexpectedly closed: %s", err)
return
}
select {
case events <- &event:
case <-done:
}
}
}
func (kd *Discovery) updateService(service *Service, eventType EventType) *config.TargetGroup {
kd.servicesMu.Lock()
defer kd.servicesMu.Unlock()
switch eventType {
case Deleted:
return kd.deleteService(service)
case Added, Modified:
return kd.addService(service)
}
return nil
}
func (kd *Discovery) deleteService(service *Service) *config.TargetGroup {
tg := &config.TargetGroup{Source: serviceSource(service)}
delete(kd.services[service.ObjectMeta.Namespace], service.ObjectMeta.Name)
if len(kd.services[service.ObjectMeta.Namespace]) == 0 {
delete(kd.services, service.ObjectMeta.Namespace)
}
return tg
}
func (kd *Discovery) addService(service *Service) *config.TargetGroup {
namespace, ok := kd.services[service.ObjectMeta.Namespace]
if !ok {
namespace = map[string]*Service{}
kd.services[service.ObjectMeta.Namespace] = namespace
}
namespace[service.ObjectMeta.Name] = service
endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name)
res, err := kd.queryAPIServerPath(endpointURL)
if err != nil {
log.Errorf("Error getting service endpoints: %s", err)
return nil
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to get service endpoints: %d", res.StatusCode)
return nil
}
var eps Endpoints
if err := json.NewDecoder(res.Body).Decode(&eps); err != nil {
log.Errorf("Error getting service endpoints: %s", err)
return nil
}
return kd.updateServiceTargetGroup(service, &eps)
}
func (kd *Discovery) updateServiceTargetGroup(service *Service, eps *Endpoints) *config.TargetGroup {
tg := &config.TargetGroup{
Source: serviceSource(service),
Labels: model.LabelSet{
serviceNamespaceLabel: model.LabelValue(service.ObjectMeta.Namespace),
serviceNameLabel: model.LabelValue(service.ObjectMeta.Name),
},
}
for k, v := range service.ObjectMeta.Labels {
labelName := strutil.SanitizeLabelName(serviceLabelPrefix + k)
tg.Labels[model.LabelName(labelName)] = model.LabelValue(v)
}
for k, v := range service.ObjectMeta.Annotations {
labelName := strutil.SanitizeLabelName(serviceAnnotationPrefix + k)
tg.Labels[model.LabelName(labelName)] = model.LabelValue(v)
}
serviceAddress := service.ObjectMeta.Name + "." + service.ObjectMeta.Namespace + ".svc"
// Append the first TCP service port if one exists.
for _, port := range service.Spec.Ports {
if port.Protocol == ProtocolTCP {
serviceAddress += fmt.Sprintf(":%d", port.Port)
break
}
}
t := model.LabelSet{
model.AddressLabel: model.LabelValue(serviceAddress),
roleLabel: model.LabelValue("service"),
}
tg.Targets = append(tg.Targets, t)
// Now let's loop through the endpoints & add them to the target group with appropriate labels.
for _, ss := range eps.Subsets {
epPort := ss.Ports[0].Port
for _, addr := range ss.Addresses {
ipAddr := addr.IP
if len(ipAddr) == net.IPv6len {
ipAddr = "[" + ipAddr + "]"
}
address := fmt.Sprintf("%s:%d", ipAddr, epPort)
t := model.LabelSet{
model.AddressLabel: model.LabelValue(address),
roleLabel: model.LabelValue("endpoint"),
}
tg.Targets = append(tg.Targets, t)
}
}
return tg
}
func (kd *Discovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType) *config.TargetGroup {
kd.servicesMu.Lock()
defer kd.servicesMu.Unlock()
serviceNamespace := endpoints.ObjectMeta.Namespace
serviceName := endpoints.ObjectMeta.Name
if service, ok := kd.services[serviceNamespace][serviceName]; ok {
return kd.updateServiceTargetGroup(service, endpoints)
}
return nil
}
func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, error) {
bearerTokenFile := conf.BearerTokenFile
caFile := conf.TLSConfig.CAFile
@ -773,10 +279,6 @@ func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, err
}, nil
}
func serviceSource(service *Service) string {
return sourceServicePrefix + ":" + service.ObjectMeta.Namespace + "/" + service.ObjectMeta.Name
}
// Until loops until stop channel is closed, running f every period.
// f may not be invoked if stop channel is already closed.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
@ -795,272 +297,3 @@ func until(f func(), period time.Duration, stopCh <-chan struct{}) {
}
}
}
// nodeAddresses returns the provided node's address, based on the priority:
// 1. NodeInternalIP
// 2. NodeExternalIP
// 3. NodeLegacyHostIP
//
// Copied from k8s.io/kubernetes/pkg/util/node/node.go
func nodeAddresses(node *Node) (net.IP, map[NodeAddressType][]net.IP, error) {
addresses := node.Status.Addresses
addressMap := map[NodeAddressType][]net.IP{}
for _, addr := range addresses {
ip := net.ParseIP(addr.Address)
// All addresses should be valid IPs.
if ip == nil {
continue
}
addressMap[addr.Type] = append(addressMap[addr.Type], ip)
}
if addresses, ok := addressMap[NodeInternalIP]; ok {
return addresses[0], addressMap, nil
}
if addresses, ok := addressMap[NodeExternalIP]; ok {
return addresses[0], addressMap, nil
}
if addresses, ok := addressMap[NodeLegacyHostIP]; ok {
return addresses[0], addressMap, nil
}
return nil, nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses)
}
func (kd *Discovery) updatePod(pod *Pod, eventType EventType) {
kd.podsMu.Lock()
defer kd.podsMu.Unlock()
switch eventType {
case Deleted:
if _, ok := kd.pods[pod.ObjectMeta.Namespace]; ok {
delete(kd.pods[pod.ObjectMeta.Namespace], pod.ObjectMeta.Name)
if len(kd.pods[pod.ObjectMeta.Namespace]) == 0 {
delete(kd.pods, pod.ObjectMeta.Namespace)
}
}
case Added, Modified:
if _, ok := kd.pods[pod.ObjectMeta.Namespace]; !ok {
kd.pods[pod.ObjectMeta.Namespace] = map[string]*Pod{}
}
kd.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = pod
}
}
func (kd *Discovery) getPods() (map[string]map[string]*Pod, string, error) {
res, err := kd.queryAPIServerPath(podsURL)
if err != nil {
return nil, "", fmt.Errorf("unable to list Kubernetes pods: %s", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response: %d %s", res.StatusCode, res.Status)
}
var pods PodList
if err := json.NewDecoder(res.Body).Decode(&pods); err != nil {
body, _ := ioutil.ReadAll(res.Body)
return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response body: %s", string(body))
}
podMap := map[string]map[string]*Pod{}
for idx, pod := range pods.Items {
if _, ok := podMap[pod.ObjectMeta.Namespace]; !ok {
podMap[pod.ObjectMeta.Namespace] = map[string]*Pod{}
}
log.Debugf("Got pod %s in namespace %s", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace)
podMap[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = &pods.Items[idx]
}
return podMap, pods.ResourceVersion, nil
}
func (kd *Discovery) watchPods(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
until(func() {
pods, resourceVersion, err := kd.getPods()
if err != nil {
log.Errorf("Cannot initialize pods collection: %s", err)
return
}
kd.podsMu.Lock()
kd.pods = pods
kd.podsMu.Unlock()
req, err := http.NewRequest("GET", podsURL, nil)
if err != nil {
log.Errorf("Cannot create pods request: %s", err)
return
}
values := req.URL.Query()
values.Add("watch", "true")
values.Add("resourceVersion", resourceVersion)
req.URL.RawQuery = values.Encode()
res, err := kd.queryAPIServerReq(req)
if err != nil {
log.Errorf("Failed to watch pods: %s", err)
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch pods: %d", res.StatusCode)
return
}
d := json.NewDecoder(res.Body)
for {
var event podEvent
if err := d.Decode(&event); err != nil {
log.Errorf("Watch pods unexpectedly closed: %s", err)
return
}
select {
case events <- &event:
case <-done:
}
}
}, retryInterval, done)
}
func podSource(pod *Pod) string {
return sourcePodPrefix + ":" + pod.ObjectMeta.Namespace + ":" + pod.ObjectMeta.Name
}
type ByContainerPort []ContainerPort
func (a ByContainerPort) Len() int { return len(a) }
func (a ByContainerPort) Less(i, j int) bool { return a[i].ContainerPort < a[j].ContainerPort }
func (a ByContainerPort) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
type ByContainerName []Container
func (a ByContainerName) Len() int { return len(a) }
func (a ByContainerName) Less(i, j int) bool { return a[i].Name < a[j].Name }
func (a ByContainerName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet {
var targets []model.LabelSet = make([]model.LabelSet, 0, len(pod.PodSpec.Containers))
if pod.PodStatus.PodIP == "" {
log.Debugf("skipping pod %s -- PodStatus.PodIP is empty", pod.ObjectMeta.Name)
return targets
}
if pod.PodStatus.Phase != "Running" {
log.Debugf("skipping pod %s -- status is not `Running`", pod.ObjectMeta.Name)
return targets
}
ready := "unknown"
for _, cond := range pod.PodStatus.Conditions {
if strings.ToLower(cond.Type) == "ready" {
ready = strings.ToLower(cond.Status)
}
}
sort.Sort(ByContainerName(pod.PodSpec.Containers))
for _, container := range pod.PodSpec.Containers {
// Collect a list of TCP ports
// Sort by port number, ascending
// Product a target pointed at the first port
// Include a label containing all ports (portName=port,PortName=port,...,)
var tcpPorts []ContainerPort
var portLabel *bytes.Buffer = bytes.NewBufferString(",")
for _, port := range container.Ports {
if port.Protocol == "TCP" {
tcpPorts = append(tcpPorts, port)
}
}
if len(tcpPorts) == 0 {
log.Debugf("skipping container %s with no TCP ports", container.Name)
continue
}
sort.Sort(ByContainerPort(tcpPorts))
t := model.LabelSet{
model.AddressLabel: model.LabelValue(net.JoinHostPort(pod.PodIP, strconv.FormatInt(int64(tcpPorts[0].ContainerPort), 10))),
podNameLabel: model.LabelValue(pod.ObjectMeta.Name),
podAddressLabel: model.LabelValue(pod.PodStatus.PodIP),
podNamespaceLabel: model.LabelValue(pod.ObjectMeta.Namespace),
podContainerNameLabel: model.LabelValue(container.Name),
podContainerPortNameLabel: model.LabelValue(tcpPorts[0].Name),
podReadyLabel: model.LabelValue(ready),
}
for _, port := range tcpPorts {
portLabel.WriteString(port.Name)
portLabel.WriteString("=")
portLabel.WriteString(strconv.FormatInt(int64(port.ContainerPort), 10))
portLabel.WriteString(",")
t[model.LabelName(podContainerPortMapPrefix+port.Name)] = model.LabelValue(strconv.FormatInt(int64(port.ContainerPort), 10))
}
t[model.LabelName(podContainerPortListLabel)] = model.LabelValue(portLabel.String())
for k, v := range pod.ObjectMeta.Labels {
labelName := strutil.SanitizeLabelName(podLabelPrefix + k)
t[model.LabelName(labelName)] = model.LabelValue(v)
}
for k, v := range pod.ObjectMeta.Annotations {
labelName := strutil.SanitizeLabelName(podAnnotationPrefix + k)
t[model.LabelName(labelName)] = model.LabelValue(v)
}
targets = append(targets, t)
if !allContainers {
break
}
}
if len(targets) == 0 {
log.Debugf("no targets for pod %s", pod.ObjectMeta.Name)
}
return targets
}
func (kd *Discovery) updatePodTargetGroup(pod *Pod) *config.TargetGroup {
kd.podsMu.RLock()
defer kd.podsMu.RUnlock()
tg := &config.TargetGroup{
Source: podSource(pod),
}
// If this pod doesn't exist, return an empty target group
if _, ok := kd.pods[pod.ObjectMeta.Namespace]; !ok {
return tg
}
if _, ok := kd.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name]; !ok {
return tg
}
tg.Labels = model.LabelSet{
roleLabel: model.LabelValue("container"),
}
tg.Targets = updatePodTargets(pod, true)
return tg
}
func (kd *Discovery) updatePodsTargetGroup() *config.TargetGroup {
tg := &config.TargetGroup{
Source: podsTargetGroupName,
Labels: model.LabelSet{
roleLabel: model.LabelValue("pod"),
},
}
for _, namespace := range kd.pods {
for _, pod := range namespace {
tg.Targets = append(tg.Targets, updatePodTargets(pod, false)...)
}
}
return tg
}

View file

@ -0,0 +1,242 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"strconv"
"sync"
"time"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil"
"golang.org/x/net/context"
)
type nodeDiscovery struct {
mtx sync.RWMutex
nodes map[string]*Node
retryInterval time.Duration
kd *Discovery
}
func (d *nodeDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) {
select {
case ch <- []*config.TargetGroup{d.updateNodesTargetGroup()}:
case <-ctx.Done():
return
}
update := make(chan *nodeEvent, 10)
go d.watchNodes(update, ctx.Done(), d.retryInterval)
for {
tgs := []*config.TargetGroup{}
select {
case <-ctx.Done():
return
case e := <-update:
log.Debugf("k8s discovery received node event (EventType=%s, Node Name=%s)", e.EventType, e.Node.ObjectMeta.Name)
d.updateNode(e.Node, e.EventType)
tgs = append(tgs, d.updateNodesTargetGroup())
}
if tgs == nil {
continue
}
for _, tg := range tgs {
select {
case ch <- []*config.TargetGroup{tg}:
case <-ctx.Done():
return
}
}
}
}
func (d *nodeDiscovery) updateNodesTargetGroup() *config.TargetGroup {
d.mtx.RLock()
defer d.mtx.RUnlock()
tg := &config.TargetGroup{
Source: nodesTargetGroupName,
Labels: model.LabelSet{
roleLabel: model.LabelValue("node"),
},
}
// Now let's loop through the nodes & add them to the target group with appropriate labels.
for nodeName, node := range d.nodes {
defaultNodeAddress, nodeAddressMap, err := nodeAddresses(node)
if err != nil {
log.Debugf("Skipping node %s: %s", node.Name, err)
continue
}
kubeletPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
address := fmt.Sprintf("%s:%d", defaultNodeAddress.String(), kubeletPort)
t := model.LabelSet{
model.AddressLabel: model.LabelValue(address),
model.InstanceLabel: model.LabelValue(nodeName),
}
for addrType, ip := range nodeAddressMap {
labelName := strutil.SanitizeLabelName(nodeAddressPrefix + string(addrType))
t[model.LabelName(labelName)] = model.LabelValue(ip[0].String())
}
t[model.LabelName(nodePortLabel)] = model.LabelValue(strconv.Itoa(kubeletPort))
for k, v := range node.ObjectMeta.Labels {
labelName := strutil.SanitizeLabelName(nodeLabelPrefix + k)
t[model.LabelName(labelName)] = model.LabelValue(v)
}
tg.Targets = append(tg.Targets, t)
}
return tg
}
// watchNodes watches nodes as they come & go.
func (d *nodeDiscovery) watchNodes(events chan *nodeEvent, done <-chan struct{}, retryInterval time.Duration) {
until(func() {
nodes, resourceVersion, err := d.getNodes()
if err != nil {
log.Errorf("Cannot initialize nodes collection: %s", err)
return
}
// Reset the known nodes.
d.mtx.Lock()
d.nodes = map[string]*Node{}
d.mtx.Unlock()
for _, node := range nodes {
events <- &nodeEvent{Added, node}
}
req, err := http.NewRequest("GET", nodesURL, nil)
if err != nil {
log.Errorf("Cannot create nodes request: %s", err)
return
}
values := req.URL.Query()
values.Add("watch", "true")
values.Add("resourceVersion", resourceVersion)
req.URL.RawQuery = values.Encode()
res, err := d.kd.queryAPIServerReq(req)
if err != nil {
log.Errorf("Failed to watch nodes: %s", err)
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch nodes: %d", res.StatusCode)
return
}
d := json.NewDecoder(res.Body)
for {
var event nodeEvent
if err := d.Decode(&event); err != nil {
log.Errorf("Watch nodes unexpectedly closed: %s", err)
return
}
select {
case events <- &event:
case <-done:
}
}
}, retryInterval, done)
}
func (d *nodeDiscovery) updateNode(node *Node, eventType EventType) {
d.mtx.Lock()
defer d.mtx.Unlock()
updatedNodeName := node.ObjectMeta.Name
switch eventType {
case Deleted:
// Deleted - remove from nodes map.
delete(d.nodes, updatedNodeName)
case Added, Modified:
// Added/Modified - update the node in the nodes map.
d.nodes[updatedNodeName] = node
}
}
func (d *nodeDiscovery) getNodes() (map[string]*Node, string, error) {
res, err := d.kd.queryAPIServerPath(nodesURL)
if err != nil {
// If we can't list nodes then we can't watch them. Assume this is a misconfiguration
// & return error.
return nil, "", fmt.Errorf("unable to list Kubernetes nodes: %s", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response: %d %s", res.StatusCode, res.Status)
}
var nodes NodeList
if err := json.NewDecoder(res.Body).Decode(&nodes); err != nil {
body, _ := ioutil.ReadAll(res.Body)
return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response body: %s", string(body))
}
nodeMap := map[string]*Node{}
for idx, node := range nodes.Items {
nodeMap[node.ObjectMeta.Name] = &nodes.Items[idx]
}
return nodeMap, nodes.ResourceVersion, nil
}
// nodeAddresses returns the provided node's address, based on the priority:
// 1. NodeInternalIP
// 2. NodeExternalIP
// 3. NodeLegacyHostIP
//
// Copied from k8s.io/kubernetes/pkg/util/node/node.go
func nodeAddresses(node *Node) (net.IP, map[NodeAddressType][]net.IP, error) {
addresses := node.Status.Addresses
addressMap := map[NodeAddressType][]net.IP{}
for _, addr := range addresses {
ip := net.ParseIP(addr.Address)
// All addresses should be valid IPs.
if ip == nil {
continue
}
addressMap[addr.Type] = append(addressMap[addr.Type], ip)
}
if addresses, ok := addressMap[NodeInternalIP]; ok {
return addresses[0], addressMap, nil
}
if addresses, ok := addressMap[NodeExternalIP]; ok {
return addresses[0], addressMap, nil
}
if addresses, ok := addressMap[NodeLegacyHostIP]; ok {
return addresses[0], addressMap, nil
}
return nil, nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses)
}

View file

@ -0,0 +1,342 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil"
"golang.org/x/net/context"
)
type podDiscovery struct {
mtx sync.RWMutex
pods map[string]map[string]*Pod
retryInterval time.Duration
kd *Discovery
}
func (d *podDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) {
pods, _, err := d.getPods()
if err != nil {
log.Errorf("Cannot initialize pods collection: %s", err)
return
}
d.pods = pods
initial := []*config.TargetGroup{}
switch d.kd.Conf.Role {
case config.KubernetesRolePod:
initial = append(initial, d.updatePodsTargetGroup())
case config.KubernetesRoleContainer:
for _, ns := range d.pods {
for _, pod := range ns {
initial = append(initial, d.updateContainerTargetGroup(pod))
}
}
}
select {
case ch <- initial:
case <-ctx.Done():
return
}
update := make(chan *podEvent, 10)
go d.watchPods(update, ctx.Done(), d.retryInterval)
for {
tgs := []*config.TargetGroup{}
select {
case <-ctx.Done():
return
case e := <-update:
log.Debugf("k8s discovery received pod event (EventType=%s, Pod Name=%s)", e.EventType, e.Pod.ObjectMeta.Name)
d.updatePod(e.Pod, e.EventType)
switch d.kd.Conf.Role {
case config.KubernetesRoleContainer:
// Update the per-pod target group
tgs = append(tgs, d.updateContainerTargetGroup(e.Pod))
case config.KubernetesRolePod:
// Update the all pods target group
tgs = append(tgs, d.updatePodsTargetGroup())
}
}
if tgs == nil {
continue
}
for _, tg := range tgs {
select {
case ch <- []*config.TargetGroup{tg}:
case <-ctx.Done():
return
}
}
}
}
func (d *podDiscovery) getPods() (map[string]map[string]*Pod, string, error) {
res, err := d.kd.queryAPIServerPath(podsURL)
if err != nil {
return nil, "", fmt.Errorf("unable to list Kubernetes pods: %s", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response: %d %s", res.StatusCode, res.Status)
}
var pods PodList
if err := json.NewDecoder(res.Body).Decode(&pods); err != nil {
body, _ := ioutil.ReadAll(res.Body)
return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response body: %s", string(body))
}
podMap := map[string]map[string]*Pod{}
for idx, pod := range pods.Items {
if _, ok := podMap[pod.ObjectMeta.Namespace]; !ok {
podMap[pod.ObjectMeta.Namespace] = map[string]*Pod{}
}
log.Debugf("Got pod %s in namespace %s", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace)
podMap[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = &pods.Items[idx]
}
return podMap, pods.ResourceVersion, nil
}
func (d *podDiscovery) watchPods(events chan *podEvent, done <-chan struct{}, retryInterval time.Duration) {
until(func() {
pods, resourceVersion, err := d.getPods()
if err != nil {
log.Errorf("Cannot initialize pods collection: %s", err)
return
}
d.mtx.Lock()
d.pods = pods
d.mtx.Unlock()
req, err := http.NewRequest("GET", podsURL, nil)
if err != nil {
log.Errorf("Cannot create pods request: %s", err)
return
}
values := req.URL.Query()
values.Add("watch", "true")
values.Add("resourceVersion", resourceVersion)
req.URL.RawQuery = values.Encode()
res, err := d.kd.queryAPIServerReq(req)
if err != nil {
log.Errorf("Failed to watch pods: %s", err)
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch pods: %d", res.StatusCode)
return
}
d := json.NewDecoder(res.Body)
for {
var event podEvent
if err := d.Decode(&event); err != nil {
log.Errorf("Watch pods unexpectedly closed: %s", err)
return
}
select {
case events <- &event:
case <-done:
}
}
}, retryInterval, done)
}
func (d *podDiscovery) updatePod(pod *Pod, eventType EventType) {
d.mtx.Lock()
defer d.mtx.Unlock()
switch eventType {
case Deleted:
if _, ok := d.pods[pod.ObjectMeta.Namespace]; ok {
delete(d.pods[pod.ObjectMeta.Namespace], pod.ObjectMeta.Name)
if len(d.pods[pod.ObjectMeta.Namespace]) == 0 {
delete(d.pods, pod.ObjectMeta.Namespace)
}
}
case Added, Modified:
if _, ok := d.pods[pod.ObjectMeta.Namespace]; !ok {
d.pods[pod.ObjectMeta.Namespace] = map[string]*Pod{}
}
d.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = pod
}
}
func (d *podDiscovery) updateContainerTargetGroup(pod *Pod) *config.TargetGroup {
d.mtx.RLock()
defer d.mtx.RUnlock()
tg := &config.TargetGroup{
Source: podSource(pod),
}
// If this pod doesn't exist, return an empty target group
if _, ok := d.pods[pod.ObjectMeta.Namespace]; !ok {
return tg
}
if _, ok := d.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name]; !ok {
return tg
}
tg.Labels = model.LabelSet{
roleLabel: model.LabelValue("container"),
}
tg.Targets = updatePodTargets(pod, true)
return tg
}
func (d *podDiscovery) updatePodsTargetGroup() *config.TargetGroup {
tg := &config.TargetGroup{
Source: podsTargetGroupName,
Labels: model.LabelSet{
roleLabel: model.LabelValue("pod"),
},
}
for _, namespace := range d.pods {
for _, pod := range namespace {
tg.Targets = append(tg.Targets, updatePodTargets(pod, false)...)
}
}
return tg
}
func podSource(pod *Pod) string {
return sourcePodPrefix + ":" + pod.ObjectMeta.Namespace + ":" + pod.ObjectMeta.Name
}
func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet {
var targets []model.LabelSet = make([]model.LabelSet, 0, len(pod.PodSpec.Containers))
if pod.PodStatus.PodIP == "" {
log.Debugf("skipping pod %s -- PodStatus.PodIP is empty", pod.ObjectMeta.Name)
return targets
}
if pod.PodStatus.Phase != "Running" {
log.Debugf("skipping pod %s -- status is not `Running`", pod.ObjectMeta.Name)
return targets
}
ready := "unknown"
for _, cond := range pod.PodStatus.Conditions {
if strings.ToLower(cond.Type) == "ready" {
ready = strings.ToLower(cond.Status)
}
}
sort.Sort(ByContainerName(pod.PodSpec.Containers))
for _, container := range pod.PodSpec.Containers {
// Collect a list of TCP ports
// Sort by port number, ascending
// Product a target pointed at the first port
// Include a label containing all ports (portName=port,PortName=port,...,)
var tcpPorts []ContainerPort
var portLabel *bytes.Buffer = bytes.NewBufferString(",")
for _, port := range container.Ports {
if port.Protocol == "TCP" {
tcpPorts = append(tcpPorts, port)
}
}
if len(tcpPorts) == 0 {
log.Debugf("skipping container %s with no TCP ports", container.Name)
continue
}
sort.Sort(ByContainerPort(tcpPorts))
t := model.LabelSet{
model.AddressLabel: model.LabelValue(net.JoinHostPort(pod.PodIP, strconv.FormatInt(int64(tcpPorts[0].ContainerPort), 10))),
podNameLabel: model.LabelValue(pod.ObjectMeta.Name),
podAddressLabel: model.LabelValue(pod.PodStatus.PodIP),
podNamespaceLabel: model.LabelValue(pod.ObjectMeta.Namespace),
podContainerNameLabel: model.LabelValue(container.Name),
podContainerPortNameLabel: model.LabelValue(tcpPorts[0].Name),
podReadyLabel: model.LabelValue(ready),
}
for _, port := range tcpPorts {
portLabel.WriteString(port.Name)
portLabel.WriteString("=")
portLabel.WriteString(strconv.FormatInt(int64(port.ContainerPort), 10))
portLabel.WriteString(",")
t[model.LabelName(podContainerPortMapPrefix+port.Name)] = model.LabelValue(strconv.FormatInt(int64(port.ContainerPort), 10))
}
t[model.LabelName(podContainerPortListLabel)] = model.LabelValue(portLabel.String())
for k, v := range pod.ObjectMeta.Labels {
labelName := strutil.SanitizeLabelName(podLabelPrefix + k)
t[model.LabelName(labelName)] = model.LabelValue(v)
}
for k, v := range pod.ObjectMeta.Annotations {
labelName := strutil.SanitizeLabelName(podAnnotationPrefix + k)
t[model.LabelName(labelName)] = model.LabelValue(v)
}
targets = append(targets, t)
if !allContainers {
break
}
}
if len(targets) == 0 {
log.Debugf("no targets for pod %s", pod.ObjectMeta.Name)
}
return targets
}
type ByContainerPort []ContainerPort
func (a ByContainerPort) Len() int { return len(a) }
func (a ByContainerPort) Less(i, j int) bool { return a[i].ContainerPort < a[j].ContainerPort }
func (a ByContainerPort) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
type ByContainerName []Container
func (a ByContainerName) Len() int { return len(a) }
func (a ByContainerName) Less(i, j int) bool { return a[i].Name < a[j].Name }
func (a ByContainerName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

View file

@ -0,0 +1,370 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"sync"
"time"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil"
"golang.org/x/net/context"
)
type serviceDiscovery struct {
mtx sync.RWMutex
services map[string]map[string]*Service
retryInterval time.Duration
kd *Discovery
}
func (d *serviceDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) {
update := make(chan interface{}, 10)
go d.startServiceWatch(update, ctx.Done(), d.retryInterval)
for {
tgs := []*config.TargetGroup{}
select {
case <-ctx.Done():
return
case event := <-update:
switch e := event.(type) {
case *endpointsEvent:
log.Debugf("k8s discovery received endpoint event (EventType=%s, Endpoint Name=%s)", e.EventType, e.Endpoints.ObjectMeta.Name)
tgs = append(tgs, d.updateServiceEndpoints(e.Endpoints, e.EventType))
case *serviceEvent:
log.Debugf("k8s discovery received service event (EventType=%s, Service Name=%s)", e.EventType, e.Service.ObjectMeta.Name)
tgs = append(tgs, d.updateService(e.Service, e.EventType))
}
}
if tgs == nil {
continue
}
for _, tg := range tgs {
select {
case ch <- []*config.TargetGroup{tg}:
case <-ctx.Done():
return
}
}
}
}
func (d *serviceDiscovery) getServices() (map[string]map[string]*Service, string, error) {
res, err := d.kd.queryAPIServerPath(servicesURL)
if err != nil {
// If we can't list services then we can't watch them. Assume this is a misconfiguration
// & return error.
return nil, "", fmt.Errorf("unable to list Kubernetes services: %s", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response: %d %s", res.StatusCode, res.Status)
}
var services ServiceList
if err := json.NewDecoder(res.Body).Decode(&services); err != nil {
body, _ := ioutil.ReadAll(res.Body)
return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response body: %s", string(body))
}
serviceMap := map[string]map[string]*Service{}
for idx, service := range services.Items {
namespace, ok := serviceMap[service.ObjectMeta.Namespace]
if !ok {
namespace = map[string]*Service{}
serviceMap[service.ObjectMeta.Namespace] = namespace
}
namespace[service.ObjectMeta.Name] = &services.Items[idx]
}
return serviceMap, services.ResourceVersion, nil
}
// watchServices watches services as they come & go.
func (d *serviceDiscovery) startServiceWatch(events chan<- interface{}, done <-chan struct{}, retryInterval time.Duration) {
until(func() {
// We use separate target groups for each discovered service so we'll need to clean up any if they've been deleted
// in Kubernetes while we couldn't connect - small chance of this, but worth dealing with.
d.mtx.Lock()
existingServices := d.services
// Reset the known services.
d.services = map[string]map[string]*Service{}
d.mtx.Unlock()
services, resourceVersion, err := d.getServices()
if err != nil {
log.Errorf("Cannot initialize services collection: %s", err)
return
}
// Now let's loop through the old services & see if they still exist in here
for oldNSName, oldNS := range existingServices {
if ns, ok := services[oldNSName]; !ok {
for _, service := range existingServices[oldNSName] {
events <- &serviceEvent{Deleted, service}
}
} else {
for oldServiceName, oldService := range oldNS {
if _, ok := ns[oldServiceName]; !ok {
events <- &serviceEvent{Deleted, oldService}
}
}
}
}
// Discard the existing services map for GC.
existingServices = nil
for _, ns := range services {
for _, service := range ns {
events <- &serviceEvent{Added, service}
}
}
var wg sync.WaitGroup
wg.Add(2)
go func() {
d.watchServices(resourceVersion, events, done)
wg.Done()
}()
go func() {
d.watchServiceEndpoints(resourceVersion, events, done)
wg.Done()
}()
wg.Wait()
}, retryInterval, done)
}
func (d *serviceDiscovery) watchServices(resourceVersion string, events chan<- interface{}, done <-chan struct{}) {
req, err := http.NewRequest("GET", servicesURL, nil)
if err != nil {
log.Errorf("Failed to create services request: %s", err)
return
}
values := req.URL.Query()
values.Add("watch", "true")
values.Add("resourceVersion", resourceVersion)
req.URL.RawQuery = values.Encode()
res, err := d.kd.queryAPIServerReq(req)
if err != nil {
log.Errorf("Failed to watch services: %s", err)
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch services: %d", res.StatusCode)
return
}
dec := json.NewDecoder(res.Body)
for {
var event serviceEvent
if err := dec.Decode(&event); err != nil {
log.Errorf("Watch services unexpectedly closed: %s", err)
return
}
select {
case events <- &event:
case <-done:
return
}
}
}
// watchServiceEndpoints watches service endpoints as they come & go.
func (d *serviceDiscovery) watchServiceEndpoints(resourceVersion string, events chan<- interface{}, done <-chan struct{}) {
req, err := http.NewRequest("GET", endpointsURL, nil)
if err != nil {
log.Errorf("Failed to create service endpoints request: %s", err)
return
}
values := req.URL.Query()
values.Add("watch", "true")
values.Add("resourceVersion", resourceVersion)
req.URL.RawQuery = values.Encode()
res, err := d.kd.queryAPIServerReq(req)
if err != nil {
log.Errorf("Failed to watch service endpoints: %s", err)
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch service endpoints: %d", res.StatusCode)
return
}
dec := json.NewDecoder(res.Body)
for {
var event endpointsEvent
if err := dec.Decode(&event); err != nil {
log.Errorf("Watch service endpoints unexpectedly closed: %s", err)
return
}
select {
case events <- &event:
case <-done:
}
}
}
func (d *serviceDiscovery) updateService(service *Service, eventType EventType) *config.TargetGroup {
d.mtx.Lock()
defer d.mtx.Unlock()
switch eventType {
case Deleted:
return d.deleteService(service)
case Added, Modified:
return d.addService(service)
}
return nil
}
func (d *serviceDiscovery) deleteService(service *Service) *config.TargetGroup {
tg := &config.TargetGroup{Source: serviceSource(service)}
delete(d.services[service.ObjectMeta.Namespace], service.ObjectMeta.Name)
if len(d.services[service.ObjectMeta.Namespace]) == 0 {
delete(d.services, service.ObjectMeta.Namespace)
}
return tg
}
func (d *serviceDiscovery) addService(service *Service) *config.TargetGroup {
namespace, ok := d.services[service.ObjectMeta.Namespace]
if !ok {
namespace = map[string]*Service{}
d.services[service.ObjectMeta.Namespace] = namespace
}
namespace[service.ObjectMeta.Name] = service
endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name)
res, err := d.kd.queryAPIServerPath(endpointURL)
if err != nil {
log.Errorf("Error getting service endpoints: %s", err)
return nil
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to get service endpoints: %d", res.StatusCode)
return nil
}
var eps Endpoints
if err := json.NewDecoder(res.Body).Decode(&eps); err != nil {
log.Errorf("Error getting service endpoints: %s", err)
return nil
}
return d.updateServiceTargetGroup(service, &eps)
}
func (d *serviceDiscovery) updateServiceTargetGroup(service *Service, eps *Endpoints) *config.TargetGroup {
tg := &config.TargetGroup{
Source: serviceSource(service),
Labels: model.LabelSet{
serviceNamespaceLabel: model.LabelValue(service.ObjectMeta.Namespace),
serviceNameLabel: model.LabelValue(service.ObjectMeta.Name),
},
}
for k, v := range service.ObjectMeta.Labels {
labelName := strutil.SanitizeLabelName(serviceLabelPrefix + k)
tg.Labels[model.LabelName(labelName)] = model.LabelValue(v)
}
for k, v := range service.ObjectMeta.Annotations {
labelName := strutil.SanitizeLabelName(serviceAnnotationPrefix + k)
tg.Labels[model.LabelName(labelName)] = model.LabelValue(v)
}
serviceAddress := service.ObjectMeta.Name + "." + service.ObjectMeta.Namespace + ".svc"
// Append the first TCP service port if one exists.
for _, port := range service.Spec.Ports {
if port.Protocol == ProtocolTCP {
serviceAddress += fmt.Sprintf(":%d", port.Port)
break
}
}
switch d.kd.Conf.Role {
case config.KubernetesRoleService:
t := model.LabelSet{
model.AddressLabel: model.LabelValue(serviceAddress),
roleLabel: model.LabelValue("service"),
}
tg.Targets = append(tg.Targets, t)
case config.KubernetesRoleEndpoint:
// Now let's loop through the endpoints & add them to the target group
// with appropriate labels.
for _, ss := range eps.Subsets {
epPort := ss.Ports[0].Port
for _, addr := range ss.Addresses {
ipAddr := addr.IP
if len(ipAddr) == net.IPv6len {
ipAddr = "[" + ipAddr + "]"
}
address := fmt.Sprintf("%s:%d", ipAddr, epPort)
t := model.LabelSet{
model.AddressLabel: model.LabelValue(address),
roleLabel: model.LabelValue("endpoint"),
}
tg.Targets = append(tg.Targets, t)
}
}
}
return tg
}
func (d *serviceDiscovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType) *config.TargetGroup {
d.mtx.Lock()
defer d.mtx.Unlock()
serviceNamespace := endpoints.ObjectMeta.Namespace
serviceName := endpoints.ObjectMeta.Name
if service, ok := d.services[serviceNamespace][serviceName]; ok {
return d.updateServiceTargetGroup(service, endpoints)
}
return nil
}
func serviceSource(service *Service) string {
return sourceServicePrefix + ":" + service.ObjectMeta.Namespace + "/" + service.ObjectMeta.Name
}

View file

@ -1,30 +0,0 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discovery
import (
"time"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/retrieval/discovery/marathon"
)
// NewMarathon creates a new Marathon based discovery.
func NewMarathon(conf *config.MarathonSDConfig) *marathon.Discovery {
return &marathon.Discovery{
Servers: conf.Servers,
RefreshInterval: time.Duration(conf.RefreshInterval),
Client: marathon.FetchApps,
}
}