prometheus/discovery/kubernetes/endpointslice.go
Julien Pivotto 3677d61a4b Update kubernetes dependencies
A new API is available for AddEventHandlers, to get errors but also be
able to cancel handlers.

Doing the easy thing for the release, which is just to log errors.

We could see how to improve this in the future to handle the errors
properly and cancel the handlers.

Signed-off-by: Julien Pivotto <roidelapluie@o11y.eu>
2022-12-14 10:46:20 +01:00

469 lines
14 KiB
Go

// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"context"
"fmt"
"net"
"strconv"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/api/discovery/v1"
"k8s.io/api/discovery/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/util/strutil"
)
// Event counters for the "endpointslice" role, one per informer event kind.
// They are incremented by the endpoint slice event handlers.
var epslAddCount = eventCount.WithLabelValues("endpointslice", "add")

var epslUpdateCount = eventCount.WithLabelValues("endpointslice", "update")

var epslDeleteCount = eventCount.WithLabelValues("endpointslice", "delete")
// EndpointSlice discovers new endpoint targets.
type EndpointSlice struct {
	// logger receives the errors reported by the event handlers and the
	// processing loop.
	logger log.Logger

	// endpointSliceInf is the informer watching endpoint slices; its indexer
	// is also used to look up slices by node name.
	endpointSliceInf cache.SharedIndexInformer
	// serviceInf, podInf and nodeInf are the informers for the related
	// objects. nodeInf is only used when withNodeMetadata is true.
	serviceInf, podInf, nodeInf cache.SharedInformer
	// withNodeMetadata is set when a node informer was supplied to the
	// constructor.
	withNodeMetadata bool

	// Stores backing the pod, endpoint slice and service informers.
	podStore, endpointSliceStore, serviceStore cache.Store

	// queue holds the keys of the endpoint slices waiting to be processed.
	queue *workqueue.Type
}
// NewEndpointSlice returns a new endpointslice discovery.
//
// l may be nil, in which case a no-op logger is used. eps, svc and pod are the
// informers for endpoint slices, services and pods. node is optional: when it
// is non-nil, node events re-queue the endpoint slices indexed under that node
// and node labels are attached to the generated targets.
//
// Event handlers are registered on the informers here. A failure to register
// a handler is logged and does not prevent the discovery from being returned.
func NewEndpointSlice(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer) *EndpointSlice {
	if l == nil {
		l = log.NewNopLogger()
	}
	e := &EndpointSlice{
		logger:             l,
		endpointSliceInf:   eps,
		endpointSliceStore: eps.GetStore(),
		serviceInf:         svc,
		serviceStore:       svc.GetStore(),
		podInf:             pod,
		podStore:           pod.GetStore(),
		nodeInf:            node,
		withNodeMetadata:   node != nil,
		queue:              workqueue.NewNamed("endpointSlice"),
	}

	// Every endpoint slice event simply re-queues the affected slice.
	_, err := e.endpointSliceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(o interface{}) {
			epslAddCount.Inc()
			e.enqueue(o)
		},
		UpdateFunc: func(_, o interface{}) {
			epslUpdateCount.Inc()
			e.enqueue(o)
		},
		DeleteFunc: func(o interface{}) {
			epslDeleteCount.Inc()
			e.enqueue(o)
		},
	})
	if err != nil {
		level.Error(l).Log("msg", "Error adding endpoint slices event handler.", "err", err)
	}

	// serviceUpdate re-queues every endpoint slice whose service-name label
	// matches the changed service, so that its service labels are refreshed.
	serviceUpdate := func(o interface{}) {
		svc, err := convertToService(o)
		if err != nil {
			level.Error(e.logger).Log("msg", "converting to Service object failed", "err", err)
			return
		}

		// TODO(brancz): use cache.Indexer to index endpoints by
		// disv1beta1.LabelServiceName so this operation doesn't have to
		// iterate over all endpoint objects.
		for _, obj := range e.endpointSliceStore.List() {
			esa, err := e.getEndpointSliceAdaptor(obj)
			if err != nil {
				level.Error(e.logger).Log("msg", "converting to EndpointSlice object failed", "err", err)
				continue
			}
			if name, exists := esa.labels()[esa.labelServiceName()]; exists && name == svc.Name {
				e.enqueue(esa.get())
			}
		}
	}
	_, err = e.serviceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(o interface{}) {
			svcAddCount.Inc()
			serviceUpdate(o)
		},
		UpdateFunc: func(_, o interface{}) {
			svcUpdateCount.Inc()
			serviceUpdate(o)
		},
		DeleteFunc: func(o interface{}) {
			svcDeleteCount.Inc()
			serviceUpdate(o)
		},
	})
	if err != nil {
		level.Error(l).Log("msg", "Error adding services event handler.", "err", err)
	}

	if e.withNodeMetadata {
		// nodeUpdate re-queues the endpoint slices indexed under the node.
		// The node name is derived with the deletion-aware key function
		// instead of an unchecked *apiv1.Node type assertion: delete
		// notifications may carry a cache.DeletedFinalStateUnknown tombstone,
		// on which a bare assertion would panic. Nodes have no namespace, so
		// the resulting key is the node name itself.
		nodeUpdate := func(o interface{}) {
			name, err := cache.DeletionHandlingMetaNamespaceKeyFunc(o)
			if err != nil {
				level.Error(e.logger).Log("msg", "Error getting Node name", "err", err)
				return
			}
			e.enqueueNode(name)
		}
		_, err = e.nodeInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc: func(o interface{}) {
				nodeUpdate(o)
			},
			UpdateFunc: func(_, o interface{}) {
				nodeUpdate(o)
			},
			DeleteFunc: func(o interface{}) {
				nodeUpdate(o)
			},
		})
		if err != nil {
			level.Error(l).Log("msg", "Error adding nodes event handler.", "err", err)
		}
	}

	return e
}
// enqueueNode queues every endpoint slice that the informer's node index
// returns for nodeName. A failed index lookup is logged and nothing is queued.
func (e *EndpointSlice) enqueueNode(nodeName string) {
	indexer := e.endpointSliceInf.GetIndexer()
	objs, err := indexer.ByIndex(nodeIndex, nodeName)
	if err != nil {
		level.Error(e.logger).Log("msg", "Error getting endpoints for node", "node", nodeName, "err", err)
		return
	}

	for i := range objs {
		e.enqueue(objs[i])
	}
}
// enqueue adds the store key of obj to the work queue.
//
// The key is computed with the deletion-aware key function, so deletion
// tombstones are accepted as well. If no key can be derived the object is
// dropped; the failure is logged so that the lost event leaves a trace.
func (e *EndpointSlice) enqueue(obj interface{}) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		level.Error(e.logger).Log("msg", "getting key for object failed", "err", err)
		return
	}

	e.queue.Add(key)
}
// Run implements the Discoverer interface.
//
// It waits for the informer caches to sync, then drains the work queue in a
// background goroutine and blocks until ctx is done. The queue is shut down
// when Run returns, which in turn ends the background goroutine.
func (e *EndpointSlice) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
	defer e.queue.ShutDown()

	synced := []cache.InformerSynced{
		e.endpointSliceInf.HasSynced,
		e.serviceInf.HasSynced,
		e.podInf.HasSynced,
	}
	if e.withNodeMetadata {
		synced = append(synced, e.nodeInf.HasSynced)
	}

	if ok := cache.WaitForCacheSync(ctx.Done(), synced...); !ok {
		// A plain cancellation is an expected shutdown and is not reported.
		if err := ctx.Err(); err != context.Canceled {
			level.Error(e.logger).Log("msg", "endpointslice informer unable to sync cache")
		}
		return
	}

	go func() {
		for {
			if more := e.process(ctx, ch); !more {
				return
			}
		}
	}()

	// Block until the target provider is explicitly canceled.
	<-ctx.Done()
}
// process takes one key off the work queue and sends the matching target
// group on ch.
//
// If the endpoint slice no longer exists in the store, a group carrying only
// the Source is sent. Errors while handling a key are logged and the key is
// skipped.
//
// It returns false once the queue has been shut down and true otherwise, so
// callers can use it as a loop condition.
func (e *EndpointSlice) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool {
	keyObj, quit := e.queue.Get()
	if quit {
		return false
	}
	// Mark the key as done however processing ends.
	defer e.queue.Done(keyObj)
	key := keyObj.(string)

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		level.Error(e.logger).Log("msg", "splitting key failed", "key", key, "err", err)
		return true
	}

	o, exists, err := e.endpointSliceStore.GetByKey(key)
	if err != nil {
		level.Error(e.logger).Log("msg", "getting object from store failed", "key", key, "err", err)
		return true
	}
	if !exists {
		// The slice is gone: send a group with no targets for its source.
		send(ctx, ch, &targetgroup.Group{Source: endpointSliceSourceFromNamespaceAndName(namespace, name)})
		return true
	}

	esa, err := e.getEndpointSliceAdaptor(o)
	if err != nil {
		level.Error(e.logger).Log("msg", "converting to EndpointSlice object failed", "err", err)
		return true
	}

	send(ctx, ch, e.buildEndpointSlice(esa))
	return true
}
// getEndpointSliceAdaptor wraps o in the adaptor matching its API version.
// Only v1 and v1beta1 EndpointSlice pointers are accepted; any other value
// results in an error.
func (e *EndpointSlice) getEndpointSliceAdaptor(o interface{}) (endpointSliceAdaptor, error) {
	if slice, ok := o.(*v1.EndpointSlice); ok {
		return newEndpointSliceAdaptorFromV1(slice), nil
	}
	if slice, ok := o.(*v1beta1.EndpointSlice); ok {
		return newEndpointSliceAdaptorFromV1beta1(slice), nil
	}
	return nil, fmt.Errorf("received unexpected object: %v", o)
}
// endpointSliceSource returns the target group source string for ep, built
// from its namespace and name.
func endpointSliceSource(ep endpointSliceAdaptor) string {
	ns, name := ep.namespace(), ep.name()
	return endpointSliceSourceFromNamespaceAndName(ns, name)
}
// endpointSliceSourceFromNamespaceAndName returns the target group source
// string in the form "endpointslice/<namespace>/<name>".
func endpointSliceSourceFromNamespaceAndName(namespace, name string) string {
	const kind = "endpointslice"

	src := kind + "/" + namespace
	src += "/" + name
	return src
}
// Meta label names used on the target groups and targets generated from
// endpoint slices. All of them share metaLabelPrefix.
const (
// Group-level labels: the endpoint slice name and its address type.
endpointSliceNameLabel = metaLabelPrefix + "endpointslice_name"
endpointSliceAddressTypeLabel = metaLabelPrefix + "endpointslice_address_type"
// Per-target labels describing the endpoint slice port. Each one is only
// set when the corresponding port field is non-nil.
endpointSlicePortNameLabel = metaLabelPrefix + "endpointslice_port_name"
endpointSlicePortProtocolLabel = metaLabelPrefix + "endpointslice_port_protocol"
endpointSlicePortLabel = metaLabelPrefix + "endpointslice_port"
endpointSlicePortAppProtocol = metaLabelPrefix + "endpointslice_port_app_protocol"
// Per-target labels describing the endpoint: its ready condition and
// hostname, each set only when non-nil.
endpointSliceEndpointConditionsReadyLabel = metaLabelPrefix + "endpointslice_endpoint_conditions_ready"
endpointSliceEndpointHostnameLabel = metaLabelPrefix + "endpointslice_endpoint_hostname"
// Kind and name of the object referenced by the endpoint's target
// reference, set only when a reference is present.
endpointSliceAddressTargetKindLabel = metaLabelPrefix + "endpointslice_address_target_kind"
endpointSliceAddressTargetNameLabel = metaLabelPrefix + "endpointslice_address_target_name"
// Prefixes for the per-topology-key labels. The sanitized topology key is
// appended; the "present" variant is set to presentValue for every key.
endpointSliceEndpointTopologyLabelPrefix = metaLabelPrefix + "endpointslice_endpoint_topology_"
endpointSliceEndpointTopologyLabelPresentPrefix = metaLabelPrefix + "endpointslice_endpoint_topology_present_"
)
// buildEndpointSlice converts an endpoint slice into a target group.
//
// The group is labelled with the slice's namespace, name and address type,
// plus the labels of the owning service when it is found in the service
// store. One target is generated for every (endpoint, port, address)
// combination. When an endpoint references a pod known to the pod store, the
// pod's labels and the matching container port labels are merged into the
// target.
//
// In addition, for every such pod, one target is generated for each declared
// container port that no endpoint slice port covered. Pods that have no IP
// assigned are skipped for this step, since no valid address could be built
// for them.
func (e *EndpointSlice) buildEndpointSlice(eps endpointSliceAdaptor) *targetgroup.Group {
	tg := &targetgroup.Group{
		Source: endpointSliceSource(eps),
	}
	tg.Labels = model.LabelSet{
		namespaceLabel:                lv(eps.namespace()),
		endpointSliceNameLabel:        lv(eps.name()),
		endpointSliceAddressTypeLabel: lv(eps.addressType()),
	}
	e.addServiceLabels(eps, tg)

	// podEntry remembers, per referenced pod, which endpoint slice ports
	// already produced a target.
	type podEntry struct {
		pod          *apiv1.Pod
		servicePorts []endpointSlicePortAdaptor
	}
	seenPods := map[string]*podEntry{}

	// add appends the target for one address of endpoint ep on the given port.
	add := func(addr string, ep endpointSliceEndpointAdaptor, port endpointSlicePortAdaptor) {
		a := addr
		if port.port() != nil {
			a = net.JoinHostPort(addr, strconv.FormatUint(uint64(*port.port()), 10))
		}

		target := model.LabelSet{
			model.AddressLabel: lv(a),
		}

		if port.name() != nil {
			target[endpointSlicePortNameLabel] = lv(*port.name())
		}

		if port.protocol() != nil {
			target[endpointSlicePortProtocolLabel] = lv(string(*port.protocol()))
		}

		if port.port() != nil {
			target[endpointSlicePortLabel] = lv(strconv.FormatUint(uint64(*port.port()), 10))
		}

		if port.appProtocol() != nil {
			target[endpointSlicePortAppProtocol] = lv(*port.appProtocol())
		}

		if ep.conditions().ready() != nil {
			target[endpointSliceEndpointConditionsReadyLabel] = lv(strconv.FormatBool(*ep.conditions().ready()))
		}

		if ep.hostname() != nil {
			target[endpointSliceEndpointHostnameLabel] = lv(*ep.hostname())
		}

		if ep.targetRef() != nil {
			target[model.LabelName(endpointSliceAddressTargetKindLabel)] = lv(ep.targetRef().Kind)
			target[model.LabelName(endpointSliceAddressTargetNameLabel)] = lv(ep.targetRef().Name)
		}

		for k, v := range ep.topology() {
			ln := strutil.SanitizeLabelName(k)
			target[model.LabelName(endpointSliceEndpointTopologyLabelPrefix+ln)] = lv(v)
			target[model.LabelName(endpointSliceEndpointTopologyLabelPresentPrefix+ln)] = presentValue
		}

		if e.withNodeMetadata {
			target = addNodeLabels(target, e.nodeInf, e.logger, ep.nodename())
		}

		pod := e.resolvePodRef(ep.targetRef())
		if pod == nil {
			// This target is not a Pod, so don't continue with Pod specific logic.
			tg.Targets = append(tg.Targets, target)
			return
		}
		s := pod.Namespace + "/" + pod.Name

		sp, ok := seenPods[s]
		if !ok {
			sp = &podEntry{pod: pod}
			seenPods[s] = sp
		}

		// Attach standard pod labels.
		target = target.Merge(podLabels(pod))

		// Attach potential container port labels matching the endpoint port.
		for _, c := range pod.Spec.Containers {
			for _, cport := range c.Ports {
				if port.port() == nil {
					continue
				}
				if *port.port() == cport.ContainerPort {
					ports := strconv.FormatUint(uint64(*port.port()), 10)

					target[podContainerNameLabel] = lv(c.Name)
					target[podContainerImageLabel] = lv(c.Image)
					target[podContainerPortNameLabel] = lv(cport.Name)
					target[podContainerPortNumberLabel] = lv(ports)
					target[podContainerPortProtocolLabel] = lv(string(cport.Protocol))
					break
				}
			}
		}

		// Add service port so we know that we have already generated a target
		// for it.
		sp.servicePorts = append(sp.servicePorts, port)
		tg.Targets = append(tg.Targets, target)
	}

	for _, ep := range eps.endpoints() {
		for _, port := range eps.ports() {
			for _, addr := range ep.addresses() {
				add(addr, ep, port)
			}
		}
	}

	// For all seen pods, check all container ports. If they were not covered
	// by one of the service endpoints, generate targets for them.
	for _, pe := range seenPods {
		// The pod IP is empty until one has been allocated. Without it only a
		// bogus ":<port>" address could be produced, so skip such pods.
		if pe.pod.Status.PodIP == "" {
			continue
		}

		for _, c := range pe.pod.Spec.Containers {
			for _, cport := range c.Ports {
				hasSeenPort := func() bool {
					for _, eport := range pe.servicePorts {
						if eport.port() == nil {
							continue
						}
						if cport.ContainerPort == *eport.port() {
							return true
						}
					}
					return false
				}
				if hasSeenPort() {
					continue
				}

				ports := strconv.FormatUint(uint64(cport.ContainerPort), 10)
				a := net.JoinHostPort(pe.pod.Status.PodIP, ports)

				target := model.LabelSet{
					model.AddressLabel:            lv(a),
					podContainerNameLabel:         lv(c.Name),
					podContainerImageLabel:        lv(c.Image),
					podContainerPortNameLabel:     lv(cport.Name),
					podContainerPortNumberLabel:   lv(ports),
					podContainerPortProtocolLabel: lv(string(cport.Protocol)),
				}
				tg.Targets = append(tg.Targets, target.Merge(podLabels(pe.pod)))
			}
		}
	}

	return tg
}
// resolvePodRef looks up the pod named by ref in the pod store. It returns
// nil when ref is nil, does not refer to a Pod, the lookup fails (which is
// logged), or the pod is not in the store.
func (e *EndpointSlice) resolvePodRef(ref *apiv1.ObjectReference) *apiv1.Pod {
	if ref == nil {
		return nil
	}
	if ref.Kind != "Pod" {
		return nil
	}

	// The store is queried with a stub pod carrying only namespace and name.
	lookup := new(apiv1.Pod)
	lookup.Namespace, lookup.Name = ref.Namespace, ref.Name

	found, ok, err := e.podStore.Get(lookup)
	switch {
	case err != nil:
		level.Error(e.logger).Log("msg", "resolving pod ref failed", "err", err)
		return nil
	case !ok:
		return nil
	}
	return found.(*apiv1.Pod)
}
// addServiceLabels merges the labels of the service owning esa into tg.Labels.
// The service is identified by the slice's namespace and its service-name
// label. tg is left untouched when the label is missing, the store lookup
// fails (which is logged), or the service is not in the store.
func (e *EndpointSlice) addServiceLabels(esa endpointSliceAdaptor, tg *targetgroup.Group) {
	// Stub service carrying only the namespace and name, used for the lookup.
	key := &apiv1.Service{}
	key.Namespace = esa.namespace()

	// Every EndpointSlice object has the Service they belong to in the
	// kubernetes.io/service-name label.
	name, ok := esa.labels()[esa.labelServiceName()]
	if !ok {
		return
	}
	key.Name = name

	item, present, err := e.serviceStore.Get(key)
	if err != nil {
		level.Error(e.logger).Log("msg", "retrieving service failed", "err", err)
		return
	}
	if present {
		tg.Labels = tg.Labels.Merge(serviceLabels(item.(*apiv1.Service)))
	}
}