kubernetes: infer pod information in endpoints discovery

Fabian Reinartz 2016-09-30 14:18:49 +02:00
parent 7c439a9060
commit 6d269ed870
3 changed files with 143 additions and 38 deletions

endpoints.go

@@ -32,11 +32,12 @@ type Endpoints struct {
 	logger         log.Logger
 	endpointsInf   cache.SharedInformer
-	servicesInf    cache.SharedInformer
-	podsInf        cache.SharedInformer
+	serviceInf     cache.SharedInformer
+	podInf         cache.SharedInformer
 	podStore       cache.Store
 	endpointsStore cache.Store
+	serviceStore   cache.Store
 }

 // NewEndpoints returns a new endpoints discovery.

@@ -45,8 +46,9 @@ func NewEndpoints(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoints {
 		logger:         l,
 		endpointsInf:   eps,
 		endpointsStore: eps.GetStore(),
-		servicesInf:    svc,
-		podsInf:        pod,
+		serviceInf:     svc,
+		serviceStore:   svc.GetStore(),
+		podInf:         pod,
 		podStore:       pod.GetStore(),
 	}

@@ -99,8 +101,11 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 		if exists && err != nil {
 			send(e.buildEndpoints(obj.(*apiv1.Endpoints)))
 		}
+		if err != nil {
+			e.logger.With("err", err).Errorln("retrieving endpoints failed")
+		}
 	}

-	e.servicesInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
+	e.serviceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc:    func(o interface{}) { serviceUpdate(o.(*apiv1.Service)) },
 		UpdateFunc: func(_, o interface{}) { serviceUpdate(o.(*apiv1.Service)) },
 		DeleteFunc: func(o interface{}) { serviceUpdate(o.(*apiv1.Service)) },

@@ -122,6 +127,7 @@ const (
 	endpointsNameLabel        = metaLabelPrefix + "endpoints_name"
 	endpointReadyLabel        = metaLabelPrefix + "endpoint_ready"
 	endpointPortNameLabel     = metaLabelPrefix + "endpoint_port_name"
+	endpointPortProtocolLabel = metaLabelPrefix + "endpoint_port_protocol"
 )

 func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *config.TargetGroup {

@@ -136,22 +142,56 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *config.TargetGroup {
 		namespaceLabel:     lv(eps.Namespace),
 		endpointsNameLabel: lv(eps.Name),
 	}
-	e.decorateService(eps.Namespace, eps.Name, tg)
+	e.addServiceLabels(eps.Namespace, eps.Name, tg)

-	// type podEntry struct {
-	// 	pod          *apiv1.Pod
-	// 	servicePorts []apiv1.ServicePort
-	// }
-	// seenPods := map[string]podEntry{}
+	type podEntry struct {
+		pod          *apiv1.Pod
+		servicePorts []apiv1.EndpointPort
+	}
+	seenPods := map[string]*podEntry{}

 	add := func(addr apiv1.EndpointAddress, port apiv1.EndpointPort, ready string) {
 		a := net.JoinHostPort(addr.IP, strconv.FormatInt(int64(port.Port), 10))
-		tg.Targets = append(tg.Targets, model.LabelSet{
-			model.AddressLabel:    lv(a),
-			endpointPortNameLabel: lv(port.Name),
-			endpointReadyLabel:    lv(ready),
-		})
+		target := model.LabelSet{
+			model.AddressLabel:        lv(a),
+			endpointPortNameLabel:     lv(port.Name),
+			endpointPortProtocolLabel: lv(string(port.Protocol)),
+			endpointReadyLabel:        lv(ready),
+		}
+
+		pod := e.resolvePodRef(addr.TargetRef)
+		if pod == nil {
+			tg.Targets = append(tg.Targets, target)
+			return
+		}
+		s := pod.Namespace + "/" + pod.Name
+
+		sp, ok := seenPods[s]
+		if !ok {
+			sp = &podEntry{pod: pod}
+			seenPods[s] = sp
+		}
+
+		// Attach standard pod labels.
+		target = target.Merge(podLabels(pod))
+
+		// Attach potential container port labels matching the endpoint port.
+		for _, c := range pod.Spec.Containers {
+			for _, cport := range c.Ports {
+				if port.Port == cport.ContainerPort {
+					target[podContainerNameLabel] = lv(c.Name)
+					target[podContainerPortNameLabel] = lv(port.Name)
+					target[podContainerPortProtocolLabel] = lv(string(port.Protocol))
+					break
+				}
+			}
+		}
+
+		// Add service port so we know that we have already generated a target
+		// for it.
+		sp.servicePorts = append(sp.servicePorts, port)
+
+		tg.Targets = append(tg.Targets, target)
 	}

 	for _, ss := range eps.Subsets {

@@ -165,29 +205,69 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *config.TargetGroup {
 		}
 	}

+	// For all seen pods, check all container ports. If they were not covered
+	// by one of the service endpoints, generate targets for them.
+	for _, pe := range seenPods {
+		for _, c := range pe.pod.Spec.Containers {
+			for _, cport := range c.Ports {
+				hasSeenPort := func() bool {
+					for _, eport := range pe.servicePorts {
+						if cport.ContainerPort == eport.Port {
+							return true
+						}
+					}
+					return false
+				}
+				if hasSeenPort() {
+					continue
+				}
+
+				a := net.JoinHostPort(pe.pod.Status.PodIP, strconv.FormatInt(int64(cport.ContainerPort), 10))
+
+				target := model.LabelSet{
+					model.AddressLabel:            lv(a),
+					podContainerNameLabel:         lv(c.Name),
+					podContainerPortNameLabel:     lv(cport.Name),
+					podContainerPortProtocolLabel: lv(string(cport.Protocol)),
+				}
+				tg.Targets = append(tg.Targets, target.Merge(podLabels(pe.pod)))
+			}
+		}
+	}
+
 	return tg
 }

 func (e *Endpoints) resolvePodRef(ref *apiv1.ObjectReference) *apiv1.Pod {
-	if ref.Kind != "Pod" {
+	if ref == nil || ref.Kind != "Pod" {
 		return nil
 	}
-	p, exists, err := e.podStore.Get(ref)
+	p := &apiv1.Pod{}
+	p.Namespace = ref.Namespace
+	p.Name = ref.Name
+
+	obj, exists, err := e.podStore.Get(p)
 	if err != nil || !exists {
 		return nil
 	}
-	return p.(*apiv1.Pod)
+	if err != nil {
+		e.logger.With("err", err).Errorln("resolving pod ref failed")
+	}
+	return obj.(*apiv1.Pod)
 }

-func (e *Endpoints) decorateService(ns, name string, tg *config.TargetGroup) {
+func (e *Endpoints) addServiceLabels(ns, name string, tg *config.TargetGroup) {
 	svc := &apiv1.Service{}
 	svc.Namespace = ns
 	svc.Name = name
-	obj, exists, err := e.servicesInf.GetStore().Get(svc)
+	obj, exists, err := e.serviceStore.Get(svc)
 	if !exists || err != nil {
 		return
 	}
+	if err != nil {
+		e.logger.With("err", err).Errorln("retrieving service failed")
+	}
 	svc = obj.(*apiv1.Service)

 	tg.Labels[serviceNameLabel] = lv(svc.Name)
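
Note: the heart of this change is the two-pass logic in buildEndpoints. Every endpoint address that resolves to a pod records which ports the service already covers, and a final sweep emits extra targets for the pod's remaining container ports. Below is a minimal, self-contained sketch of that idea; the pod and port structs, covered map, and targets slice are simplified stand-ins for illustration, not the client-go types the commit uses.

package main

import (
	"fmt"
	"net"
	"strconv"
)

// Simplified stand-ins for the Kubernetes API types in the commit.
type port struct {
	name   string
	number int
}

type pod struct {
	ip    string
	ports []port // declared container ports
}

func main() {
	p := pod{
		ip:    "10.0.0.7",
		ports: []port{{"http", 8080}, {"metrics", 9090}},
	}
	// Ports the service exposes through its endpoints object.
	endpointPorts := []port{{"http", 8080}}

	covered := map[int]bool{}
	var targets []string

	// Pass 1: one target per endpoint port; record the covered ports.
	for _, ep := range endpointPorts {
		targets = append(targets, net.JoinHostPort(p.ip, strconv.Itoa(ep.number)))
		covered[ep.number] = true
	}
	// Pass 2: extra targets for container ports the service does not cover.
	for _, cp := range p.ports {
		if !covered[cp.number] {
			targets = append(targets, net.JoinHostPort(p.ip, strconv.Itoa(cp.number)))
		}
	}
	fmt.Println(targets) // [10.0.0.7:8080 10.0.0.7:9090]
}

The real code additionally merges podLabels(pod) into each target and attaches container name, port name, and protocol labels whenever an endpoint port matches a declared container port.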

kubernetes.go

@@ -121,16 +121,16 @@ func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 			cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
 		)
 		go eps.endpointsInf.Run(ctx.Done())
-		go eps.servicesInf.Run(ctx.Done())
-		go eps.podsInf.Run(ctx.Done())
+		go eps.serviceInf.Run(ctx.Done())
+		go eps.podInf.Run(ctx.Done())

-		for !eps.servicesInf.HasSynced() {
+		for !eps.serviceInf.HasSynced() {
 			time.Sleep(100 * time.Millisecond)
 		}
 		for !eps.endpointsInf.HasSynced() {
 			time.Sleep(100 * time.Millisecond)
 		}
-		for !eps.podsInf.HasSynced() {
+		for !eps.podInf.HasSynced() {
 			time.Sleep(100 * time.Millisecond)
 		}
 		eps.Run(ctx, ch)
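
Note: the three polling loops above block until each informer's cache has synced before eps.Run starts emitting target groups. The same wait can be written generically; the waitForSync helper below is an illustrative sketch under that assumption, not part of the commit.

package main

import (
	"context"
	"time"
)

// waitForSync blocks until every informer reports a synced cache, or until
// ctx is cancelled. Hypothetical helper equivalent to the three poll loops.
func waitForSync(ctx context.Context, synced ...func() bool) bool {
	for _, hasSynced := range synced {
		for !hasSynced() {
			select {
			case <-ctx.Done():
				return false
			case <-time.After(100 * time.Millisecond):
			}
		}
	}
	return true
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	start := time.Now()
	synced := func() bool { return time.Since(start) > 300*time.Millisecond }
	println(waitForSync(ctx, synced)) // true after a few polls
}

With such a helper, the call site would reduce to waitForSync(ctx, eps.serviceInf.HasSynced, eps.endpointsInf.HasSynced, eps.podInf.HasSynced), since SharedInformer.HasSynced has the signature func() bool.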

pod.go

@@ -22,13 +22,14 @@ import (
 	"github.com/prometheus/common/log"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/config"
+	"github.com/prometheus/prometheus/util/strutil"
 	"golang.org/x/net/context"
 	"k8s.io/client-go/1.5/pkg/api"
 	apiv1 "k8s.io/client-go/1.5/pkg/api/v1"
 	"k8s.io/client-go/1.5/tools/cache"
 )

-// Pods discovers new pod targets.
+// Pod discovers new pod targets.
 type Pod struct {
 	informer cache.SharedInformer
 	store    cache.Store

@@ -86,9 +87,10 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 const (
 	podNameLabel                  = metaLabelPrefix + "pod_name"
-	podAddressLabel               = metaLabelPrefix + "pod_address"
+	podIPLabel                    = metaLabelPrefix + "pod_ip"
 	podContainerNameLabel         = metaLabelPrefix + "pod_container_name"
 	podContainerPortNameLabel     = metaLabelPrefix + "pod_container_port_name"
+	podContainerPortNumberLabel   = metaLabelPrefix + "pod_container_port_number"
 	podContainerPortProtocolLabel = metaLabelPrefix + "pod_container_port_protocol"
 	podReadyLabel                 = metaLabelPrefix + "pod_ready"
 	podLabelPrefix                = metaLabelPrefix + "pod_label_"

@@ -97,19 +99,40 @@ const (
 	podHostIPLabel = metaLabelPrefix + "pod_host_ip"
 )

-func (p *Pod) buildPod(pod *apiv1.Pod) *config.TargetGroup {
-	tg := &config.TargetGroup{
-		Source: podSource(pod),
-	}
-	tg.Labels = model.LabelSet{
-		namespaceLabel:   lv(pod.Namespace),
+func podLabels(pod *apiv1.Pod) model.LabelSet {
+	ls := model.LabelSet{
 		podNameLabel:     lv(pod.ObjectMeta.Name),
-		podAddressLabel:  lv(pod.Status.PodIP),
+		podIPLabel:       lv(pod.Status.PodIP),
 		podReadyLabel:    podReady(pod),
 		podNodeNameLabel: lv(pod.Spec.NodeName),
 		podHostIPLabel:   lv(pod.Status.HostIP),
 	}
+
+	for k, v := range pod.Labels {
+		ln := strutil.SanitizeLabelName(serviceLabelPrefix + k)
+		ls[model.LabelName(ln)] = lv(v)
+	}
+	for k, v := range pod.Annotations {
+		ln := strutil.SanitizeLabelName(serviceAnnotationPrefix + k)
+		ls[model.LabelName(ln)] = lv(v)
+	}
+	return ls
+}
+
+func (p *Pod) buildPod(pod *apiv1.Pod) *config.TargetGroup {
+	// During startup the pod may not have an IP yet. This does not even allow
+	// for an up metric, so we skip the target.
+	if len(pod.Status.PodIP) == 0 {
+		return nil
+	}
+
+	tg := &config.TargetGroup{
+		Source: podSource(pod),
+	}
+	tg.Labels = podLabels(pod)
+	tg.Labels[namespaceLabel] = lv(pod.Namespace)
+
 	for _, c := range pod.Spec.Containers {
 		// If no ports are defined for the container, create an anonymous
 		// target per container.

@@ -124,11 +147,13 @@ func (p *Pod) buildPod(pod *apiv1.Pod) *config.TargetGroup {
 		}
 		// Otherwise create one target for each container/port combination.
 		for _, port := range c.Ports {
-			addr := net.JoinHostPort(pod.Status.PodIP, strconv.FormatInt(int64(port.ContainerPort), 10))
+			ports := strconv.FormatInt(int64(port.ContainerPort), 10)
+			addr := net.JoinHostPort(pod.Status.PodIP, ports)

 			tg.Targets = append(tg.Targets, model.LabelSet{
 				model.AddressLabel:            lv(addr),
 				podContainerNameLabel:         lv(c.Name),
+				podContainerPortNumberLabel:   lv(ports),
 				podContainerPortNameLabel:     lv(port.Name),
 				podContainerPortProtocolLabel: lv(string(port.Protocol)),
 			})
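
Note: podLabels passes every pod label and annotation key through strutil.SanitizeLabelName before using it as a Prometheus label name, since keys like "app.kubernetes.io/name" contain characters that are invalid in label names ([a-zA-Z_][a-zA-Z0-9_]*). A sketch of that sanitization rule, re-implemented here for illustration rather than imported from the Prometheus tree:

package main

import (
	"fmt"
	"regexp"
)

// Same replacement rule as strutil.SanitizeLabelName: every character
// outside [a-zA-Z0-9_] becomes an underscore.
var invalidLabelChar = regexp.MustCompile(`[^a-zA-Z0-9_]`)

func sanitizeLabelName(name string) string {
	return invalidLabelChar.ReplaceAllString(name, "_")
}

func main() {
	// A pod label key is generally not a valid Prometheus label name.
	fmt.Println(sanitizeLabelName("app.kubernetes.io/name"))
	// Output: app_kubernetes_io_name
}

In the discovery code the sanitized key is appended to a label prefix constant, so the final label name carries the meta-label namespace as well as the sanitized key.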