mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-27 13:42:31 -08:00
a10dc9298e
Sidecar containers are a newish feature in k8s. They're implemented similar to init containers but actually stay running and allow you to delay startup of your application pod until the sidecar started (like init containers always do). This adds the ports of the sidecar container to the list of discovered endpoint(slice), allowing you to target those containers as well. The implementation is a copy of that of Pod discovery fixes: #14927 Signed-off-by: bas smit <bsmit@bol.com>
511 lines
15 KiB
Go
511 lines
15 KiB
Go
// Copyright 2016 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package kubernetes
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"strconv"
|
|
|
|
"github.com/go-kit/log"
|
|
"github.com/go-kit/log/level"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/common/model"
|
|
apiv1 "k8s.io/api/core/v1"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/client-go/util/workqueue"
|
|
|
|
"github.com/prometheus/prometheus/discovery/targetgroup"
|
|
)
|
|
|
|
// Endpoints discovers new endpoint targets.
|
|
type Endpoints struct {
|
|
logger log.Logger
|
|
|
|
endpointsInf cache.SharedIndexInformer
|
|
serviceInf cache.SharedInformer
|
|
podInf cache.SharedInformer
|
|
nodeInf cache.SharedInformer
|
|
withNodeMetadata bool
|
|
|
|
podStore cache.Store
|
|
endpointsStore cache.Store
|
|
serviceStore cache.Store
|
|
|
|
queue *workqueue.Type
|
|
}
|
|
|
|
// NewEndpoints returns a new endpoints discovery.
|
|
func NewEndpoints(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer, eventCount *prometheus.CounterVec) *Endpoints {
|
|
if l == nil {
|
|
l = log.NewNopLogger()
|
|
}
|
|
|
|
epAddCount := eventCount.WithLabelValues(RoleEndpoint.String(), MetricLabelRoleAdd)
|
|
epUpdateCount := eventCount.WithLabelValues(RoleEndpoint.String(), MetricLabelRoleUpdate)
|
|
epDeleteCount := eventCount.WithLabelValues(RoleEndpoint.String(), MetricLabelRoleDelete)
|
|
|
|
svcAddCount := eventCount.WithLabelValues(RoleService.String(), MetricLabelRoleAdd)
|
|
svcUpdateCount := eventCount.WithLabelValues(RoleService.String(), MetricLabelRoleUpdate)
|
|
svcDeleteCount := eventCount.WithLabelValues(RoleService.String(), MetricLabelRoleDelete)
|
|
|
|
podUpdateCount := eventCount.WithLabelValues(RolePod.String(), MetricLabelRoleUpdate)
|
|
|
|
e := &Endpoints{
|
|
logger: l,
|
|
endpointsInf: eps,
|
|
endpointsStore: eps.GetStore(),
|
|
serviceInf: svc,
|
|
serviceStore: svc.GetStore(),
|
|
podInf: pod,
|
|
podStore: pod.GetStore(),
|
|
nodeInf: node,
|
|
withNodeMetadata: node != nil,
|
|
queue: workqueue.NewNamed(RoleEndpoint.String()),
|
|
}
|
|
|
|
_, err := e.endpointsInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: func(o interface{}) {
|
|
epAddCount.Inc()
|
|
e.enqueue(o)
|
|
},
|
|
UpdateFunc: func(_, o interface{}) {
|
|
epUpdateCount.Inc()
|
|
e.enqueue(o)
|
|
},
|
|
DeleteFunc: func(o interface{}) {
|
|
epDeleteCount.Inc()
|
|
e.enqueue(o)
|
|
},
|
|
})
|
|
if err != nil {
|
|
level.Error(l).Log("msg", "Error adding endpoints event handler.", "err", err)
|
|
}
|
|
|
|
serviceUpdate := func(o interface{}) {
|
|
svc, err := convertToService(o)
|
|
if err != nil {
|
|
level.Error(e.logger).Log("msg", "converting to Service object failed", "err", err)
|
|
return
|
|
}
|
|
|
|
ep := &apiv1.Endpoints{}
|
|
ep.Namespace = svc.Namespace
|
|
ep.Name = svc.Name
|
|
obj, exists, err := e.endpointsStore.Get(ep)
|
|
if exists && err == nil {
|
|
e.enqueue(obj.(*apiv1.Endpoints))
|
|
}
|
|
|
|
if err != nil {
|
|
level.Error(e.logger).Log("msg", "retrieving endpoints failed", "err", err)
|
|
}
|
|
}
|
|
_, err = e.serviceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
// TODO(fabxc): potentially remove add and delete event handlers. Those should
|
|
// be triggered via the endpoint handlers already.
|
|
AddFunc: func(o interface{}) {
|
|
svcAddCount.Inc()
|
|
serviceUpdate(o)
|
|
},
|
|
UpdateFunc: func(_, o interface{}) {
|
|
svcUpdateCount.Inc()
|
|
serviceUpdate(o)
|
|
},
|
|
DeleteFunc: func(o interface{}) {
|
|
svcDeleteCount.Inc()
|
|
serviceUpdate(o)
|
|
},
|
|
})
|
|
if err != nil {
|
|
level.Error(l).Log("msg", "Error adding services event handler.", "err", err)
|
|
}
|
|
_, err = e.podInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
UpdateFunc: func(old, cur interface{}) {
|
|
podUpdateCount.Inc()
|
|
oldPod, ok := old.(*apiv1.Pod)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
curPod, ok := cur.(*apiv1.Pod)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
// the Pod's phase may change without triggering an update on the Endpoints/Service.
|
|
// https://github.com/prometheus/prometheus/issues/11305.
|
|
if curPod.Status.Phase != oldPod.Status.Phase {
|
|
e.enqueuePod(namespacedName(curPod.Namespace, curPod.Name))
|
|
}
|
|
},
|
|
})
|
|
if err != nil {
|
|
level.Error(l).Log("msg", "Error adding pods event handler.", "err", err)
|
|
}
|
|
if e.withNodeMetadata {
|
|
_, err = e.nodeInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: func(o interface{}) {
|
|
node := o.(*apiv1.Node)
|
|
e.enqueueNode(node.Name)
|
|
},
|
|
UpdateFunc: func(_, o interface{}) {
|
|
node := o.(*apiv1.Node)
|
|
e.enqueueNode(node.Name)
|
|
},
|
|
DeleteFunc: func(o interface{}) {
|
|
node := o.(*apiv1.Node)
|
|
e.enqueueNode(node.Name)
|
|
},
|
|
})
|
|
if err != nil {
|
|
level.Error(l).Log("msg", "Error adding nodes event handler.", "err", err)
|
|
}
|
|
}
|
|
|
|
return e
|
|
}
|
|
|
|
func (e *Endpoints) enqueueNode(nodeName string) {
|
|
endpoints, err := e.endpointsInf.GetIndexer().ByIndex(nodeIndex, nodeName)
|
|
if err != nil {
|
|
level.Error(e.logger).Log("msg", "Error getting endpoints for node", "node", nodeName, "err", err)
|
|
return
|
|
}
|
|
|
|
for _, endpoint := range endpoints {
|
|
e.enqueue(endpoint)
|
|
}
|
|
}
|
|
|
|
func (e *Endpoints) enqueuePod(podNamespacedName string) {
|
|
endpoints, err := e.endpointsInf.GetIndexer().ByIndex(podIndex, podNamespacedName)
|
|
if err != nil {
|
|
level.Error(e.logger).Log("msg", "Error getting endpoints for pod", "pod", podNamespacedName, "err", err)
|
|
return
|
|
}
|
|
|
|
for _, endpoint := range endpoints {
|
|
e.enqueue(endpoint)
|
|
}
|
|
}
|
|
|
|
func (e *Endpoints) enqueue(obj interface{}) {
|
|
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
e.queue.Add(key)
|
|
}
|
|
|
|
// Run implements the Discoverer interface.
|
|
func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
|
|
defer e.queue.ShutDown()
|
|
|
|
cacheSyncs := []cache.InformerSynced{e.endpointsInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced}
|
|
if e.withNodeMetadata {
|
|
cacheSyncs = append(cacheSyncs, e.nodeInf.HasSynced)
|
|
}
|
|
|
|
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
|
|
if !errors.Is(ctx.Err(), context.Canceled) {
|
|
level.Error(e.logger).Log("msg", "endpoints informer unable to sync cache")
|
|
}
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
for e.process(ctx, ch) {
|
|
}
|
|
}()
|
|
|
|
// Block until the target provider is explicitly canceled.
|
|
<-ctx.Done()
|
|
}
|
|
|
|
func (e *Endpoints) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool {
|
|
keyObj, quit := e.queue.Get()
|
|
if quit {
|
|
return false
|
|
}
|
|
defer e.queue.Done(keyObj)
|
|
key := keyObj.(string)
|
|
|
|
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
|
if err != nil {
|
|
level.Error(e.logger).Log("msg", "splitting key failed", "key", key)
|
|
return true
|
|
}
|
|
|
|
o, exists, err := e.endpointsStore.GetByKey(key)
|
|
if err != nil {
|
|
level.Error(e.logger).Log("msg", "getting object from store failed", "key", key)
|
|
return true
|
|
}
|
|
if !exists {
|
|
send(ctx, ch, &targetgroup.Group{Source: endpointsSourceFromNamespaceAndName(namespace, name)})
|
|
return true
|
|
}
|
|
eps, err := convertToEndpoints(o)
|
|
if err != nil {
|
|
level.Error(e.logger).Log("msg", "converting to Endpoints object failed", "err", err)
|
|
return true
|
|
}
|
|
send(ctx, ch, e.buildEndpoints(eps))
|
|
return true
|
|
}
|
|
|
|
func convertToEndpoints(o interface{}) (*apiv1.Endpoints, error) {
|
|
endpoints, ok := o.(*apiv1.Endpoints)
|
|
if ok {
|
|
return endpoints, nil
|
|
}
|
|
|
|
return nil, fmt.Errorf("received unexpected object: %v", o)
|
|
}
|
|
|
|
func endpointsSource(ep *apiv1.Endpoints) string {
|
|
return endpointsSourceFromNamespaceAndName(ep.Namespace, ep.Name)
|
|
}
|
|
|
|
func endpointsSourceFromNamespaceAndName(namespace, name string) string {
|
|
return "endpoints/" + namespace + "/" + name
|
|
}
|
|
|
|
const (
|
|
endpointNodeName = metaLabelPrefix + "endpoint_node_name"
|
|
endpointHostname = metaLabelPrefix + "endpoint_hostname"
|
|
endpointReadyLabel = metaLabelPrefix + "endpoint_ready"
|
|
endpointPortNameLabel = metaLabelPrefix + "endpoint_port_name"
|
|
endpointPortProtocolLabel = metaLabelPrefix + "endpoint_port_protocol"
|
|
endpointAddressTargetKindLabel = metaLabelPrefix + "endpoint_address_target_kind"
|
|
endpointAddressTargetNameLabel = metaLabelPrefix + "endpoint_address_target_name"
|
|
)
|
|
|
|
func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *targetgroup.Group {
|
|
tg := &targetgroup.Group{
|
|
Source: endpointsSource(eps),
|
|
}
|
|
tg.Labels = model.LabelSet{
|
|
namespaceLabel: lv(eps.Namespace),
|
|
}
|
|
e.addServiceLabels(eps.Namespace, eps.Name, tg)
|
|
// Add endpoints labels metadata.
|
|
addObjectMetaLabels(tg.Labels, eps.ObjectMeta, RoleEndpoint)
|
|
|
|
type podEntry struct {
|
|
pod *apiv1.Pod
|
|
servicePorts []apiv1.EndpointPort
|
|
}
|
|
seenPods := map[string]*podEntry{}
|
|
|
|
add := func(addr apiv1.EndpointAddress, port apiv1.EndpointPort, ready string) {
|
|
a := net.JoinHostPort(addr.IP, strconv.FormatUint(uint64(port.Port), 10))
|
|
|
|
target := model.LabelSet{
|
|
model.AddressLabel: lv(a),
|
|
endpointPortNameLabel: lv(port.Name),
|
|
endpointPortProtocolLabel: lv(string(port.Protocol)),
|
|
endpointReadyLabel: lv(ready),
|
|
}
|
|
|
|
if addr.TargetRef != nil {
|
|
target[model.LabelName(endpointAddressTargetKindLabel)] = lv(addr.TargetRef.Kind)
|
|
target[model.LabelName(endpointAddressTargetNameLabel)] = lv(addr.TargetRef.Name)
|
|
}
|
|
|
|
if addr.NodeName != nil {
|
|
target[model.LabelName(endpointNodeName)] = lv(*addr.NodeName)
|
|
}
|
|
if addr.Hostname != "" {
|
|
target[model.LabelName(endpointHostname)] = lv(addr.Hostname)
|
|
}
|
|
|
|
if e.withNodeMetadata {
|
|
if addr.NodeName != nil {
|
|
target = addNodeLabels(target, e.nodeInf, e.logger, addr.NodeName)
|
|
} else if addr.TargetRef != nil && addr.TargetRef.Kind == "Node" {
|
|
target = addNodeLabels(target, e.nodeInf, e.logger, &addr.TargetRef.Name)
|
|
}
|
|
}
|
|
|
|
pod := e.resolvePodRef(addr.TargetRef)
|
|
if pod == nil {
|
|
// This target is not a Pod, so don't continue with Pod specific logic.
|
|
tg.Targets = append(tg.Targets, target)
|
|
return
|
|
}
|
|
s := namespacedName(pod.Namespace, pod.Name)
|
|
|
|
sp, ok := seenPods[s]
|
|
if !ok {
|
|
sp = &podEntry{pod: pod}
|
|
seenPods[s] = sp
|
|
}
|
|
|
|
// Attach standard pod labels.
|
|
target = target.Merge(podLabels(pod))
|
|
|
|
// Attach potential container port labels matching the endpoint port.
|
|
containers := append(pod.Spec.Containers, pod.Spec.InitContainers...)
|
|
for i, c := range containers {
|
|
for _, cport := range c.Ports {
|
|
if port.Port == cport.ContainerPort {
|
|
ports := strconv.FormatUint(uint64(port.Port), 10)
|
|
isInit := i >= len(pod.Spec.Containers)
|
|
|
|
target[podContainerNameLabel] = lv(c.Name)
|
|
target[podContainerImageLabel] = lv(c.Image)
|
|
target[podContainerPortNameLabel] = lv(cport.Name)
|
|
target[podContainerPortNumberLabel] = lv(ports)
|
|
target[podContainerPortProtocolLabel] = lv(string(port.Protocol))
|
|
target[podContainerIsInit] = lv(strconv.FormatBool(isInit))
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Add service port so we know that we have already generated a target
|
|
// for it.
|
|
sp.servicePorts = append(sp.servicePorts, port)
|
|
tg.Targets = append(tg.Targets, target)
|
|
}
|
|
|
|
for _, ss := range eps.Subsets {
|
|
for _, port := range ss.Ports {
|
|
for _, addr := range ss.Addresses {
|
|
add(addr, port, "true")
|
|
}
|
|
// Although this generates the same target again, as it was generated in
|
|
// the loop above, it causes the ready meta label to be overridden.
|
|
for _, addr := range ss.NotReadyAddresses {
|
|
add(addr, port, "false")
|
|
}
|
|
}
|
|
}
|
|
|
|
v := eps.Labels[apiv1.EndpointsOverCapacity]
|
|
if v == "truncated" {
|
|
level.Warn(e.logger).Log("msg", "Number of endpoints in one Endpoints object exceeds 1000 and has been truncated, please use \"role: endpointslice\" instead", "endpoint", eps.Name)
|
|
}
|
|
if v == "warning" {
|
|
level.Warn(e.logger).Log("msg", "Number of endpoints in one Endpoints object exceeds 1000, please use \"role: endpointslice\" instead", "endpoint", eps.Name)
|
|
}
|
|
|
|
// For all seen pods, check all container ports. If they were not covered
|
|
// by one of the service endpoints, generate targets for them.
|
|
for _, pe := range seenPods {
|
|
// PodIP can be empty when a pod is starting or has been evicted.
|
|
if len(pe.pod.Status.PodIP) == 0 {
|
|
continue
|
|
}
|
|
|
|
containers := append(pe.pod.Spec.Containers, pe.pod.Spec.InitContainers...)
|
|
for i, c := range containers {
|
|
for _, cport := range c.Ports {
|
|
hasSeenPort := func() bool {
|
|
for _, eport := range pe.servicePorts {
|
|
if cport.ContainerPort == eport.Port {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
if hasSeenPort() {
|
|
continue
|
|
}
|
|
|
|
a := net.JoinHostPort(pe.pod.Status.PodIP, strconv.FormatUint(uint64(cport.ContainerPort), 10))
|
|
ports := strconv.FormatUint(uint64(cport.ContainerPort), 10)
|
|
|
|
isInit := i >= len(pe.pod.Spec.Containers)
|
|
target := model.LabelSet{
|
|
model.AddressLabel: lv(a),
|
|
podContainerNameLabel: lv(c.Name),
|
|
podContainerImageLabel: lv(c.Image),
|
|
podContainerPortNameLabel: lv(cport.Name),
|
|
podContainerPortNumberLabel: lv(ports),
|
|
podContainerPortProtocolLabel: lv(string(cport.Protocol)),
|
|
podContainerIsInit: lv(strconv.FormatBool(isInit)),
|
|
}
|
|
tg.Targets = append(tg.Targets, target.Merge(podLabels(pe.pod)))
|
|
}
|
|
}
|
|
}
|
|
|
|
return tg
|
|
}
|
|
|
|
func (e *Endpoints) resolvePodRef(ref *apiv1.ObjectReference) *apiv1.Pod {
|
|
if ref == nil || ref.Kind != "Pod" {
|
|
return nil
|
|
}
|
|
p := &apiv1.Pod{}
|
|
p.Namespace = ref.Namespace
|
|
p.Name = ref.Name
|
|
|
|
obj, exists, err := e.podStore.Get(p)
|
|
if err != nil {
|
|
level.Error(e.logger).Log("msg", "resolving pod ref failed", "err", err)
|
|
return nil
|
|
}
|
|
if !exists {
|
|
return nil
|
|
}
|
|
return obj.(*apiv1.Pod)
|
|
}
|
|
|
|
func (e *Endpoints) addServiceLabels(ns, name string, tg *targetgroup.Group) {
|
|
svc := &apiv1.Service{}
|
|
svc.Namespace = ns
|
|
svc.Name = name
|
|
|
|
obj, exists, err := e.serviceStore.Get(svc)
|
|
if err != nil {
|
|
level.Error(e.logger).Log("msg", "retrieving service failed", "err", err)
|
|
return
|
|
}
|
|
if !exists {
|
|
return
|
|
}
|
|
svc = obj.(*apiv1.Service)
|
|
|
|
tg.Labels = tg.Labels.Merge(serviceLabels(svc))
|
|
}
|
|
|
|
func addNodeLabels(tg model.LabelSet, nodeInf cache.SharedInformer, logger log.Logger, nodeName *string) model.LabelSet {
|
|
if nodeName == nil {
|
|
return tg
|
|
}
|
|
|
|
obj, exists, err := nodeInf.GetStore().GetByKey(*nodeName)
|
|
if err != nil {
|
|
level.Error(logger).Log("msg", "Error getting node", "node", *nodeName, "err", err)
|
|
return tg
|
|
}
|
|
|
|
if !exists {
|
|
return tg
|
|
}
|
|
|
|
node := obj.(*apiv1.Node)
|
|
// Allocate one target label for the node name,
|
|
nodeLabelset := make(model.LabelSet)
|
|
addObjectMetaLabels(nodeLabelset, node.ObjectMeta, RoleNode)
|
|
return tg.Merge(nodeLabelset)
|
|
}
|