kubernetes: merge back into single configuration

This commit is contained in:
Fabian Reinartz 2016-10-07 14:53:11 +02:00
parent a9cfb66b28
commit b24602f713
14 changed files with 312 additions and 2160 deletions

View file

@ -127,10 +127,7 @@ var (
} }
// DefaultKubernetesSDConfig is the default Kubernetes SD configuration // DefaultKubernetesSDConfig is the default Kubernetes SD configuration
DefaultKubernetesSDConfig = KubernetesSDConfig{ DefaultKubernetesSDConfig = KubernetesSDConfig{}
RequestTimeout: model.Duration(10 * time.Second),
RetryInterval: model.Duration(1 * time.Second),
}
// DefaultGCESDConfig is the default EC2 SD configuration. // DefaultGCESDConfig is the default EC2 SD configuration.
DefaultGCESDConfig = GCESDConfig{ DefaultGCESDConfig = GCESDConfig{
@ -442,8 +439,6 @@ type ScrapeConfig struct {
MarathonSDConfigs []*MarathonSDConfig `yaml:"marathon_sd_configs,omitempty"` MarathonSDConfigs []*MarathonSDConfig `yaml:"marathon_sd_configs,omitempty"`
// List of Kubernetes service discovery configurations. // List of Kubernetes service discovery configurations.
KubernetesSDConfigs []*KubernetesSDConfig `yaml:"kubernetes_sd_configs,omitempty"` KubernetesSDConfigs []*KubernetesSDConfig `yaml:"kubernetes_sd_configs,omitempty"`
// List of Kubernetes service discovery configurations.
KubernetesV2SDConfigs []*KubernetesV2SDConfig `yaml:"kubernetes_v2_sd_configs,omitempty"`
// List of GCE service discovery configurations. // List of GCE service discovery configurations.
GCESDConfigs []*GCESDConfig `yaml:"gce_sd_configs,omitempty"` GCESDConfigs []*GCESDConfig `yaml:"gce_sd_configs,omitempty"`
// List of EC2 service discovery configurations. // List of EC2 service discovery configurations.
@ -798,31 +793,13 @@ func (c *MarathonSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) erro
return nil return nil
} }
// KubernetesSDConfig is the configuration for Kubernetes service discovery.
type KubernetesSDConfig struct {
APIServers []URL `yaml:"api_servers"`
Role KubernetesRole `yaml:"role"`
InCluster bool `yaml:"in_cluster,omitempty"`
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
BearerToken string `yaml:"bearer_token,omitempty"`
BearerTokenFile string `yaml:"bearer_token_file,omitempty"`
RetryInterval model.Duration `yaml:"retry_interval,omitempty"`
RequestTimeout model.Duration `yaml:"request_timeout,omitempty"`
TLSConfig TLSConfig `yaml:"tls_config,omitempty"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
}
type KubernetesRole string type KubernetesRole string
const ( const (
KubernetesRoleNode = "node" KubernetesRoleNode = "node"
KubernetesRolePod = "pod" KubernetesRolePod = "pod"
KubernetesRoleContainer = "container" KubernetesRoleService = "service"
KubernetesRoleService = "service" KubernetesRoleEndpoint = "endpoint"
KubernetesRoleEndpoint = "endpoint"
KubernetesRoleAPIServer = "apiserver"
) )
func (c *KubernetesRole) UnmarshalYAML(unmarshal func(interface{}) error) error { func (c *KubernetesRole) UnmarshalYAML(unmarshal func(interface{}) error) error {
@ -830,41 +807,15 @@ func (c *KubernetesRole) UnmarshalYAML(unmarshal func(interface{}) error) error
return err return err
} }
switch *c { switch *c {
case KubernetesRoleNode, KubernetesRolePod, KubernetesRoleContainer, KubernetesRoleService, KubernetesRoleEndpoint, KubernetesRoleAPIServer: case KubernetesRoleNode, KubernetesRolePod, KubernetesRoleService, KubernetesRoleEndpoint:
return nil return nil
default: default:
return fmt.Errorf("Unknown Kubernetes SD role %q", *c) return fmt.Errorf("Unknown Kubernetes SD role %q", *c)
} }
} }
// UnmarshalYAML implements the yaml.Unmarshaler interface. // KubernetesSDConfig is the configuration for Kubernetes service discovery.
func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { type KubernetesSDConfig struct {
*c = DefaultKubernetesSDConfig
type plain KubernetesSDConfig
err := unmarshal((*plain)(c))
if err != nil {
return err
}
if err := checkOverflow(c.XXX, "kubernetes_sd_config"); err != nil {
return err
}
if c.Role == "" {
return fmt.Errorf("role missing (one of: container, pod, service, endpoint, node, apiserver)")
}
if len(c.APIServers) == 0 {
return fmt.Errorf("Kubernetes SD configuration requires at least one Kubernetes API server")
}
if len(c.BearerToken) > 0 && len(c.BearerTokenFile) > 0 {
return fmt.Errorf("at most one of bearer_token & bearer_token_file must be configured")
}
if c.BasicAuth != nil && (len(c.BearerToken) > 0 || len(c.BearerTokenFile) > 0) {
return fmt.Errorf("at most one of basic_auth, bearer_token & bearer_token_file must be configured")
}
return nil
}
// KubernetesV2SDConfig is the configuration for Kubernetes service discovery.
type KubernetesV2SDConfig struct {
APIServer URL `yaml:"api_server"` APIServer URL `yaml:"api_server"`
Role KubernetesRole `yaml:"role"` Role KubernetesRole `yaml:"role"`
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"` BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
@ -877,9 +828,9 @@ type KubernetesV2SDConfig struct {
} }
// UnmarshalYAML implements the yaml.Unmarshaler interface. // UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *KubernetesV2SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = KubernetesV2SDConfig{} *c = KubernetesSDConfig{}
type plain KubernetesV2SDConfig type plain KubernetesSDConfig
err := unmarshal((*plain)(c)) err := unmarshal((*plain)(c))
if err != nil { if err != nil {
return err return err
@ -888,7 +839,7 @@ func (c *KubernetesV2SDConfig) UnmarshalYAML(unmarshal func(interface{}) error)
return err return err
} }
if c.Role == "" { if c.Role == "" {
return fmt.Errorf("role missing (one of: container, pod, service, endpoint, node, apiserver)") return fmt.Errorf("role missing (one of: pod, service, endpoint, node)")
} }
if len(c.BearerToken) > 0 && len(c.BearerTokenFile) > 0 { if len(c.BearerToken) > 0 && len(c.BearerTokenFile) > 0 {
return fmt.Errorf("at most one of bearer_token & bearer_token_file must be configured") return fmt.Errorf("at most one of bearer_token & bearer_token_file must be configured")

View file

@ -21,7 +21,6 @@ import (
"github.com/prometheus/prometheus/retrieval/discovery/consul" "github.com/prometheus/prometheus/retrieval/discovery/consul"
"github.com/prometheus/prometheus/retrieval/discovery/dns" "github.com/prometheus/prometheus/retrieval/discovery/dns"
"github.com/prometheus/prometheus/retrieval/discovery/kubernetes" "github.com/prometheus/prometheus/retrieval/discovery/kubernetes"
"github.com/prometheus/prometheus/retrieval/discovery/kubernetes_v2"
"github.com/prometheus/prometheus/retrieval/discovery/marathon" "github.com/prometheus/prometheus/retrieval/discovery/marathon"
) )
@ -31,20 +30,8 @@ func NewConsul(cfg *config.ConsulSDConfig) (*consul.Discovery, error) {
} }
// NewKubernetesDiscovery creates a Kubernetes service discovery based on the passed-in configuration. // NewKubernetesDiscovery creates a Kubernetes service discovery based on the passed-in configuration.
func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.Discovery, error) { func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.Kubernetes, error) {
kd := &kubernetes.Discovery{ return kubernetes.New(log.Base(), conf)
Conf: conf,
}
err := kd.Initialize()
if err != nil {
return nil, err
}
return kd, nil
}
// NewKubernetesV2Discovery creates a Kubernetes service discovery based on the passed-in configuration.
func NewKubernetesV2Discovery(conf *config.KubernetesV2SDConfig) (*kubernetesv2.Kubernetes, error) {
return kubernetesv2.New(log.Base(), conf)
} }
// NewMarathon creates a new Marathon based discovery. // NewMarathon creates a new Marathon based discovery.

View file

