kubernetes: Add K8S v2 pod discovery

This adds plumbing for a parallel version of the new K8S SD
and adds pod discovery as the first role.
This commit is contained in:
Fabian Reinartz 2016-09-28 19:29:55 +02:00
parent 4f96d28e60
commit 2331701b50
5 changed files with 313 additions and 0 deletions

View file

@ -442,6 +442,8 @@ type ScrapeConfig struct {
MarathonSDConfigs []*MarathonSDConfig `yaml:"marathon_sd_configs,omitempty"`
// List of Kubernetes service discovery configurations.
KubernetesSDConfigs []*KubernetesSDConfig `yaml:"kubernetes_sd_configs,omitempty"`
// List of Kubernetes service discovery configurations.
KubernetesV2SDConfigs []*KubernetesV2SDConfig `yaml:"kubernetes_v2_sd_configs,omitempty"`
// List of GCE service discovery configurations.
GCESDConfigs []*GCESDConfig `yaml:"gce_sd_configs,omitempty"`
// List of EC2 service discovery configurations.
@ -861,6 +863,42 @@ func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) er
return nil
}
// KubernetesV2SDConfig is the configuration for Kubernetes service discovery.
type KubernetesV2SDConfig struct {
APIServer URL `yaml:"api_server"`
Role KubernetesRole `yaml:"role"`
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
BearerToken string `yaml:"bearer_token,omitempty"`
BearerTokenFile string `yaml:"bearer_token_file,omitempty"`
TLSConfig TLSConfig `yaml:"tls_config,omitempty"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *KubernetesV2SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = KubernetesV2SDConfig{}
type plain KubernetesV2SDConfig
err := unmarshal((*plain)(c))
if err != nil {
return err
}
if err := checkOverflow(c.XXX, "kubernetes_sd_config"); err != nil {
return err
}
if c.Role == "" {
return fmt.Errorf("role missing (one of: container, pod, service, endpoint, node, apiserver)")
}
if len(c.BearerToken) > 0 && len(c.BearerTokenFile) > 0 {
return fmt.Errorf("at most one of bearer_token & bearer_token_file must be configured")
}
if c.BasicAuth != nil && (len(c.BearerToken) > 0 || len(c.BearerTokenFile) > 0) {
return fmt.Errorf("at most one of basic_auth, bearer_token & bearer_token_file must be configured")
}
return nil
}
// GCESDConfig is the configuration for GCE based service discovery.
type GCESDConfig struct {
// Project: The Google Cloud Project ID

View file

@ -16,10 +16,12 @@ package discovery
import (
"time"
"github.com/prometheus/common/log"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/retrieval/discovery/consul"
"github.com/prometheus/prometheus/retrieval/discovery/dns"
"github.com/prometheus/prometheus/retrieval/discovery/kubernetes"
"github.com/prometheus/prometheus/retrieval/discovery/kubernetes_v2"
"github.com/prometheus/prometheus/retrieval/discovery/marathon"
)
@ -40,6 +42,11 @@ func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.Discov
return kd, nil
}
// NewKubernetesV2Discovery creates a Kubernetes service discovery based on the passed-in configuration.
func NewKubernetesV2Discovery(conf *config.KubernetesV2SDConfig) (*kubernetesv2.Kubernetes, error) {
return kubernetesv2.New(log.Base(), conf)
}
// NewMarathon creates a new Marathon based discovery.
func NewMarathon(conf *config.MarathonSDConfig) *marathon.Discovery {
return &marathon.Discovery{

View file

@ -0,0 +1,121 @@
package kubernetesv2
import (
"io/ioutil"
"time"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/api"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/cache"
kubernetes "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
rest "k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/util/runtime"
)
const (
// kubernetesMetaLabelPrefix is the meta prefix used for all meta labels.
// in this discovery.
metaLabelPrefix = model.MetaLabelPrefix + "kubernetes_"
namespaceLabel = metaLabelPrefix + "namespace"
)
// Kubernetes implements the TargetProvider interface for discovering
// targets from Kubernetes.
type Kubernetes struct {
client kubernetes.Interface
role config.KubernetesRole
logger log.Logger
}
func init() {
runtime.ErrorHandlers = append(runtime.ErrorHandlers, func(err error) {
log.With("component", "kube_client_runtime").Errorln(err)
})
}
// New creates a new Kubernetes discovery for the given role.
func New(l log.Logger, conf *config.KubernetesV2SDConfig) (*Kubernetes, error) {
var (
kcfg *rest.Config
err error
)
if conf.APIServer.String() == "" {
kcfg, err = rest.InClusterConfig()
if err != nil {
return nil, err
}
} else {
token := conf.BearerToken
if conf.BearerTokenFile != "" {
bf, err := ioutil.ReadFile(conf.BearerTokenFile)
if err != nil {
return nil, err
}
token = string(bf)
}
kcfg = &rest.Config{
Host: conf.APIServer.String(),
BearerToken: token,
TLSClientConfig: rest.TLSClientConfig{
CAFile: conf.TLSConfig.CAFile,
},
}
}
kcfg.UserAgent = "prometheus/discovery"
if conf.BasicAuth != nil {
kcfg.Username = conf.BasicAuth.Username
kcfg.Password = conf.BasicAuth.Password
}
kcfg.TLSClientConfig.CertFile = conf.TLSConfig.CertFile
kcfg.TLSClientConfig.KeyFile = conf.TLSConfig.KeyFile
kcfg.Insecure = conf.TLSConfig.InsecureSkipVerify
c, err := kubernetes.NewForConfig(kcfg)
if err != nil {
return nil, err
}
return &Kubernetes{
client: c,
logger: l,
role: conf.Role,
}, nil
}
const resyncPeriod = 10 * time.Minute
// Run implements the TargetProvider interface.
func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
defer close(ch)
rclient := k.client.Core().GetRESTClient()
switch k.role {
case "pod":
plw := cache.NewListWatchFromClient(rclient, "pods", api.NamespaceAll, nil)
pod := NewPods(
k.logger.With("kubernetes_sd", "pod"),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
)
go pod.informer.Run(ctx.Done())
for !pod.informer.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
pod.Run(ctx, ch)
default:
k.logger.Errorf("unknown Kubernetes discovery kind %q", k.role)
}
<-ctx.Done()
}
func lv(s string) model.LabelValue {
return model.LabelValue(s)
}

View file

@ -0,0 +1,139 @@
package kubernetesv2
import (
"fmt"
"net"
"strconv"
"strings"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/api"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/cache"
)
// Pods discovers new pod targets.
type Pod struct {
informer cache.SharedInformer
store cache.Store
logger log.Logger
}
// NewPods creates a new pod discovery.
func NewPods(l log.Logger, pods cache.SharedInformer) *Pod {
return &Pod{
informer: pods,
store: pods.GetStore(),
logger: l,
}
}
// Run implements the TargetProvider interface.
func (p *Pod) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Send full initial set of pod targets.
var initial []*config.TargetGroup
for _, o := range p.store.List() {
tg := p.buildPod(o.(*apiv1.Pod))
initial = append(initial, tg)
p.logger.With("tg", fmt.Sprintf("%#v", tg)).Debugln("initial pod")
}
select {
case <-ctx.Done():
return
case ch <- initial:
}
// Send target groups for pod updates.
send := func(tg *config.TargetGroup) {
p.logger.With("tg", fmt.Sprintf("%#v", tg)).Debugln("pod update")
select {
case <-ctx.Done():
case ch <- []*config.TargetGroup{tg}:
}
}
p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
send(p.buildPod(o.(*apiv1.Pod)))
},
DeleteFunc: func(o interface{}) {
send(&config.TargetGroup{Source: podSource(o.(*apiv1.Pod))})
},
UpdateFunc: func(_, o interface{}) {
send(p.buildPod(o.(*apiv1.Pod)))
},
})
// Block until the target provider is explicitly canceled.
<-ctx.Done()
}
const (
podNameLabel = metaLabelPrefix + "pod_name"
podAddressLabel = metaLabelPrefix + "pod_address"
podContainerNameLabel = metaLabelPrefix + "pod_container_name"
podContainerPortNameLabel = metaLabelPrefix + "pod_container_port_name"
podContainerPortProtocolLabel = metaLabelPrefix + "pod_container_port_protocol"
podReadyLabel = metaLabelPrefix + "pod_ready"
podLabelPrefix = metaLabelPrefix + "pod_label_"
podAnnotationPrefix = metaLabelPrefix + "pod_annotation_"
podNodeNameLabel = metaLabelPrefix + "pod_node_name"
podHostIPLabel = metaLabelPrefix + "pod_host_ip"
)
func (p *Pod) buildPod(pod *apiv1.Pod) *config.TargetGroup {
tg := &config.TargetGroup{
Source: podSource(pod),
}
tg.Labels = model.LabelSet{
namespaceLabel: lv(pod.Namespace),
podNameLabel: lv(pod.ObjectMeta.Name),
podAddressLabel: lv(pod.Status.PodIP),
podReadyLabel: podReady(pod),
podNodeNameLabel: lv(pod.Spec.NodeName),
podHostIPLabel: lv(pod.Status.HostIP),
}
for _, c := range pod.Spec.Containers {
// If no ports are defined for the container, create an anonymous
// target per container.
if len(c.Ports) == 0 {
// We don't have a port so we just set the address label to the pod IP.
// The user has to add a port manually.
tg.Targets = append(tg.Targets, model.LabelSet{
model.AddressLabel: lv(pod.Status.PodIP),
podContainerNameLabel: lv(c.Name),
})
continue
}
// Otherwise create one target for each container/port combination.
for _, port := range c.Ports {
addr := net.JoinHostPort(pod.Status.PodIP, strconv.FormatInt(int64(port.ContainerPort), 10))
tg.Targets = append(tg.Targets, model.LabelSet{
model.AddressLabel: lv(addr),
podContainerNameLabel: lv(c.Name),
podContainerPortNameLabel: lv(port.Name),
podContainerPortProtocolLabel: lv(string(port.Protocol)),
})
}
}
return tg
}
func podSource(pod *apiv1.Pod) string {
return "pod/" + pod.Namespace + "/" + pod.Name
}
func podReady(pod *apiv1.Pod) model.LabelValue {
for _, cond := range pod.Status.Conditions {
if cond.Type == apiv1.PodReady {
return lv(strings.ToLower(string(cond.Status)))
}
}
return lv(strings.ToLower(string(api.ConditionUnknown)))
}

View file

@ -399,6 +399,14 @@ func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider {
}
app("kubernetes", i, k)
}
for i, c := range cfg.KubernetesV2SDConfigs {
k, err := discovery.NewKubernetesV2Discovery(c)
if err != nil {
log.Errorf("Cannot create Kubernetes discovery: %s", err)
continue
}
app("kubernetes", i, k)
}
for i, c := range cfg.ServersetSDConfigs {
app("serverset", i, discovery.NewServersetDiscovery(c))
}