diff --git a/config/config.go b/config/config.go index a340e0fed..454d10bd1 100644 --- a/config/config.go +++ b/config/config.go @@ -442,6 +442,8 @@ type ScrapeConfig struct { MarathonSDConfigs []*MarathonSDConfig `yaml:"marathon_sd_configs,omitempty"` // List of Kubernetes service discovery configurations. KubernetesSDConfigs []*KubernetesSDConfig `yaml:"kubernetes_sd_configs,omitempty"` + // List of Kubernetes service discovery configurations. + KubernetesV2SDConfigs []*KubernetesV2SDConfig `yaml:"kubernetes_v2_sd_configs,omitempty"` // List of GCE service discovery configurations. GCESDConfigs []*GCESDConfig `yaml:"gce_sd_configs,omitempty"` // List of EC2 service discovery configurations. @@ -861,6 +863,42 @@ func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) er return nil } +// KubernetesV2SDConfig is the configuration for Kubernetes service discovery. +type KubernetesV2SDConfig struct { + APIServer URL `yaml:"api_server"` + Role KubernetesRole `yaml:"role"` + BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"` + BearerToken string `yaml:"bearer_token,omitempty"` + BearerTokenFile string `yaml:"bearer_token_file,omitempty"` + TLSConfig TLSConfig `yaml:"tls_config,omitempty"` + + // Catches all undefined fields and must be empty after parsing. + XXX map[string]interface{} `yaml:",inline"` +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (c *KubernetesV2SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + *c = KubernetesV2SDConfig{} + type plain KubernetesV2SDConfig + err := unmarshal((*plain)(c)) + if err != nil { + return err + } + if err := checkOverflow(c.XXX, "kubernetes_sd_config"); err != nil { + return err + } + if c.Role == "" { + return fmt.Errorf("role missing (one of: container, pod, service, endpoint, node, apiserver)") + } + if len(c.BearerToken) > 0 && len(c.BearerTokenFile) > 0 { + return fmt.Errorf("at most one of bearer_token & bearer_token_file must be configured") + } + if c.BasicAuth != nil && (len(c.BearerToken) > 0 || len(c.BearerTokenFile) > 0) { + return fmt.Errorf("at most one of basic_auth, bearer_token & bearer_token_file must be configured") + } + return nil +} + // GCESDConfig is the configuration for GCE based service discovery. type GCESDConfig struct { // Project: The Google Cloud Project ID diff --git a/retrieval/discovery/discovery.go b/retrieval/discovery/discovery.go index 795de8158..6f22320b6 100644 --- a/retrieval/discovery/discovery.go +++ b/retrieval/discovery/discovery.go @@ -16,10 +16,12 @@ package discovery import ( "time" + "github.com/prometheus/common/log" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/retrieval/discovery/consul" "github.com/prometheus/prometheus/retrieval/discovery/dns" "github.com/prometheus/prometheus/retrieval/discovery/kubernetes" + "github.com/prometheus/prometheus/retrieval/discovery/kubernetes_v2" "github.com/prometheus/prometheus/retrieval/discovery/marathon" ) @@ -40,6 +42,11 @@ func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.Discov return kd, nil } +// NewKubernetesV2Discovery creates a Kubernetes service discovery based on the passed-in configuration. +func NewKubernetesV2Discovery(conf *config.KubernetesV2SDConfig) (*kubernetesv2.Kubernetes, error) { + return kubernetesv2.New(log.Base(), conf) +} + // NewMarathon creates a new Marathon based discovery. func NewMarathon(conf *config.MarathonSDConfig) *marathon.Discovery { return &marathon.Discovery{ diff --git a/retrieval/discovery/kubernetes_v2/kubernetes.go b/retrieval/discovery/kubernetes_v2/kubernetes.go new file mode 100644 index 000000000..5a32411cf --- /dev/null +++ b/retrieval/discovery/kubernetes_v2/kubernetes.go @@ -0,0 +1,121 @@ +package kubernetesv2 + +import ( + "io/ioutil" + "time" + + "github.com/prometheus/prometheus/config" + + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "golang.org/x/net/context" + "k8s.io/kubernetes/pkg/api" + apiv1 "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/cache" + kubernetes "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" + rest "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/util/runtime" +) + +const ( + // kubernetesMetaLabelPrefix is the meta prefix used for all meta labels. + // in this discovery. + metaLabelPrefix = model.MetaLabelPrefix + "kubernetes_" + namespaceLabel = metaLabelPrefix + "namespace" +) + +// Kubernetes implements the TargetProvider interface for discovering +// targets from Kubernetes. +type Kubernetes struct { + client kubernetes.Interface + role config.KubernetesRole + logger log.Logger +} + +func init() { + runtime.ErrorHandlers = append(runtime.ErrorHandlers, func(err error) { + log.With("component", "kube_client_runtime").Errorln(err) + }) +} + +// New creates a new Kubernetes discovery for the given role. +func New(l log.Logger, conf *config.KubernetesV2SDConfig) (*Kubernetes, error) { + var ( + kcfg *rest.Config + err error + ) + if conf.APIServer.String() == "" { + kcfg, err = rest.InClusterConfig() + if err != nil { + return nil, err + } + } else { + token := conf.BearerToken + if conf.BearerTokenFile != "" { + bf, err := ioutil.ReadFile(conf.BearerTokenFile) + if err != nil { + return nil, err + } + token = string(bf) + } + + kcfg = &rest.Config{ + Host: conf.APIServer.String(), + BearerToken: token, + TLSClientConfig: rest.TLSClientConfig{ + CAFile: conf.TLSConfig.CAFile, + }, + } + } + kcfg.UserAgent = "prometheus/discovery" + + if conf.BasicAuth != nil { + kcfg.Username = conf.BasicAuth.Username + kcfg.Password = conf.BasicAuth.Password + } + kcfg.TLSClientConfig.CertFile = conf.TLSConfig.CertFile + kcfg.TLSClientConfig.KeyFile = conf.TLSConfig.KeyFile + kcfg.Insecure = conf.TLSConfig.InsecureSkipVerify + + c, err := kubernetes.NewForConfig(kcfg) + if err != nil { + return nil, err + } + return &Kubernetes{ + client: c, + logger: l, + role: conf.Role, + }, nil +} + +const resyncPeriod = 10 * time.Minute + +// Run implements the TargetProvider interface. +func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + defer close(ch) + + rclient := k.client.Core().GetRESTClient() + + switch k.role { + case "pod": + plw := cache.NewListWatchFromClient(rclient, "pods", api.NamespaceAll, nil) + pod := NewPods( + k.logger.With("kubernetes_sd", "pod"), + cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), + ) + go pod.informer.Run(ctx.Done()) + + for !pod.informer.HasSynced() { + time.Sleep(100 * time.Millisecond) + } + pod.Run(ctx, ch) + default: + k.logger.Errorf("unknown Kubernetes discovery kind %q", k.role) + } + + <-ctx.Done() +} + +func lv(s string) model.LabelValue { + return model.LabelValue(s) +} diff --git a/retrieval/discovery/kubernetes_v2/pod.go b/retrieval/discovery/kubernetes_v2/pod.go new file mode 100644 index 000000000..d7a6ac63e --- /dev/null +++ b/retrieval/discovery/kubernetes_v2/pod.go @@ -0,0 +1,139 @@ +package kubernetesv2 + +import ( + "fmt" + "net" + "strconv" + "strings" + + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "golang.org/x/net/context" + "k8s.io/kubernetes/pkg/api" + apiv1 "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/cache" +) + +// Pods discovers new pod targets. +type Pod struct { + informer cache.SharedInformer + store cache.Store + logger log.Logger +} + +// NewPods creates a new pod discovery. +func NewPods(l log.Logger, pods cache.SharedInformer) *Pod { + return &Pod{ + informer: pods, + store: pods.GetStore(), + logger: l, + } +} + +// Run implements the TargetProvider interface. +func (p *Pod) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + // Send full initial set of pod targets. + var initial []*config.TargetGroup + for _, o := range p.store.List() { + tg := p.buildPod(o.(*apiv1.Pod)) + initial = append(initial, tg) + + p.logger.With("tg", fmt.Sprintf("%#v", tg)).Debugln("initial pod") + } + select { + case <-ctx.Done(): + return + case ch <- initial: + } + + // Send target groups for pod updates. + send := func(tg *config.TargetGroup) { + p.logger.With("tg", fmt.Sprintf("%#v", tg)).Debugln("pod update") + select { + case <-ctx.Done(): + case ch <- []*config.TargetGroup{tg}: + } + } + p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(o interface{}) { + send(p.buildPod(o.(*apiv1.Pod))) + }, + DeleteFunc: func(o interface{}) { + send(&config.TargetGroup{Source: podSource(o.(*apiv1.Pod))}) + }, + UpdateFunc: func(_, o interface{}) { + send(p.buildPod(o.(*apiv1.Pod))) + }, + }) + + // Block until the target provider is explicitly canceled. + <-ctx.Done() +} + +const ( + podNameLabel = metaLabelPrefix + "pod_name" + podAddressLabel = metaLabelPrefix + "pod_address" + podContainerNameLabel = metaLabelPrefix + "pod_container_name" + podContainerPortNameLabel = metaLabelPrefix + "pod_container_port_name" + podContainerPortProtocolLabel = metaLabelPrefix + "pod_container_port_protocol" + podReadyLabel = metaLabelPrefix + "pod_ready" + podLabelPrefix = metaLabelPrefix + "pod_label_" + podAnnotationPrefix = metaLabelPrefix + "pod_annotation_" + podNodeNameLabel = metaLabelPrefix + "pod_node_name" + podHostIPLabel = metaLabelPrefix + "pod_host_ip" +) + +func (p *Pod) buildPod(pod *apiv1.Pod) *config.TargetGroup { + tg := &config.TargetGroup{ + Source: podSource(pod), + } + tg.Labels = model.LabelSet{ + namespaceLabel: lv(pod.Namespace), + podNameLabel: lv(pod.ObjectMeta.Name), + podAddressLabel: lv(pod.Status.PodIP), + podReadyLabel: podReady(pod), + podNodeNameLabel: lv(pod.Spec.NodeName), + podHostIPLabel: lv(pod.Status.HostIP), + } + + for _, c := range pod.Spec.Containers { + // If no ports are defined for the container, create an anonymous + // target per container. + if len(c.Ports) == 0 { + // We don't have a port so we just set the address label to the pod IP. + // The user has to add a port manually. + tg.Targets = append(tg.Targets, model.LabelSet{ + model.AddressLabel: lv(pod.Status.PodIP), + podContainerNameLabel: lv(c.Name), + }) + continue + } + // Otherwise create one target for each container/port combination. + for _, port := range c.Ports { + addr := net.JoinHostPort(pod.Status.PodIP, strconv.FormatInt(int64(port.ContainerPort), 10)) + + tg.Targets = append(tg.Targets, model.LabelSet{ + model.AddressLabel: lv(addr), + podContainerNameLabel: lv(c.Name), + podContainerPortNameLabel: lv(port.Name), + podContainerPortProtocolLabel: lv(string(port.Protocol)), + }) + } + } + + return tg +} + +func podSource(pod *apiv1.Pod) string { + return "pod/" + pod.Namespace + "/" + pod.Name +} + +func podReady(pod *apiv1.Pod) model.LabelValue { + for _, cond := range pod.Status.Conditions { + if cond.Type == apiv1.PodReady { + return lv(strings.ToLower(string(cond.Status))) + } + } + return lv(strings.ToLower(string(api.ConditionUnknown))) +} diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 30dfbe8ab..590e8f685 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -399,6 +399,14 @@ func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider { } app("kubernetes", i, k) } + for i, c := range cfg.KubernetesV2SDConfigs { + k, err := discovery.NewKubernetesV2Discovery(c) + if err != nil { + log.Errorf("Cannot create Kubernetes discovery: %s", err) + continue + } + app("kubernetes", i, k) + } for i, c := range cfg.ServersetSDConfigs { app("serverset", i, discovery.NewServersetDiscovery(c)) }