@ -1,296 +0,0 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"sync"
"time"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/httputil"
)
const (
// kubernetesMetaLabelPrefix is the meta prefix used for all meta labels.
// in this discovery.
metaLabelPrefix = model.MetaLabelPrefix + "kubernetes_"
// roleLabel is the name for the label containing a target's role.
roleLabel = metaLabelPrefix + "role"
sourcePodPrefix = "pods"
// podsTargetGroupNAme is the name given to the target group for pods
podsTargetGroupName = "pods"
// podNamespaceLabel is the name for the label containing a target pod's namespace
podNamespaceLabel = metaLabelPrefix + "pod_namespace"
// podNameLabel is the name for the label containing a target pod's name
podNameLabel = metaLabelPrefix + "pod_name"
// podAddressLabel is the name for the label containing a target pod's IP address (the PodIP)
podAddressLabel = metaLabelPrefix + "pod_address"
// podContainerNameLabel is the name for the label containing a target's container name
podContainerNameLabel = metaLabelPrefix + "pod_container_name"
// podContainerPortNameLabel is the name for the label containing the name of the port selected for a target
podContainerPortNameLabel = metaLabelPrefix + "pod_container_port_name"
// PodContainerPortListLabel is the name for the label containing a list of all TCP ports on the target container
podContainerPortListLabel = metaLabelPrefix + "pod_container_port_list"
// PodContainerPortMapPrefix is the prefix used to create the names of labels that associate container port names to port values
// Such labels will be named (podContainerPortMapPrefix)_(PortName) = (ContainerPort)
podContainerPortMapPrefix = metaLabelPrefix + "pod_container_port_map_"
// podReadyLabel is the name for the label containing the 'Ready' status (true/false/unknown) for a target
podReadyLabel = metaLabelPrefix + "pod_ready"
// podLabelPrefix is the prefix for prom label names corresponding to k8s labels for a target pod
podLabelPrefix = metaLabelPrefix + "pod_label_"
// podAnnotationPrefix is the prefix for prom label names corresponding to k8s annotations for a target pod
podAnnotationPrefix = metaLabelPrefix + "pod_annotation_"
// podNodeLabel is the name for the label containing the name of the node that a pod is scheduled on to
podNodeNameLabel = metaLabelPrefix + "pod_node_name"
// podHostIPLabel is the name for the label containing the IP of the node that a pod is scheduled on to
podHostIPLabel = metaLabelPrefix + "pod_host_ip"
sourceServicePrefix = "services"
// serviceNamespaceLabel is the name for the label containing a target's service namespace.
serviceNamespaceLabel = metaLabelPrefix + "service_namespace"
// serviceNameLabel is the name for the label containing a target's service name.
serviceNameLabel = metaLabelPrefix + "service_name"
// serviceLabelPrefix is the prefix for the service labels.
serviceLabelPrefix = metaLabelPrefix + "service_label_"
// serviceAnnotationPrefix is the prefix for the service annotations.
serviceAnnotationPrefix = metaLabelPrefix + "service_annotation_"
// nodesTargetGroupName is the name given to the target group for nodes.
nodesTargetGroupName = "nodes"
// nodeLabelPrefix is the prefix for the node labels.
nodeLabelPrefix = metaLabelPrefix + "node_label_"
// nodeAddressPrefix is the prefix for the node addresses.
nodeAddressPrefix = metaLabelPrefix + "node_address_"
// nodePortLabel is the name of the label for the node port.
nodePortLabel = metaLabelPrefix + "node_port"
// apiServersTargetGroupName is the name given to the target group for API servers.
apiServersTargetGroupName = "apiServers"
serviceAccountToken = "/var/run/secrets/kubernetes.io/serviceaccount/token"
serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
apiVersion = "v1"
apiPrefix = "/api/" + apiVersion
nodesURL = apiPrefix + "/nodes"
podsURL = apiPrefix + "/pods"
servicesURL = apiPrefix + "/services"
endpointsURL = apiPrefix + "/endpoints"
serviceEndpointsURL = apiPrefix + "/namespaces/%s/endpoints/%s"
)
// Discovery implements a TargetProvider for Kubernetes services.
type Discovery struct {
client *http.Client
Conf *config.KubernetesSDConfig
apiServers []config.URL
apiServersMu sync.RWMutex
}
// Initialize sets up the discovery for usage.
func (kd *Discovery) Initialize() error {
client, err := newKubernetesHTTPClient(kd.Conf)
if err != nil {
return err
}
kd.apiServers = kd.Conf.APIServers
kd.client = client
return nil
}
// Run implements the TargetProvider interface.
func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
log.Debugf("Start Kubernetes service discovery")
defer close(ch)
switch kd.Conf.Role {
case config.KubernetesRolePod, config.KubernetesRoleContainer:
pd := &podDiscovery{
retryInterval: time.Duration(kd.Conf.RetryInterval),
kd: kd,
}
pd.run(ctx, ch)
case config.KubernetesRoleNode:
nd := &nodeDiscovery{
retryInterval: time.Duration(kd.Conf.RetryInterval),
kd: kd,
}
nd.run(ctx, ch)
case config.KubernetesRoleService, config.KubernetesRoleEndpoint:
sd := &serviceDiscovery{
retryInterval: time.Duration(kd.Conf.RetryInterval),
kd: kd,
}
sd.run(ctx, ch)
case config.KubernetesRoleAPIServer:
select {
case ch <- []*config.TargetGroup{kd.updateAPIServersTargetGroup()}:
case <-ctx.Done():
return
}
default:
log.Errorf("unknown Kubernetes discovery kind %q", kd.Conf.Role)
return
}
}
func (kd *Discovery) queryAPIServerPath(path string) (*http.Response, error) {
req, err := http.NewRequest("GET", path, nil)
if err != nil {
return nil, err
}
return kd.queryAPIServerReq(req)
}
func (kd *Discovery) queryAPIServerReq(req *http.Request) (*http.Response, error) {
// Lock in case we need to rotate API servers to request.
kd.apiServersMu.Lock()
defer kd.apiServersMu.Unlock()
var lastErr error
for i := 0; i < len(kd.apiServers); i++ {
cloneReq := *req
cloneReq.URL.Host = kd.apiServers[0].Host
cloneReq.URL.Scheme = kd.apiServers[0].Scheme
res, err := kd.client.Do(&cloneReq)
if err == nil {
return res, nil
}
lastErr = err
kd.rotateAPIServers()
}
return nil, fmt.Errorf("unable to query any API servers: %v", lastErr)
}
func (kd *Discovery) rotateAPIServers() {
if len(kd.apiServers) > 1 {
kd.apiServers = append(kd.apiServers[1:], kd.apiServers[0])
}
}
func (kd *Discovery) updateAPIServersTargetGroup() *config.TargetGroup {
tg := &config.TargetGroup{
Source: apiServersTargetGroupName,
Labels: model.LabelSet{
roleLabel: model.LabelValue("apiserver"),
},
}
for _, apiServer := range kd.apiServers {
apiServerAddress := apiServer.Host
_, _, err := net.SplitHostPort(apiServerAddress)
// If error then no port is specified - use default for scheme.
if err != nil {
switch apiServer.Scheme {
case "http":
apiServerAddress = net.JoinHostPort(apiServerAddress, "80")
case "https":
apiServerAddress = net.JoinHostPort(apiServerAddress, "443")
}
}
t := model.LabelSet{
model.AddressLabel: model.LabelValue(apiServerAddress),
model.SchemeLabel: model.LabelValue(apiServer.Scheme),
}
tg.Targets = append(tg.Targets, t)
}
return tg
}
func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, error) {
bearerTokenFile := conf.BearerTokenFile
tlsConfig := conf.TLSConfig
if conf.InCluster {
if len(bearerTokenFile) == 0 {
bearerTokenFile = serviceAccountToken
}
if len(tlsConfig.CAFile) == 0 {
// With recent versions, the CA certificate is mounted as a secret
// but we need to handle older versions too. In this case, don't
// set the CAFile & the configuration will have to use InsecureSkipVerify.
if _, err := os.Stat(serviceAccountCACert); err == nil {
tlsConfig.CAFile = serviceAccountCACert
}
}
}
tls, err := httputil.NewTLSConfig(tlsConfig)
if err != nil {
return nil, err
}
var rt http.RoundTripper = &http.Transport{
Dial: func(netw, addr string) (c net.Conn, err error) {
c, err = net.DialTimeout(netw, addr, time.Duration(conf.RequestTimeout))
return
},
TLSClientConfig: tls,
}
// If a bearer token is provided, create a round tripper that will set the
// Authorization header correctly on each request.
bearerToken := conf.BearerToken
if len(bearerToken) == 0 && len(bearerTokenFile) > 0 {
b, err := ioutil.ReadFile(bearerTokenFile)
if err != nil {
return nil, fmt.Errorf("unable to read bearer token file %s: %s", bearerTokenFile, err)
}
bearerToken = string(b)
}
if len(bearerToken) > 0 {
rt = httputil.NewBearerAuthRoundTripper(bearerToken, rt)
}
if conf.BasicAuth != nil {
rt = httputil.NewBasicAuthRoundTripper(conf.BasicAuth.Username, conf.BasicAuth.Password, rt)
}
return &http.Client{
Transport: rt,
}, nil
}
// Until loops until stop channel is closed, running f every period.
// f may not be invoked if stop channel is already closed.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
select {
case <-stopCh:
return
default:
f()
}
for {
select {
case <-stopCh:
return
case <-time.After(period):
f()
}
}
}

View file

@ -1,221 +0,0 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"flag"
"math/rand"
"os"
"testing"
_ "github.com/prometheus/common/log"
"github.com/prometheus/common/model"
)
func TestMain(m *testing.M) {
flag.Parse()
os.Exit(m.Run())
}
var portsA = []ContainerPort{
{
Name: "http",
ContainerPort: 80,
Protocol: "TCP",
},
}
var portsB = []ContainerPort{
{
Name: "https",
ContainerPort: 443,
Protocol: "TCP",
},
}
var portsNoTcp = []ContainerPort{
{
Name: "dns",
ContainerPort: 53,
Protocol: "UDP",
},
}
var portsMultiA = []ContainerPort{
{
Name: "http",
ContainerPort: 80,
Protocol: "TCP",
},
{
Name: "ssh",
ContainerPort: 22,
Protocol: "TCP",
},
}
var portsMultiB = []ContainerPort{
{
Name: "http",
ContainerPort: 80,
Protocol: "TCP",
},
{
Name: "https",
ContainerPort: 443,
Protocol: "TCP",
},
}
func container(name string, ports []ContainerPort) Container {
p := make([]ContainerPort, len(ports))
copy(p, ports)
// Shuffle order of ports to ensure code enforces determinism
for i := range p {
j := rand.Intn(i + 1)
p[i], p[j] = p[j], p[i]
}
return Container{
Name: name,
Ports: p,
}
}
func pod(name string, containers []Container) *Pod {
c := make([]Container, len(containers))
copy(c, containers)
// Shuffle order of containers to ensure code enforces determinism
for i := range c {
j := rand.Intn(i + 1)
c[i], c[j] = c[j], c[i]
}
return &Pod{
ObjectMeta: ObjectMeta{
Name: name,
},
PodStatus: PodStatus{
PodIP: "1.1.1.1",
Phase: "Running",
Conditions: []PodCondition{
{
Type: "Ready",
Status: "True",
},
},
HostIP: "2.2.2.2",
},
PodSpec: PodSpec{
Containers: c,
NodeName: "test-node",
},
}
}
func TestUpdatePodTargets(t *testing.T) {
var result []model.LabelSet
// Multiple iterations help ensure that we'll see different permutations via the various randomizations that occur
for i := 0; i < 100; i++ {
// Return no targets for a pod that isn't "Running"
result = updatePodTargets(&Pod{PodStatus: PodStatus{PodIP: "1.1.1.1"}}, true)
if len(result) > 0 {
t.Fatalf("expected 0 targets, received %d", len(result))
}
// Return no targets for a pod with no IP
result = updatePodTargets(&Pod{PodStatus: PodStatus{Phase: "Running"}}, true)
if len(result) > 0 {
t.Fatalf("expected 0 targets, received %d", len(result))
}
// A pod with no containers (?!) should not produce any targets
result = updatePodTargets(pod("empty", []Container{}), true)
if len(result) > 0 {
t.Fatalf("expected 0 targets, received %d", len(result))
}
// Return no targets for a pod that has no HostIP
result = updatePodTargets(&Pod{PodStatus: PodStatus{PodIP: "1.1.1.1", Phase: "Running"}}, true)
if len(result) > 0 {
t.Fatalf("expected 0 targets, received %d", len(result))
}
// A pod with all valid containers should return one target per container with allContainers=true
result = updatePodTargets(pod("easy", []Container{container("a", portsA), container("b", portsB)}), true)
if len(result) != 2 {
t.Fatalf("expected 2 targets, received %d", len(result))
}
if result[0][podReadyLabel] != "true" {
t.Fatalf("expected result[0] podReadyLabel 'true', received '%s'", result[0][podReadyLabel])
}
if _, ok := result[0][podContainerPortMapPrefix+"http"]; !ok {
t.Fatalf("expected result[0][podContainerPortMapPrefix + 'http'] to be '80', but was missing")
}
if result[0][podContainerPortMapPrefix+"http"] != "80" {
t.Fatalf("expected result[0][podContainerPortMapPrefix + 'http'] to be '80', but was %s", result[0][podContainerPortMapPrefix+"http"])
}
if _, ok := result[1][podContainerPortMapPrefix+"https"]; !ok {
t.Fatalf("expected result[1][podContainerPortMapPrefix + 'https'] to be '443', but was missing")
}
if result[1][podContainerPortMapPrefix+"https"] != "443" {
t.Fatalf("expected result[1][podContainerPortMapPrefix + 'https'] to be '443', but was %s", result[1][podContainerPortMapPrefix+"https"])
}
if result[0][podNodeNameLabel] != "test-node" {
t.Fatalf("expected result[0] podNodeNameLabel 'test-node', received '%s'", result[0][podNodeNameLabel])
}
if result[0][podHostIPLabel] != "2.2.2.2" {
t.Fatalf("expected result[0] podHostIPLabel '2.2.2.2', received '%s'", result[0][podHostIPLabel])
}
// A pod with all valid containers should return one target with allContainers=false, and it should be the alphabetically first container
result = updatePodTargets(pod("easy", []Container{container("a", portsA), container("b", portsB)}), false)
if len(result) != 1 {
t.Fatalf("expected 1 targets, received %d", len(result))
}
if _, ok := result[0][podContainerNameLabel]; !ok {
t.Fatalf("expected result[0][podContainerNameLabel] to be 'a', but was missing")
}
if result[0][podContainerNameLabel] != "a" {
t.Fatalf("expected result[0][podContainerNameLabel] to be 'a', but was '%s'", result[0][podContainerNameLabel])
}
// A pod with some non-targetable containers should return one target per targetable container with allContainers=true
result = updatePodTargets(pod("mixed", []Container{container("a", portsA), container("no-tcp", portsNoTcp), container("b", portsB)}), true)
if len(result) != 2 {
t.Fatalf("expected 2 targets, received %d", len(result))
}
// A pod with a container with multiple ports should return the numerically smallest port
result = updatePodTargets(pod("hard", []Container{container("multiA", portsMultiA), container("multiB", portsMultiB)}), true)
if len(result) != 2 {
t.Fatalf("expected 2 targets, received %d", len(result))
}
if result[0][model.AddressLabel] != "1.1.1.1:22" {
t.Fatalf("expected result[0] address to be 1.1.1.1:22, received %s", result[0][model.AddressLabel])
}
if result[0][podContainerPortListLabel] != ",ssh=22,http=80," {
t.Fatalf("expected result[0] podContainerPortListLabel to be ',ssh=22,http=80,', received '%s'", result[0][podContainerPortListLabel])
}
if result[1][model.AddressLabel] != "1.1.1.1:80" {
t.Fatalf("expected result[1] address to be 1.1.1.1:80, received %s", result[1][model.AddressLabel])
}
if result[1][podContainerPortListLabel] != ",http=80,https=443," {
t.Fatalf("expected result[1] podContainerPortListLabel to be ',http=80,https=443,', received '%s'", result[1][podContainerPortListLabel])
}
}
}

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package kubernetesv2 package kubernetes
import ( import (
"fmt" "fmt"

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package kubernetesv2 package kubernetes
import ( import (
"io/ioutil" "io/ioutil"
@ -52,12 +52,12 @@ func init() {
} }
// New creates a new Kubernetes discovery for the given role. // New creates a new Kubernetes discovery for the given role.
func New(l log.Logger, conf *config.KubernetesV2SDConfig) (*Kubernetes, error) { func New(l log.Logger, conf *config.KubernetesSDConfig) (*Kubernetes, error) {
var ( var (
kcfg *rest.Config kcfg *rest.Config
err error err error
) )
if conf.APIServer.String() == "" { if conf.APIServer.URL == nil {
kcfg, err = rest.InClusterConfig() kcfg, err = rest.InClusterConfig()
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -14,229 +14,148 @@
package kubernetes package kubernetes
import ( import (
"encoding/json"
"fmt" "fmt"
"io/ioutil"
"net" "net"
"net/http"
"strconv" "strconv"
"sync"
"time"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/util/strutil"
"golang.org/x/net/context" "golang.org/x/net/context"
"k8s.io/client-go/1.5/pkg/api"
apiv1 "k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/client-go/1.5/tools/cache"
) )
type nodeDiscovery struct { // Node discovers Kubernetes nodes.
mtx sync.RWMutex type Node struct {
nodes map[string]*Node logger log.Logger
retryInterval time.Duration informer cache.SharedInformer
kd *Discovery store cache.Store
} }
func (d *nodeDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) { // NewNode returns a new node discovery.
func NewNode(l log.Logger, inf cache.SharedInformer) *Node {
return &Node{logger: l, informer: inf, store: inf.GetStore()}
}
// Run implements the TargetProvider interface.
func (n *Node) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Send full initial set of pod targets.
var initial []*config.TargetGroup
for _, o := range n.store.List() {
tg := n.buildNode(o.(*apiv1.Node))
initial = append(initial, tg)
}
select { select {
case ch <- []*config.TargetGroup{d.updateNodesTargetGroup()}:
case <-ctx.Done(): case <-ctx.Done():
return return
case ch <- initial:
} }
update := make(chan *nodeEvent, 10) // Send target groups for service updates.
go d.watchNodes(update, ctx.Done(), d.retryInterval) send := func(tg *config.TargetGroup) {
for {
tgs := []*config.TargetGroup{}
select { select {
case <-ctx.Done(): case <-ctx.Done():
return case ch <- []*config.TargetGroup{tg}:
case e := <-update:
log.Debugf("k8s discovery received node event (EventType=%s, Node Name=%s)", e.EventType, e.Node.ObjectMeta.Name)
d.updateNode(e.Node, e.EventType)
tgs = append(tgs, d.updateNodesTargetGroup())
}
if tgs == nil {
continue
}
for _, tg := range tgs {
select {
case ch <- []*config.TargetGroup{tg}:
case <-ctx.Done():
return
}
} }
} }
n.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
send(n.buildNode(o.(*apiv1.Node)))
},
DeleteFunc: func(o interface{}) {
send(&config.TargetGroup{Source: nodeSource(o.(*apiv1.Node))})
},
UpdateFunc: func(_, o interface{}) {
send(n.buildNode(o.(*apiv1.Node)))
},
})
// Block until the target provider is explicitly canceled.
<-ctx.Done()
} }
func (d *nodeDiscovery) updateNodesTargetGroup() *config.TargetGroup { func nodeSource(n *apiv1.Node) string {
d.mtx.RLock() return "node/" + n.Namespace + "/" + n.Name
defer d.mtx.RUnlock() }
const (
nodeNameLabel = metaLabelPrefix + "node_name"
nodeLabelPrefix = metaLabelPrefix + "node_label_"
nodeAnnotationPrefix = metaLabelPrefix + "node_annotation_"
nodeAddressPrefix = metaLabelPrefix + "node_address_"
)
func nodeLabels(n *apiv1.Node) model.LabelSet {
ls := make(model.LabelSet, len(n.Labels)+len(n.Annotations)+2)
ls[nodeNameLabel] = lv(n.Name)
for k, v := range n.Labels {
ln := strutil.SanitizeLabelName(nodeLabelPrefix + k)
ls[model.LabelName(ln)] = lv(v)
}
for k, v := range n.Annotations {
ln := strutil.SanitizeLabelName(nodeAnnotationPrefix + k)
ls[model.LabelName(ln)] = lv(v)
}
return ls
}
func (n *Node) buildNode(node *apiv1.Node) *config.TargetGroup {
tg := &config.TargetGroup{ tg := &config.TargetGroup{
Source: nodesTargetGroupName, Source: nodeSource(node),
Labels: model.LabelSet{ }
roleLabel: model.LabelValue("node"), tg.Labels = nodeLabels(node)
},
addr, addrMap, err := nodeAddress(node)
if err != nil {
n.logger.With("err", err).Debugf("No node address found")
return nil
}
addr = net.JoinHostPort(addr, strconv.FormatInt(int64(node.Status.DaemonEndpoints.KubeletEndpoint.Port), 10))
t := model.LabelSet{
model.AddressLabel: lv(addr),
model.InstanceLabel: lv(node.Name),
} }
// Now let's loop through the nodes & add them to the target group with appropriate labels. for ty, a := range addrMap {
for nodeName, node := range d.nodes { ln := strutil.SanitizeLabelName(nodeAddressPrefix + string(ty))
defaultNodeAddress, nodeAddressMap, err := nodeAddresses(node) t[model.LabelName(ln)] = lv(a[0])
if err != nil {
log.Debugf("Skipping node %s: %s", node.Name, err)
continue
}
kubeletPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
address := net.JoinHostPort(defaultNodeAddress.String(), fmt.Sprintf("%d", kubeletPort))
t := model.LabelSet{
model.AddressLabel: model.LabelValue(address),
model.InstanceLabel: model.LabelValue(nodeName),
}
for addrType, ip := range nodeAddressMap {
labelName := strutil.SanitizeLabelName(nodeAddressPrefix + string(addrType))
t[model.LabelName(labelName)] = model.LabelValue(ip[0].String())
}
t[model.LabelName(nodePortLabel)] = model.LabelValue(strconv.Itoa(kubeletPort))
for k, v := range node.ObjectMeta.Labels {
labelName := strutil.SanitizeLabelName(nodeLabelPrefix + k)
t[model.LabelName(labelName)] = model.LabelValue(v)
}
tg.Targets = append(tg.Targets, t)
} }
tg.Targets = append(tg.Targets, t)
return tg return tg
} }
// watchNodes watches nodes as they come & go.
func (d *nodeDiscovery) watchNodes(events chan *nodeEvent, done <-chan struct{}, retryInterval time.Duration) {
until(func() {
nodes, resourceVersion, err := d.getNodes()
if err != nil {
log.Errorf("Cannot initialize nodes collection: %s", err)
return
}
// Reset the known nodes.
d.mtx.Lock()
d.nodes = map[string]*Node{}
d.mtx.Unlock()
for _, node := range nodes {
events <- &nodeEvent{Added, node}
}
req, err := http.NewRequest("GET", nodesURL, nil)
if err != nil {
log.Errorf("Cannot create nodes request: %s", err)
return
}
values := req.URL.Query()
values.Add("watch", "true")
values.Add("resourceVersion", resourceVersion)
req.URL.RawQuery = values.Encode()
res, err := d.kd.queryAPIServerReq(req)
if err != nil {
log.Errorf("Failed to watch nodes: %s", err)
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch nodes: %d", res.StatusCode)
return
}
d := json.NewDecoder(res.Body)
for {
var event nodeEvent
if err := d.Decode(&event); err != nil {
log.Errorf("Watch nodes unexpectedly closed: %s", err)
return
}
select {
case events <- &event:
case <-done:
}
}
}, retryInterval, done)
}
func (d *nodeDiscovery) updateNode(node *Node, eventType EventType) {
d.mtx.Lock()
defer d.mtx.Unlock()
updatedNodeName := node.ObjectMeta.Name
switch eventType {
case Deleted:
// Deleted - remove from nodes map.
delete(d.nodes, updatedNodeName)
case Added, Modified:
// Added/Modified - update the node in the nodes map.
d.nodes[updatedNodeName] = node
}
}
func (d *nodeDiscovery) getNodes() (map[string]*Node, string, error) {
res, err := d.kd.queryAPIServerPath(nodesURL)
if err != nil {
// If we can't list nodes then we can't watch them. Assume this is a misconfiguration
// & return error.
return nil, "", fmt.Errorf("unable to list Kubernetes nodes: %s", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response: %d %s", res.StatusCode, res.Status)
}
var nodes NodeList
if err := json.NewDecoder(res.Body).Decode(&nodes); err != nil {
body, _ := ioutil.ReadAll(res.Body)
return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response body: %s", string(body))
}
nodeMap := map[string]*Node{}
for idx, node := range nodes.Items {
nodeMap[node.ObjectMeta.Name] = &nodes.Items[idx]
}
return nodeMap, nodes.ResourceVersion, nil
}
// nodeAddresses returns the provided node's address, based on the priority: // nodeAddresses returns the provided node's address, based on the priority:
// 1. NodeInternalIP // 1. NodeInternalIP
// 2. NodeExternalIP // 2. NodeExternalIP
// 3. NodeLegacyHostIP // 3. NodeLegacyHostIP
// 3. NodeHostName
// //
// Copied from k8s.io/kubernetes/pkg/util/node/node.go // Derived from k8s.io/kubernetes/pkg/util/node/node.go
func nodeAddresses(node *Node) (net.IP, map[NodeAddressType][]net.IP, error) { func nodeAddress(node *apiv1.Node) (string, map[apiv1.NodeAddressType][]string, error) {
addresses := node.Status.Addresses m := map[apiv1.NodeAddressType][]string{}
addressMap := map[NodeAddressType][]net.IP{} for _, a := range node.Status.Addresses {
for _, addr := range addresses { m[a.Type] = append(m[a.Type], a.Address)
ip := net.ParseIP(addr.Address)
// All addresses should be valid IPs.
if ip == nil {
continue
}
addressMap[addr.Type] = append(addressMap[addr.Type], ip)
} }
if addresses, ok := addressMap[NodeInternalIP]; ok {
return addresses[0], addressMap, nil if addresses, ok := m[apiv1.NodeInternalIP]; ok {
return addresses[0], m, nil
} }
if addresses, ok := addressMap[NodeExternalIP]; ok { if addresses, ok := m[apiv1.NodeExternalIP]; ok {
return addresses[0], addressMap, nil return addresses[0], m, nil
} }
if addresses, ok := addressMap[NodeLegacyHostIP]; ok { if addresses, ok := m[apiv1.NodeAddressType(api.NodeLegacyHostIP)]; ok {
return addresses[0], addressMap, nil return addresses[0], m, nil
} }
return nil, nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses) if addresses, ok := m[apiv1.NodeHostName]; ok {
return addresses[0], m, nil
}
return "", m, fmt.Errorf("host address unknown")
} }

View file

@ -14,337 +14,164 @@
package kubernetes package kubernetes
import ( import (
"bytes"
"encoding/json"
"fmt" "fmt"
"io/ioutil"
"net" "net"
"net/http"
"sort"
"strconv" "strconv"
"strings" "strings"
"sync"
"time"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/util/strutil"
"golang.org/x/net/context" "golang.org/x/net/context"
"k8s.io/client-go/1.5/pkg/api"
apiv1 "k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/client-go/1.5/tools/cache"
) )
type podDiscovery struct { // Pod discovers new pod targets.
mtx sync.RWMutex type Pod struct {
pods map[string]map[string]*Pod informer cache.SharedInformer
retryInterval time.Duration store cache.Store
kd *Discovery logger log.Logger
} }
func (d *podDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) { // NewPods creates a new pod discovery.
pods, _, err := d.getPods() func NewPods(l log.Logger, pods cache.SharedInformer) *Pod {
if err != nil { return &Pod{
log.Errorf("Cannot initialize pods collection: %s", err) informer: pods,
return store: pods.GetStore(),
logger: l,
} }
d.pods = pods }
initial := []*config.TargetGroup{} // Run implements the TargetProvider interface.
switch d.kd.Conf.Role { func (p *Pod) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
case config.KubernetesRolePod: // Send full initial set of pod targets.
initial = append(initial, d.updatePodsTargetGroup()) var initial []*config.TargetGroup
case config.KubernetesRoleContainer: for _, o := range p.store.List() {
for _, ns := range d.pods { tg := p.buildPod(o.(*apiv1.Pod))
for _, pod := range ns { initial = append(initial, tg)
initial = append(initial, d.updateContainerTargetGroup(pod))
} p.logger.With("tg", fmt.Sprintf("%#v", tg)).Debugln("initial pod")
}
} }
select { select {
case ch <- initial:
case <-ctx.Done(): case <-ctx.Done():
return return
case ch <- initial:
} }
update := make(chan *podEvent, 10) // Send target groups for pod updates.
go d.watchPods(update, ctx.Done(), d.retryInterval) send := func(tg *config.TargetGroup) {
p.logger.With("tg", fmt.Sprintf("%#v", tg)).Debugln("pod update")
for {
tgs := []*config.TargetGroup{}
select { select {
case <-ctx.Done(): case <-ctx.Done():
return case ch <- []*config.TargetGroup{tg}:
case e := <-update:
log.Debugf("k8s discovery received pod event (EventType=%s, Pod Name=%s)", e.EventType, e.Pod.ObjectMeta.Name)
d.updatePod(e.Pod, e.EventType)
switch d.kd.Conf.Role {
case config.KubernetesRoleContainer:
// Update the per-pod target group
tgs = append(tgs, d.updateContainerTargetGroup(e.Pod))
case config.KubernetesRolePod:
// Update the all pods target group
tgs = append(tgs, d.updatePodsTargetGroup())
}
}
if tgs == nil {
continue
}
for _, tg := range tgs {
select {
case ch <- []*config.TargetGroup{tg}:
case <-ctx.Done():
return
}
} }
} }
p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
send(p.buildPod(o.(*apiv1.Pod)))
},
DeleteFunc: func(o interface{}) {
send(&config.TargetGroup{Source: podSource(o.(*apiv1.Pod))})
},
UpdateFunc: func(_, o interface{}) {
send(p.buildPod(o.(*apiv1.Pod)))
},
})
// Block until the target provider is explicitly canceled.
<-ctx.Done()
} }
func (d *podDiscovery) getPods() (map[string]map[string]*Pod, string, error) { const (
res, err := d.kd.queryAPIServerPath(podsURL) podNameLabel = metaLabelPrefix + "pod_name"
if err != nil { podIPLabel = metaLabelPrefix + "pod_ip"
return nil, "", fmt.Errorf("unable to list Kubernetes pods: %s", err) podContainerNameLabel = metaLabelPrefix + "pod_container_name"
} podContainerPortNameLabel = metaLabelPrefix + "pod_container_port_name"
defer res.Body.Close() podContainerPortNumberLabel = metaLabelPrefix + "pod_container_port_number"
if res.StatusCode != http.StatusOK { podContainerPortProtocolLabel = metaLabelPrefix + "pod_container_port_protocol"
return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response: %d %s", res.StatusCode, res.Status) podReadyLabel = metaLabelPrefix + "pod_ready"
podLabelPrefix = metaLabelPrefix + "pod_label_"
podAnnotationPrefix = metaLabelPrefix + "pod_annotation_"
podNodeNameLabel = metaLabelPrefix + "pod_node_name"
podHostIPLabel = metaLabelPrefix + "pod_host_ip"
)
func podLabels(pod *apiv1.Pod) model.LabelSet {
ls := model.LabelSet{
podNameLabel: lv(pod.ObjectMeta.Name),
podIPLabel: lv(pod.Status.PodIP),
podReadyLabel: podReady(pod),
podNodeNameLabel: lv(pod.Spec.NodeName),
podHostIPLabel: lv(pod.Status.HostIP),
} }
var pods PodList for k, v := range pod.Labels {
if err := json.NewDecoder(res.Body).Decode(&pods); err != nil { ln := strutil.SanitizeLabelName(serviceLabelPrefix + k)
body, _ := ioutil.ReadAll(res.Body) ls[model.LabelName(ln)] = lv(v)
return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response body: %s", string(body))
} }
podMap := map[string]map[string]*Pod{} for k, v := range pod.Annotations {
for idx, pod := range pods.Items { ln := strutil.SanitizeLabelName(serviceAnnotationPrefix + k)
if _, ok := podMap[pod.ObjectMeta.Namespace]; !ok { ls[model.LabelName(ln)] = lv(v)
podMap[pod.ObjectMeta.Namespace] = map[string]*Pod{}
}
log.Debugf("Got pod %s in namespace %s", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace)
podMap[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = &pods.Items[idx]
} }
return podMap, pods.ResourceVersion, nil return ls
} }
func (d *podDiscovery) watchPods(events chan *podEvent, done <-chan struct{}, retryInterval time.Duration) { func (p *Pod) buildPod(pod *apiv1.Pod) *config.TargetGroup {
until(func() { // During startup the pod may not have an IP yet. This does not even allow
pods, resourceVersion, err := d.getPods() // for an up metric, so we skip the target.
if err != nil { if len(pod.Status.PodIP) == 0 {
log.Errorf("Cannot initialize pods collection: %s", err) return nil
return
}
d.mtx.Lock()
d.pods = pods
d.mtx.Unlock()
req, err := http.NewRequest("GET", podsURL, nil)
if err != nil {
log.Errorf("Cannot create pods request: %s", err)
return
}
values := req.URL.Query()
values.Add("watch", "true")
values.Add("resourceVersion", resourceVersion)
req.URL.RawQuery = values.Encode()
res, err := d.kd.queryAPIServerReq(req)
if err != nil {
log.Errorf("Failed to watch pods: %s", err)
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch pods: %d", res.StatusCode)
return
}
d := json.NewDecoder(res.Body)
for {
var event podEvent
if err := d.Decode(&event); err != nil {
log.Errorf("Watch pods unexpectedly closed: %s", err)
return
}
select {
case events <- &event:
case <-done:
}
}
}, retryInterval, done)
}
func (d *podDiscovery) updatePod(pod *Pod, eventType EventType) {
d.mtx.Lock()
defer d.mtx.Unlock()
switch eventType {
case Deleted:
if _, ok := d.pods[pod.ObjectMeta.Namespace]; ok {
delete(d.pods[pod.ObjectMeta.Namespace], pod.ObjectMeta.Name)
if len(d.pods[pod.ObjectMeta.Namespace]) == 0 {
delete(d.pods, pod.ObjectMeta.Namespace)
}
}
case Added, Modified:
if _, ok := d.pods[pod.ObjectMeta.Namespace]; !ok {
d.pods[pod.ObjectMeta.Namespace] = map[string]*Pod{}
}
d.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = pod
} }
}
func (d *podDiscovery) updateContainerTargetGroup(pod *Pod) *config.TargetGroup {
d.mtx.RLock()
defer d.mtx.RUnlock()
tg := &config.TargetGroup{ tg := &config.TargetGroup{
Source: podSource(pod), Source: podSource(pod),
} }
tg.Labels = podLabels(pod)
tg.Labels[namespaceLabel] = lv(pod.Namespace)
// If this pod doesn't exist, return an empty target group for _, c := range pod.Spec.Containers {
if _, ok := d.pods[pod.ObjectMeta.Namespace]; !ok { // If no ports are defined for the container, create an anonymous
return tg // target per container.
} if len(c.Ports) == 0 {
if _, ok := d.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name]; !ok { // We don't have a port so we just set the address label to the pod IP.
return tg // The user has to add a port manually.
} tg.Targets = append(tg.Targets, model.LabelSet{
model.AddressLabel: lv(pod.Status.PodIP),
tg.Labels = model.LabelSet{ podContainerNameLabel: lv(c.Name),
roleLabel: model.LabelValue("container"), })
}
tg.Targets = updatePodTargets(pod, true)
return tg
}
func (d *podDiscovery) updatePodsTargetGroup() *config.TargetGroup {
tg := &config.TargetGroup{
Source: podsTargetGroupName,
Labels: model.LabelSet{
roleLabel: model.LabelValue("pod"),
},
}
for _, namespace := range d.pods {
for _, pod := range namespace {
tg.Targets = append(tg.Targets, updatePodTargets(pod, false)...)
}
}
return tg
}
func podSource(pod *Pod) string {
return sourcePodPrefix + ":" + pod.ObjectMeta.Namespace + ":" + pod.ObjectMeta.Name
}
func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet {
var targets []model.LabelSet = make([]model.LabelSet, 0, len(pod.PodSpec.Containers))
if pod.PodStatus.PodIP == "" {
log.Debugf("skipping pod %s -- PodStatus.PodIP is empty", pod.ObjectMeta.Name)
return targets
}
if pod.PodStatus.Phase != "Running" {
log.Debugf("skipping pod %s -- status is not `Running`", pod.ObjectMeta.Name)
return targets
}
// Should never hit this (running pods should always have this set), but better to be defensive.
if pod.PodStatus.HostIP == "" {
log.Debugf("skipping pod %s -- PodStatus.HostIP is empty", pod.ObjectMeta.Name)
return targets
}
ready := "unknown"
for _, cond := range pod.PodStatus.Conditions {
if strings.ToLower(cond.Type) == "ready" {
ready = strings.ToLower(cond.Status)
}
}
sort.Sort(ByContainerName(pod.PodSpec.Containers))
for _, container := range pod.PodSpec.Containers {
// Collect a list of TCP ports
// Sort by port number, ascending
// Product a target pointed at the first port
// Include a label containing all ports (portName=port,PortName=port,...,)
var tcpPorts []ContainerPort
var portLabel *bytes.Buffer = bytes.NewBufferString(",")
for _, port := range container.Ports {
if port.Protocol == "TCP" {
tcpPorts = append(tcpPorts, port)
}
}
if len(tcpPorts) == 0 {
log.Debugf("skipping container %s with no TCP ports", container.Name)
continue continue
} }
// Otherwise create one target for each container/port combination.
for _, port := range c.Ports {
ports := strconv.FormatInt(int64(port.ContainerPort), 10)
addr := net.JoinHostPort(pod.Status.PodIP, ports)
sort.Sort(ByContainerPort(tcpPorts)) tg.Targets = append(tg.Targets, model.LabelSet{
model.AddressLabel: lv(addr),
t := model.LabelSet{ podContainerNameLabel: lv(c.Name),
model.AddressLabel: model.LabelValue(net.JoinHostPort(pod.PodIP, strconv.FormatInt(int64(tcpPorts[0].ContainerPort), 10))), podContainerPortNumberLabel: lv(ports),
podNameLabel: model.LabelValue(pod.ObjectMeta.Name), podContainerPortNameLabel: lv(port.Name),
podAddressLabel: model.LabelValue(pod.PodStatus.PodIP), podContainerPortProtocolLabel: lv(string(port.Protocol)),
podNamespaceLabel: model.LabelValue(pod.ObjectMeta.Namespace), })
podContainerNameLabel: model.LabelValue(container.Name),
podContainerPortNameLabel: model.LabelValue(tcpPorts[0].Name),
podReadyLabel: model.LabelValue(ready),
podNodeNameLabel: model.LabelValue(pod.PodSpec.NodeName),
podHostIPLabel: model.LabelValue(pod.PodStatus.HostIP),
}
for _, port := range tcpPorts {
portLabel.WriteString(port.Name)
portLabel.WriteString("=")
portLabel.WriteString(strconv.FormatInt(int64(port.ContainerPort), 10))
portLabel.WriteString(",")
t[model.LabelName(podContainerPortMapPrefix+port.Name)] = model.LabelValue(strconv.FormatInt(int64(port.ContainerPort), 10))
}
t[model.LabelName(podContainerPortListLabel)] = model.LabelValue(portLabel.String())
for k, v := range pod.ObjectMeta.Labels {
labelName := strutil.SanitizeLabelName(podLabelPrefix + k)
t[model.LabelName(labelName)] = model.LabelValue(v)
}
for k, v := range pod.ObjectMeta.Annotations {
labelName := strutil.SanitizeLabelName(podAnnotationPrefix + k)
t[model.LabelName(labelName)] = model.LabelValue(v)
}
targets = append(targets, t)
if !allContainers {
break
} }
} }
if len(targets) == 0 { return tg
log.Debugf("no targets for pod %s", pod.ObjectMeta.Name)
}
return targets
} }
type ByContainerPort []ContainerPort func podSource(pod *apiv1.Pod) string {
return "pod/" + pod.Namespace + "/" + pod.Name
}
func (a ByContainerPort) Len() int { return len(a) } func podReady(pod *apiv1.Pod) model.LabelValue {
func (a ByContainerPort) Less(i, j int) bool { return a[i].ContainerPort < a[j].ContainerPort } for _, cond := range pod.Status.Conditions {
func (a ByContainerPort) Swap(i, j int) { a[i], a[j] = a[j], a[i] } if cond.Type == apiv1.PodReady {
return lv(strings.ToLower(string(cond.Status)))
type ByContainerName []Container }
}
func (a ByContainerName) Len() int { return len(a) } return lv(strings.ToLower(string(api.ConditionUnknown)))
func (a ByContainerName) Less(i, j int) bool { return a[i].Name < a[j].Name } }
func (a ByContainerName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

View file

@ -14,357 +14,112 @@
package kubernetes package kubernetes
import ( import (
"encoding/json"
"fmt"
"io/ioutil"
"net" "net"
"net/http" "strconv"
"sync"
"time"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/util/strutil"
"golang.org/x/net/context" "golang.org/x/net/context"
apiv1 "k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/client-go/1.5/tools/cache"
) )
type serviceDiscovery struct { // Service implements discovery of Kubernetes services.
mtx sync.RWMutex type Service struct {
services map[string]map[string]*Service logger log.Logger
retryInterval time.Duration informer cache.SharedInformer
kd *Discovery store cache.Store
} }
func (d *serviceDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) { // NewService returns a new service discovery.
update := make(chan interface{}, 10) func NewService(l log.Logger, inf cache.SharedInformer) *Service {
go d.startServiceWatch(update, ctx.Done(), d.retryInterval) return &Service{logger: l, informer: inf, store: inf.GetStore()}
}
for { // Run implements the TargetProvider interface.
tgs := []*config.TargetGroup{} func (s *Service) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Send full initial set of pod targets.
var initial []*config.TargetGroup
for _, o := range s.store.List() {
tg := s.buildService(o.(*apiv1.Service))
initial = append(initial, tg)
}
select {
case <-ctx.Done():
return
case ch <- initial:
}
// Send target groups for service updates.
send := func(tg *config.TargetGroup) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return case ch <- []*config.TargetGroup{tg}:
case event := <-update:
switch e := event.(type) {
case *endpointsEvent:
log.Debugf("k8s discovery received endpoint event (EventType=%s, Endpoint Name=%s)", e.EventType, e.Endpoints.ObjectMeta.Name)
tgs = append(tgs, d.updateServiceEndpoints(e.Endpoints, e.EventType))
case *serviceEvent:
log.Debugf("k8s discovery received service event (EventType=%s, Service Name=%s)", e.EventType, e.Service.ObjectMeta.Name)
tgs = append(tgs, d.updateService(e.Service, e.EventType))
}
}
if tgs == nil {
continue
}
for _, tg := range tgs {
select {
case ch <- []*config.TargetGroup{tg}:
case <-ctx.Done():
return
}
} }
} }
s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
} AddFunc: func(o interface{}) {
send(s.buildService(o.(*apiv1.Service)))
func (d *serviceDiscovery) getServices() (map[string]map[string]*Service, string, error) {
res, err := d.kd.queryAPIServerPath(servicesURL)
if err != nil {
// If we can't list services then we can't watch them. Assume this is a misconfiguration
// & return error.
return nil, "", fmt.Errorf("unable to list Kubernetes services: %s", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response: %d %s", res.StatusCode, res.Status)
}
var services ServiceList
if err := json.NewDecoder(res.Body).Decode(&services); err != nil {
body, _ := ioutil.ReadAll(res.Body)
return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response body: %s", string(body))
}
serviceMap := map[string]map[string]*Service{}
for idx, service := range services.Items {
namespace, ok := serviceMap[service.ObjectMeta.Namespace]
if !ok {
namespace = map[string]*Service{}
serviceMap[service.ObjectMeta.Namespace] = namespace
}
namespace[service.ObjectMeta.Name] = &services.Items[idx]
}
return serviceMap, services.ResourceVersion, nil
}
// watchServices watches services as they come & go.
func (d *serviceDiscovery) startServiceWatch(events chan<- interface{}, done <-chan struct{}, retryInterval time.Duration) {
until(func() {
// We use separate target groups for each discovered service so we'll need to clean up any if they've been deleted
// in Kubernetes while we couldn't connect - small chance of this, but worth dealing with.
d.mtx.Lock()
existingServices := d.services
// Reset the known services.
d.services = map[string]map[string]*Service{}
d.mtx.Unlock()
services, resourceVersion, err := d.getServices()
if err != nil {
log.Errorf("Cannot initialize services collection: %s", err)
return
}
// Now let's loop through the old services & see if they still exist in here
for oldNSName, oldNS := range existingServices {
if ns, ok := services[oldNSName]; !ok {
for _, service := range existingServices[oldNSName] {
events <- &serviceEvent{Deleted, service}
}
} else {
for oldServiceName, oldService := range oldNS {
if _, ok := ns[oldServiceName]; !ok {
events <- &serviceEvent{Deleted, oldService}
}
}
}
}
// Discard the existing services map for GC.
existingServices = nil
for _, ns := range services {
for _, service := range ns {
events <- &serviceEvent{Added, service}
}
}
var wg sync.WaitGroup
wg.Add(2)
go func() {
d.watchServices(resourceVersion, events, done)
wg.Done()
}()
go func() {
d.watchServiceEndpoints(resourceVersion, events, done)
wg.Done()
}()
wg.Wait()
}, retryInterval, done)
}
func (d *serviceDiscovery) watchServices(resourceVersion string, events chan<- interface{}, done <-chan struct{}) {
req, err := http.NewRequest("GET", servicesURL, nil)
if err != nil {
log.Errorf("Failed to create services request: %s", err)
return
}
values := req.URL.Query()
values.Add("watch", "true")
values.Add("resourceVersion", resourceVersion)
req.URL.RawQuery = values.Encode()
res, err := d.kd.queryAPIServerReq(req)
if err != nil {
log.Errorf("Failed to watch services: %s", err)
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch services: %d", res.StatusCode)
return
}
dec := json.NewDecoder(res.Body)
for {
var event serviceEvent
if err := dec.Decode(&event); err != nil {
log.Errorf("Watch services unexpectedly closed: %s", err)
return
}
select {
case events <- &event:
case <-done:
return
}
}
}
// watchServiceEndpoints watches service endpoints as they come & go.
func (d *serviceDiscovery) watchServiceEndpoints(resourceVersion string, events chan<- interface{}, done <-chan struct{}) {
req, err := http.NewRequest("GET", endpointsURL, nil)
if err != nil {
log.Errorf("Failed to create service endpoints request: %s", err)
return
}
values := req.URL.Query()
values.Add("watch", "true")
values.Add("resourceVersion", resourceVersion)
req.URL.RawQuery = values.Encode()
res, err := d.kd.queryAPIServerReq(req)
if err != nil {
log.Errorf("Failed to watch service endpoints: %s", err)
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch service endpoints: %d", res.StatusCode)
return
}
dec := json.NewDecoder(res.Body)
for {
var event endpointsEvent
if err := dec.Decode(&event); err != nil {
log.Errorf("Watch service endpoints unexpectedly closed: %s", err)
return
}
select {
case events <- &event:
case <-done:
}
}
}
func (d *serviceDiscovery) updateService(service *Service, eventType EventType) *config.TargetGroup {
d.mtx.Lock()
defer d.mtx.Unlock()
switch eventType {
case Deleted:
return d.deleteService(service)
case Added, Modified:
return d.addService(service)
}
return nil
}
func (d *serviceDiscovery) deleteService(service *Service) *config.TargetGroup {
tg := &config.TargetGroup{Source: serviceSource(service)}
delete(d.services[service.ObjectMeta.Namespace], service.ObjectMeta.Name)
if len(d.services[service.ObjectMeta.Namespace]) == 0 {
delete(d.services, service.ObjectMeta.Namespace)
}
return tg
}
func (d *serviceDiscovery) addService(service *Service) *config.TargetGroup {
namespace, ok := d.services[service.ObjectMeta.Namespace]
if !ok {
namespace = map[string]*Service{}
d.services[service.ObjectMeta.Namespace] = namespace
}
namespace[service.ObjectMeta.Name] = service
endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name)
res, err := d.kd.queryAPIServerPath(endpointURL)
if err != nil {
log.Errorf("Error getting service endpoints: %s", err)
return nil
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to get service endpoints: %d", res.StatusCode)
return nil
}
var eps Endpoints
if err := json.NewDecoder(res.Body).Decode(&eps); err != nil {
log.Errorf("Error getting service endpoints: %s", err)
return nil
}
return d.updateServiceTargetGroup(service, &eps)
}
func (d *serviceDiscovery) updateServiceTargetGroup(service *Service, eps *Endpoints) *config.TargetGroup {
tg := &config.TargetGroup{
Source: serviceSource(service),
Labels: model.LabelSet{
serviceNamespaceLabel: model.LabelValue(service.ObjectMeta.Namespace),
serviceNameLabel: model.LabelValue(service.ObjectMeta.Name),
}, },
DeleteFunc: func(o interface{}) {
send(&config.TargetGroup{Source: serviceSource(o.(*apiv1.Service))})
},
UpdateFunc: func(_, o interface{}) {
send(s.buildService(o.(*apiv1.Service)))
},
})
// Block until the target provider is explicitly canceled.
<-ctx.Done()
}
func serviceSource(s *apiv1.Service) string {
return "svc/" + s.Namespace + "/" + s.Name
}
const (
serviceNameLabel = metaLabelPrefix + "service_name"
serviceLabelPrefix = metaLabelPrefix + "service_label_"
serviceAnnotationPrefix = metaLabelPrefix + "service_annotation_"
servicePortNameLabel = metaLabelPrefix + "service_port_name"
servicePortProtocolLabel = metaLabelPrefix + "service_port_protocol"
)
func serviceLabels(svc *apiv1.Service) model.LabelSet {
ls := make(model.LabelSet, len(svc.Labels)+len(svc.Annotations)+2)
ls[serviceNameLabel] = lv(svc.Name)
for k, v := range svc.Labels {
ln := strutil.SanitizeLabelName(serviceLabelPrefix + k)
ls[model.LabelName(ln)] = lv(v)
} }
for k, v := range service.ObjectMeta.Labels { for k, v := range svc.Annotations {
labelName := strutil.SanitizeLabelName(serviceLabelPrefix + k) ln := strutil.SanitizeLabelName(serviceAnnotationPrefix + k)
tg.Labels[model.LabelName(labelName)] = model.LabelValue(v) ls[model.LabelName(ln)] = lv(v)
} }
return ls
}
for k, v := range service.ObjectMeta.Annotations { func (s *Service) buildService(svc *apiv1.Service) *config.TargetGroup {
labelName := strutil.SanitizeLabelName(serviceAnnotationPrefix + k) tg := &config.TargetGroup{
tg.Labels[model.LabelName(labelName)] = model.LabelValue(v) Source: serviceSource(svc),
} }
tg.Labels = serviceLabels(svc)
tg.Labels[namespaceLabel] = lv(svc.Namespace)
serviceAddress := service.ObjectMeta.Name + "." + service.ObjectMeta.Namespace + ".svc" for _, port := range svc.Spec.Ports {
addr := net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", strconv.FormatInt(int64(port.Port), 10))
// Append the first TCP service port if one exists. tg.Targets = append(tg.Targets, model.LabelSet{
for _, port := range service.Spec.Ports { model.AddressLabel: lv(addr),
if port.Protocol == ProtocolTCP { servicePortNameLabel: lv(port.Name),
serviceAddress += fmt.Sprintf(":%d", port.Port) servicePortProtocolLabel: lv(string(port.Protocol)),
break })
}
}
switch d.kd.Conf.Role {
case config.KubernetesRoleService:
t := model.LabelSet{
model.AddressLabel: model.LabelValue(serviceAddress),
roleLabel: model.LabelValue("service"),
}
tg.Targets = append(tg.Targets, t)
case config.KubernetesRoleEndpoint:
// Now let's loop through the endpoints & add them to the target group
// with appropriate labels.
for _, ss := range eps.Subsets {
epPort := ss.Ports[0].Port
for _, addr := range ss.Addresses {
ipAddr := addr.IP
if len(ipAddr) == net.IPv6len {
ipAddr = "[" + ipAddr + "]"
}
address := net.JoinHostPort(ipAddr, fmt.Sprintf("%d", epPort))
t := model.LabelSet{
model.AddressLabel: model.LabelValue(address),
roleLabel: model.LabelValue("endpoint"),
}
tg.Targets = append(tg.Targets, t)
}
}
} }
return tg return tg
} }
func (d *serviceDiscovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType) *config.TargetGroup {
d.mtx.Lock()
defer d.mtx.Unlock()
serviceNamespace := endpoints.ObjectMeta.Namespace
serviceName := endpoints.ObjectMeta.Name
if service, ok := d.services[serviceNamespace][serviceName]; ok {
return d.updateServiceTargetGroup(service, endpoints)
}
return nil
}
func serviceSource(service *Service) string {
return sourceServicePrefix + ":" + service.ObjectMeta.Namespace + "/" + service.ObjectMeta.Name
}

View file

@ -1,299 +0,0 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
// EventType can legally only have the values defined as constants below.
type EventType string
// Possible values for EventType.
const (
Added EventType = "ADDED"
Modified EventType = "MODIFIED"
Deleted EventType = "DELETED"
)
type nodeEvent struct {
EventType EventType `json:"type"`
Node *Node `json:"object"`
}
type serviceEvent struct {
EventType EventType `json:"type"`
Service *Service `json:"object"`
}
type endpointsEvent struct {
EventType EventType `json:"type"`
Endpoints *Endpoints `json:"object"`
}
// From here down types are copied from
// https://github.com/GoogleCloudPlatform/kubernetes/blob/master/pkg/api/v1/types.go
// with all currently irrelevant types/fields stripped out. This removes the
// need for any kubernetes dependencies, with the drawback of having to keep
// this file up to date.
// ListMeta describes metadata that synthetic resources must have, including lists and
// various status objects.
type ListMeta struct {
// An opaque value that represents the version of this response for use with optimistic
// concurrency and change monitoring endpoints. Clients must treat these values as opaque
// and values may only be valid for a particular resource or set of resources. Only servers
// will generate resource versions.
ResourceVersion string `json:"resourceVersion,omitempty" description:"string that identifies the internal version of this object that can be used by clients to determine when objects have changed; populated by the system, read-only; value must be treated as opaque by clients and passed unmodified back to the server: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#concurrency-control-and-consistency"`
}
// ObjectMeta is metadata that all persisted resources must have, which includes all objects
// users must create.
type ObjectMeta struct {
// Name is unique within a namespace. Name is required when creating resources, although
// some resources may allow a client to request the generation of an appropriate name
// automatically. Name is primarily intended for creation idempotence and configuration
// definition.
Name string `json:"name,omitempty" description:"string that identifies an object. Must be unique within a namespace; cannot be updated; see http://releases.k8s.io/HEAD/docs/user-guide/identifiers.md#names"`
// Namespace defines the space within which name must be unique. An empty namespace is
// equivalent to the "default" namespace, but "default" is the canonical representation.
// Not all objects are required to be scoped to a namespace - the value of this field for
// those objects will be empty.
Namespace string `json:"namespace,omitempty" description:"namespace of the object; must be a DNS_LABEL; cannot be updated; see http://releases.k8s.io/HEAD/docs/user-guide/namespaces.md"`
ResourceVersion string `json:"resourceVersion,omitempty" description:"string that identifies the internal version of this object that can be used by clients to determine when objects have changed; populated by the system, read-only; value must be treated as opaque by clients and passed unmodified back to the server: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#concurrency-control-and-consistency"`
// TODO: replace map[string]string with labels.LabelSet type
Labels map[string]string `json:"labels,omitempty" description:"map of string keys and values that can be used to organize and categorize objects; may match selectors of replication controllers and services; see http://releases.k8s.io/HEAD/docs/user-guide/labels.md"`
// Annotations are unstructured key value data stored with a resource that may be set by
// external tooling. They are not queryable and should be preserved when modifying
// objects.
Annotations map[string]string `json:"annotations,omitempty" description:"map of string keys and values that can be used by external tooling to store and retrieve arbitrary metadata about objects; see http://releases.k8s.io/HEAD/docs/user-guide/annotations.md"`
}
// Protocol defines network protocols supported for things like container ports.
type Protocol string
const (
// ProtocolTCP is the TCP protocol.
ProtocolTCP Protocol = "TCP"
// ProtocolUDP is the UDP protocol.
ProtocolUDP Protocol = "UDP"
)
const (
// NamespaceAll is the default argument to specify on a context when you want to list or filter resources across all namespaces
NamespaceAll string = ""
)
// Container represents a single container that is expected to be run on the host.
type Container struct {
// Required: This must be a DNS_LABEL. Each container in a pod must
// have a unique name.
Name string `json:"name" description:"name of the container; must be a DNS_LABEL and unique within the pod; cannot be updated"`
// Optional.
Image string `json:"image,omitempty" description:"Docker image name; see http://releases.k8s.io/HEAD/docs/user-guide/images.md"`
Ports []ContainerPort `json:"ports"`
}
type ContainerPort struct {
Name string `json:"name"`
ContainerPort int32 `json:"containerPort"`
Protocol string `json:"protocol"`
}
// Service is a named abstraction of software service (for example, mysql) consisting of local port
// (for example 3306) that the proxy listens on, and the selector that determines which pods
// will answer requests sent through the proxy.
type Service struct {
ObjectMeta `json:"metadata,omitempty" description:"standard object metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
// Spec defines the behavior of a service.
// http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#spec-and-status
Spec ServiceSpec `json:"spec,omitempty"`
}
// ServiceSpec describes the attributes that a user creates on a service.
type ServiceSpec struct {
// The list of ports that are exposed by this service.
// More info: http://releases.k8s.io/HEAD/docs/user-guide/services.md#virtual-ips-and-service-proxies
Ports []ServicePort `json:"ports"`
}
// ServicePort contains information on service's port.
type ServicePort struct {
// The IP protocol for this port. Supports "TCP" and "UDP".
// Default is TCP.
Protocol Protocol `json:"protocol,omitempty"`
// The port that will be exposed by this service.
Port int32 `json:"port"`
}
// ServiceList holds a list of services.
type ServiceList struct {
ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
Items []Service `json:"items" description:"list of services"`
}
// Endpoints is a collection of endpoints that implement the actual service. Example:
// Name: "mysvc",
// Subsets: [
// {
// Addresses: [{"ip": "10.10.1.1"}, {"ip": "10.10.2.2"}],
// Ports: [{"name": "a", "port": 8675}, {"name": "b", "port": 309}]
// },
// {
// Addresses: [{"ip": "10.10.3.3"}],
// Ports: [{"name": "a", "port": 93}, {"name": "b", "port": 76}]
// },
// ]
type Endpoints struct {
ObjectMeta `json:"metadata,omitempty" description:"standard object metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
// The set of all endpoints is the union of all subsets.
Subsets []EndpointSubset `json:"subsets" description:"sets of addresses and ports that comprise a service"`
}
// EndpointSubset is a group of addresses with a common set of ports. The
// expanded set of endpoints is the Cartesian product of Addresses x Ports.
// For example, given:
// {
// Addresses: [{"ip": "10.10.1.1"}, {"ip": "10.10.2.2"}],
// Ports: [{"name": "a", "port": 8675}, {"name": "b", "port": 309}]
// }
// The resulting set of endpoints can be viewed as:
// a: [ 10.10.1.1:8675, 10.10.2.2:8675 ],
// b: [ 10.10.1.1:309, 10.10.2.2:309 ]
type EndpointSubset struct {
Addresses []EndpointAddress `json:"addresses,omitempty" description:"IP addresses which offer the related ports"`
Ports []EndpointPort `json:"ports,omitempty" description:"port numbers available on the related IP addresses"`
}
// EndpointAddress is a tuple that describes single IP address.
type EndpointAddress struct {
// The IP of this endpoint.
// TODO: This should allow hostname or IP, see #4447.
IP string `json:"ip" description:"IP address of the endpoint"`
}
// EndpointPort is a tuple that describes a single port.
type EndpointPort struct {
// The port number.
Port int `json:"port" description:"port number of the endpoint"`
// The IP protocol for this port.
Protocol Protocol `json:"protocol,omitempty" description:"protocol for this port; must be UDP or TCP; TCP if unspecified"`
}
// EndpointsList is a list of endpoints.
type EndpointsList struct {
ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
Items []Endpoints `json:"items" description:"list of endpoints"`
}
// NodeStatus is information about the current status of a node.
type NodeStatus struct {
// Queried from cloud provider, if available.
Addresses []NodeAddress `json:"addresses,omitempty" description:"list of addresses reachable to the node; see http://releases.k8s.io/HEAD/docs/admin/node.md#node-addresses" patchStrategy:"merge" patchMergeKey:"type"`
// Endpoints of daemons running on the Node.
DaemonEndpoints NodeDaemonEndpoints `json:"daemonEndpoints,omitempty"`
}
// NodeDaemonEndpoints lists ports opened by daemons running on the Node.
type NodeDaemonEndpoints struct {
// Endpoint on which Kubelet is listening.
KubeletEndpoint DaemonEndpoint `json:"kubeletEndpoint,omitempty"`
}
// DaemonEndpoint contains information about a single Daemon endpoint.
type DaemonEndpoint struct {
/*
The port tag was not properly in quotes in earlier releases, so it must be
uppercased for backwards compat (since it was falling back to var name of
'Port').
*/
// Port number of the given endpoint.
Port int32 `json:"Port"`
}
// NodeAddressType can legally only have the values defined as constants below.
type NodeAddressType string
// These are valid address types of node. NodeLegacyHostIP is used to transit
// from out-dated HostIP field to NodeAddress.
const (
NodeLegacyHostIP NodeAddressType = "LegacyHostIP"
NodeHostName NodeAddressType = "Hostname"
NodeExternalIP NodeAddressType = "ExternalIP"
NodeInternalIP NodeAddressType = "InternalIP"
)
// NodeAddress defines the address of a node.
type NodeAddress struct {
Type NodeAddressType `json:"type" description:"node address type, one of Hostname, ExternalIP or InternalIP"`
Address string `json:"address" description:"the node address"`
}
// Node is a worker node in Kubernetes, formerly known as minion.
// Each node will have a unique identifier in the cache (i.e. in etcd).
type Node struct {
ObjectMeta `json:"metadata,omitempty" description:"standard object metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
// Status describes the current status of a Node
Status NodeStatus `json:"status,omitempty" description:"most recently observed status of the node; populated by the system, read-only; http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#spec-and-status"`
}
// NodeList is the whole list of all Nodes which have been registered with master.
type NodeList struct {
ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
Items []Node `json:"items" description:"list of nodes"`
}
type Pod struct {
ObjectMeta `json:"metadata,omitempty" description:"standard object metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
PodStatus `json:"status,omitempty" description:"pod status object; see http://kubernetes.io/v1.1/docs/api-reference/v1/definitions.html#_v1_podstatus"`
PodSpec `json:"spec,omitempty" description:"pod spec object; see http://kubernetes.io/v1.1/docs/api-reference/v1/definitions.html#_v1_podspec"`
}
type podEvent struct {
EventType EventType `json:"type"`
Pod *Pod `json:"object"`
}
type PodList struct {
ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
Items []Pod `json:"items" description:"list of pods"`
}
type PodStatus struct {
Phase string `json:"phase" description:"Current condition of the pod. More info: http://kubernetes.io/v1.1/docs/user-guide/pod-states.html#pod-phase"`
PodIP string `json:"podIP" description:"IP address allocated to the pod. Routable at least within the cluster. Empty if not yet allocated."`
Conditions []PodCondition `json:"conditions" description:"Current service state of pod."`
HostIP string `json:"hostIP,omitempty" description:"IP address of the host to which the pod is assigned. Empty if not yet scheduled."`
}
type PodSpec struct {
Containers []Container `json:"containers" description:"list of containers, see http://kubernetes.io/v1.1/docs/api-reference/v1/definitions.html#_v1_container"`
NodeName string `json:"nodeName,omitempty" description:"NodeName is a request to schedule this pod onto a specific node. If it is non-empty, the scheduler simply schedules this pod onto that node, assuming that it fits resource requirements."`
}
type PodCondition struct {
Type string `json:"type" description:"Type is the type of the condition. Currently only Ready."`
Status string `json:"status" description:"Status is the status of the condition. Can be True, False, Unknown."`
}

View file

@ -1,161 +0,0 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetesv2
import (
"fmt"
"net"
"strconv"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil"
"golang.org/x/net/context"
"k8s.io/client-go/1.5/pkg/api"
apiv1 "k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/client-go/1.5/tools/cache"
)
// Node discovers Kubernetes nodes.
type Node struct {
logger log.Logger
informer cache.SharedInformer
store cache.Store
}
// NewNode returns a new node discovery.
func NewNode(l log.Logger, inf cache.SharedInformer) *Node {
return &Node{logger: l, informer: inf, store: inf.GetStore()}
}
// Run implements the TargetProvider interface.
func (n *Node) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Send full initial set of pod targets.
var initial []*config.TargetGroup
for _, o := range n.store.List() {
tg := n.buildNode(o.(*apiv1.Node))
initial = append(initial, tg)
}
select {
case <-ctx.Done():
return
case ch <- initial:
}
// Send target groups for service updates.
send := func(tg *config.TargetGroup) {
select {
case <-ctx.Done():
case ch <- []*config.TargetGroup{tg}:
}
}
n.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
send(n.buildNode(o.(*apiv1.Node)))
},
DeleteFunc: func(o interface{}) {
send(&config.TargetGroup{Source: nodeSource(o.(*apiv1.Node))})
},
UpdateFunc: func(_, o interface{}) {
send(n.buildNode(o.(*apiv1.Node)))
},
})
// Block until the target provider is explicitly canceled.
<-ctx.Done()
}
func nodeSource(n *apiv1.Node) string {
return "node/" + n.Namespace + "/" + n.Name
}
const (
nodeNameLabel = metaLabelPrefix + "node_name"
nodeLabelPrefix = metaLabelPrefix + "node_label_"
nodeAnnotationPrefix = metaLabelPrefix + "node_annotation_"
nodeAddressPrefix = metaLabelPrefix + "node_address_"
)
func nodeLabels(n *apiv1.Node) model.LabelSet {
ls := make(model.LabelSet, len(n.Labels)+len(n.Annotations)+2)
ls[nodeNameLabel] = lv(n.Name)
for k, v := range n.Labels {
ln := strutil.SanitizeLabelName(nodeLabelPrefix + k)
ls[model.LabelName(ln)] = lv(v)
}
for k, v := range n.Annotations {
ln := strutil.SanitizeLabelName(nodeAnnotationPrefix + k)
ls[model.LabelName(ln)] = lv(v)
}
return ls
}
func (n *Node) buildNode(node *apiv1.Node) *config.TargetGroup {
tg := &config.TargetGroup{
Source: nodeSource(node),
}
tg.Labels = nodeLabels(node)
addr, addrMap, err := nodeAddress(node)
if err != nil {
n.logger.With("err", err).Debugf("No node address found")
return nil
}
addr = net.JoinHostPort(addr, strconv.FormatInt(int64(node.Status.DaemonEndpoints.KubeletEndpoint.Port), 10))
t := model.LabelSet{
model.AddressLabel: lv(addr),
model.InstanceLabel: lv(node.Name),
}
for ty, a := range addrMap {
ln := strutil.SanitizeLabelName(nodeAddressPrefix + string(ty))
t[model.LabelName(ln)] = lv(a[0])
}
tg.Targets = append(tg.Targets, t)
return tg
}
// nodeAddresses returns the provided node's address, based on the priority:
// 1. NodeInternalIP
// 2. NodeExternalIP
// 3. NodeLegacyHostIP
// 3. NodeHostName
//
// Derived from k8s.io/kubernetes/pkg/util/node/node.go
func nodeAddress(node *apiv1.Node) (string, map[apiv1.NodeAddressType][]string, error) {
m := map[apiv1.NodeAddressType][]string{}
for _, a := range node.Status.Addresses {
m[a.Type] = append(m[a.Type], a.Address)
}
if addresses, ok := m[apiv1.NodeInternalIP]; ok {
return addresses[0], m, nil
}
if addresses, ok := m[apiv1.NodeExternalIP]; ok {
return addresses[0], m, nil
}
if addresses, ok := m[apiv1.NodeAddressType(api.NodeLegacyHostIP)]; ok {
return addresses[0], m, nil
}
if addresses, ok := m[apiv1.NodeHostName]; ok {
return addresses[0], m, nil
}
return "", m, fmt.Errorf("host address unknown")
}

View file

@ -1,177 +0,0 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetesv2
import (
"fmt"
"net"
"strconv"
"strings"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil"
"golang.org/x/net/context"
"k8s.io/client-go/1.5/pkg/api"
apiv1 "k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/client-go/1.5/tools/cache"
)
// Pod discovers new pod targets.
type Pod struct {
informer cache.SharedInformer
store cache.Store
logger log.Logger
}
// NewPods creates a new pod discovery.
func NewPods(l log.Logger, pods cache.SharedInformer) *Pod {
return &Pod{
informer: pods,
store: pods.GetStore(),
logger: l,
}
}
// Run implements the TargetProvider interface.
func (p *Pod) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Send full initial set of pod targets.
var initial []*config.TargetGroup
for _, o := range p.store.List() {
tg := p.buildPod(o.(*apiv1.Pod))
initial = append(initial, tg)
p.logger.With("tg", fmt.Sprintf("%#v", tg)).Debugln("initial pod")
}
select {
case <-ctx.Done():
return
case ch <- initial:
}
// Send target groups for pod updates.
send := func(tg *config.TargetGroup) {
p.logger.With("tg", fmt.Sprintf("%#v", tg)).Debugln("pod update")
select {
case <-ctx.Done():
case ch <- []*config.TargetGroup{tg}:
}
}
p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
send(p.buildPod(o.(*apiv1.Pod)))
},
DeleteFunc: func(o interface{}) {
send(&config.TargetGroup{Source: podSource(o.(*apiv1.Pod))})
},
UpdateFunc: func(_, o interface{}) {
send(p.buildPod(o.(*apiv1.Pod)))
},
})
// Block until the target provider is explicitly canceled.
<-ctx.Done()
}
const (
podNameLabel = metaLabelPrefix + "pod_name"
podIPLabel = metaLabelPrefix + "pod_ip"
podContainerNameLabel = metaLabelPrefix + "pod_container_name"
podContainerPortNameLabel = metaLabelPrefix + "pod_container_port_name"
podContainerPortNumberLabel = metaLabelPrefix + "pod_container_port_number"
podContainerPortProtocolLabel = metaLabelPrefix + "pod_container_port_protocol"
podReadyLabel = metaLabelPrefix + "pod_ready"
podLabelPrefix = metaLabelPrefix + "pod_label_"
podAnnotationPrefix = metaLabelPrefix + "pod_annotation_"
podNodeNameLabel = metaLabelPrefix + "pod_node_name"
podHostIPLabel = metaLabelPrefix + "pod_host_ip"
)
func podLabels(pod *apiv1.Pod) model.LabelSet {
ls := model.LabelSet{
podNameLabel: lv(pod.ObjectMeta.Name),
podIPLabel: lv(pod.Status.PodIP),
podReadyLabel: podReady(pod),
podNodeNameLabel: lv(pod.Spec.NodeName),
podHostIPLabel: lv(pod.Status.HostIP),
}
for k, v := range pod.Labels {
ln := strutil.SanitizeLabelName(serviceLabelPrefix + k)
ls[model.LabelName(ln)] = lv(v)
}
for k, v := range pod.Annotations {
ln := strutil.SanitizeLabelName(serviceAnnotationPrefix + k)
ls[model.LabelName(ln)] = lv(v)
}
return ls
}
func (p *Pod) buildPod(pod *apiv1.Pod) *config.TargetGroup {
// During startup the pod may not have an IP yet. This does not even allow
// for an up metric, so we skip the target.
if len(pod.Status.PodIP) == 0 {
return nil
}
tg := &config.TargetGroup{
Source: podSource(pod),
}
tg.Labels = podLabels(pod)
tg.Labels[namespaceLabel] = lv(pod.Namespace)
for _, c := range pod.Spec.Containers {
// If no ports are defined for the container, create an anonymous
// target per container.
if len(c.Ports) == 0 {
// We don't have a port so we just set the address label to the pod IP.
// The user has to add a port manually.
tg.Targets = append(tg.Targets, model.LabelSet{
model.AddressLabel: lv(pod.Status.PodIP),
podContainerNameLabel: lv(c.Name),
})
continue
}
// Otherwise create one target for each container/port combination.
for _, port := range c.Ports {
ports := strconv.FormatInt(int64(port.ContainerPort), 10)
addr := net.JoinHostPort(pod.Status.PodIP, ports)
tg.Targets = append(tg.Targets, model.LabelSet{
model.AddressLabel: lv(addr),
podContainerNameLabel: lv(c.Name),
podContainerPortNumberLabel: lv(ports),
podContainerPortNameLabel: lv(port.Name),
podContainerPortProtocolLabel: lv(string(port.Protocol)),
})
}
}
return tg
}
func podSource(pod *apiv1.Pod) string {
return "pod/" + pod.Namespace + "/" + pod.Name
}
func podReady(pod *apiv1.Pod) model.LabelValue {
for _, cond := range pod.Status.Conditions {
if cond.Type == apiv1.PodReady {
return lv(strings.ToLower(string(cond.Status)))
}
}
return lv(strings.ToLower(string(api.ConditionUnknown)))
}

View file

@ -1,125 +0,0 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetesv2
import (
"net"
"strconv"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil"
"golang.org/x/net/context"
apiv1 "k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/client-go/1.5/tools/cache"
)
// Service implements discovery of Kubernetes services.
type Service struct {
logger log.Logger
informer cache.SharedInformer
store cache.Store
}
// NewService returns a new service discovery.
func NewService(l log.Logger, inf cache.SharedInformer) *Service {
return &Service{logger: l, informer: inf, store: inf.GetStore()}
}
// Run implements the TargetProvider interface.
func (s *Service) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Send full initial set of pod targets.
var initial []*config.TargetGroup
for _, o := range s.store.List() {
tg := s.buildService(o.(*apiv1.Service))
initial = append(initial, tg)
}
select {
case <-ctx.Done():
return
case ch <- initial:
}
// Send target groups for service updates.
send := func(tg *config.TargetGroup) {
select {
case <-ctx.Done():
case ch <- []*config.TargetGroup{tg}:
}
}
s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
send(s.buildService(o.(*apiv1.Service)))
},
DeleteFunc: func(o interface{}) {
send(&config.TargetGroup{Source: serviceSource(o.(*apiv1.Service))})
},
UpdateFunc: func(_, o interface{}) {
send(s.buildService(o.(*apiv1.Service)))
},
})
// Block until the target provider is explicitly canceled.
<-ctx.Done()
}
func serviceSource(s *apiv1.Service) string {
return "svc/" + s.Namespace + "/" + s.Name
}
const (
serviceNameLabel = metaLabelPrefix + "service_name"
serviceLabelPrefix = metaLabelPrefix + "service_label_"
serviceAnnotationPrefix = metaLabelPrefix + "service_annotation_"
servicePortNameLabel = metaLabelPrefix + "service_port_name"
servicePortProtocolLabel = metaLabelPrefix + "service_port_protocol"
)
func serviceLabels(svc *apiv1.Service) model.LabelSet {
ls := make(model.LabelSet, len(svc.Labels)+len(svc.Annotations)+2)
ls[serviceNameLabel] = lv(svc.Name)
for k, v := range svc.Labels {
ln := strutil.SanitizeLabelName(serviceLabelPrefix + k)
ls[model.LabelName(ln)] = lv(v)
}
for k, v := range svc.Annotations {
ln := strutil.SanitizeLabelName(serviceAnnotationPrefix + k)
ls[model.LabelName(ln)] = lv(v)
}
return ls
}
func (s *Service) buildService(svc *apiv1.Service) *config.TargetGroup {
tg := &config.TargetGroup{
Source: serviceSource(svc),
}
tg.Labels = serviceLabels(svc)
tg.Labels[namespaceLabel] = lv(svc.Namespace)
for _, port := range svc.Spec.Ports {
addr := net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", strconv.FormatInt(int64(port.Port), 10))
tg.Targets = append(tg.Targets, model.LabelSet{
model.AddressLabel: lv(addr),
servicePortNameLabel: lv(port.Name),
servicePortProtocolLabel: lv(string(port.Protocol)),
})
}
return tg
}

View file

@ -399,14 +399,6 @@ func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider {
} }
app("kubernetes", i, k) app("kubernetes", i, k)
} }
for i, c := range cfg.KubernetesV2SDConfigs {
k, err := discovery.NewKubernetesV2Discovery(c)
if err != nil {
log.Errorf("Cannot create Kubernetes discovery: %s", err)
continue
}
app("kubernetes", i, k)
}
for i, c := range cfg.ServersetSDConfigs { for i, c := range cfg.ServersetSDConfigs {
app("serverset", i, discovery.NewServersetDiscovery(c)) app("serverset", i, discovery.NewServersetDiscovery(c))
} }