mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 13:57:36 -08:00
3677d61a4b
A new API is available for AddEventHandlers, to get errors but also be able to cancel handlers. Doing the easy thing for the release, which is just to log errors. We could see how to improve this in the future to handle the errors properly and cancel the handlers. Signed-off-by: Julien Pivotto <roidelapluie@o11y.eu>
469 lines
14 KiB
Go
469 lines
14 KiB
Go
// Copyright 2020 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package kubernetes
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"strconv"
|
|
|
|
"github.com/go-kit/log"
|
|
"github.com/go-kit/log/level"
|
|
"github.com/prometheus/common/model"
|
|
apiv1 "k8s.io/api/core/v1"
|
|
v1 "k8s.io/api/discovery/v1"
|
|
"k8s.io/api/discovery/v1beta1"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/client-go/util/workqueue"
|
|
|
|
"github.com/prometheus/prometheus/discovery/targetgroup"
|
|
"github.com/prometheus/prometheus/util/strutil"
|
|
)
|
|
|
|
var (
|
|
epslAddCount = eventCount.WithLabelValues("endpointslice", "add")
|
|
epslUpdateCount = eventCount.WithLabelValues("endpointslice", "update")
|
|
epslDeleteCount = eventCount.WithLabelValues("endpointslice", "delete")
|
|
)
|
|
|
|
// EndpointSlice discovers new endpoint targets.
|
|
type EndpointSlice struct {
|
|
logger log.Logger
|
|
|
|
endpointSliceInf cache.SharedIndexInformer
|
|
serviceInf cache.SharedInformer
|
|
podInf cache.SharedInformer
|
|
nodeInf cache.SharedInformer
|
|
withNodeMetadata bool
|
|
|
|
podStore cache.Store
|
|
endpointSliceStore cache.Store
|
|
serviceStore cache.Store
|
|
|
|
queue *workqueue.Type
|
|
}
|
|
|
|
// NewEndpointSlice returns a new endpointslice discovery.
|
|
func NewEndpointSlice(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer) *EndpointSlice {
|
|
if l == nil {
|
|
l = log.NewNopLogger()
|
|
}
|
|
e := &EndpointSlice{
|
|
logger: l,
|
|
endpointSliceInf: eps,
|
|
endpointSliceStore: eps.GetStore(),
|
|
serviceInf: svc,
|
|
serviceStore: svc.GetStore(),
|
|
podInf: pod,
|
|
podStore: pod.GetStore(),
|
|
nodeInf: node,
|
|
withNodeMetadata: node != nil,
|
|
queue: workqueue.NewNamed("endpointSlice"),
|
|
}
|
|
|
|
_, err := e.endpointSliceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: func(o interface{}) {
|
|
epslAddCount.Inc()
|
|
e.enqueue(o)
|
|
},
|
|
UpdateFunc: func(_, o interface{}) {
|
|
epslUpdateCount.Inc()
|
|
e.enqueue(o)
|
|
},
|
|
DeleteFunc: func(o interface{}) {
|
|
epslDeleteCount.Inc()
|
|
e.enqueue(o)
|
|
},
|
|
})
|
|
if err != nil {
|
|
level.Error(l).Log("msg", "Error adding endpoint slices event handler.", "err", err)
|
|
}
|
|
|
|
serviceUpdate := func(o interface{}) {
|
|
svc, err := convertToService(o)
|
|
if err != nil {
|
|
level.Error(e.logger).Log("msg", "converting to Service object failed", "err", err)
|
|
return
|
|
}
|
|
|
|
// TODO(brancz): use cache.Indexer to index endpoints by
|
|
// disv1beta1.LabelServiceName so this operation doesn't have to
|
|
// iterate over all endpoint objects.
|
|
for _, obj := range e.endpointSliceStore.List() {
|
|
esa, err := e.getEndpointSliceAdaptor(obj)
|
|
if err != nil {
|
|
level.Error(e.logger).Log("msg", "converting to EndpointSlice object failed", "err", err)
|
|
continue
|
|
}
|
|
if lv, exists := esa.labels()[esa.labelServiceName()]; exists && lv == svc.Name {
|
|
e.enqueue(esa.get())
|
|
}
|
|
}
|
|
}
|
|
_, err = e.serviceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: func(o interface{}) {
|
|
svcAddCount.Inc()
|
|
serviceUpdate(o)
|
|
},
|
|
UpdateFunc: func(_, o interface{}) {
|
|
svcUpdateCount.Inc()
|
|
serviceUpdate(o)
|
|
},
|
|
DeleteFunc: func(o interface{}) {
|
|
svcDeleteCount.Inc()
|
|
serviceUpdate(o)
|
|
},
|
|
})
|
|
if err != nil {
|
|
level.Error(l).Log("msg", "Error adding services event handler.", "err", err)
|
|
}
|
|
|
|
if e.withNodeMetadata {
|
|
_, err = e.nodeInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: func(o interface{}) {
|
|
node := o.(*apiv1.Node)
|
|
e.enqueueNode(node.Name)
|
|
},
|
|
UpdateFunc: func(_, o interface{}) {
|
|
node := o.(*apiv1.Node)
|
|
e.enqueueNode(node.Name)
|
|
},
|
|
DeleteFunc: func(o interface{}) {
|
|
node := o.(*apiv1.Node)
|
|
e.enqueueNode(node.Name)
|
|
},
|
|
})
|
|
if err != nil {
|
|
level.Error(l).Log("msg", "Error adding nodes event handler.", "err", err)
|
|
}
|
|
}
|
|
|
|
return e
|
|
}
|
|
|
|
func (e *EndpointSlice) enqueueNode(nodeName string) {
|
|
endpoints, err := e.endpointSliceInf.GetIndexer().ByIndex(nodeIndex, nodeName)
|
|
if err != nil {
|
|
level.Error(e.logger).Log("msg", "Error getting endpoints for node", "node", nodeName, "err", err)
|
|
return
|
|
}
|
|
|
|
for _, endpoint := range endpoints {
|
|
e.enqueue(endpoint)
|
|
}
|
|
}
|
|
|
|
func (e *EndpointSlice) enqueue(obj interface{}) {
|
|
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
e.queue.Add(key)
|
|
}
|
|
|
|
// Run implements the Discoverer interface.
|
|
func (e *EndpointSlice) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
|
|
defer e.queue.ShutDown()
|
|
|
|
cacheSyncs := []cache.InformerSynced{e.endpointSliceInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced}
|
|
if e.withNodeMetadata {
|
|
cacheSyncs = append(cacheSyncs, e.nodeInf.HasSynced)
|
|
}
|
|
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
|
|
if ctx.Err() != context.Canceled {
|
|
level.Error(e.logger).Log("msg", "endpointslice informer unable to sync cache")
|
|
}
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
for e.process(ctx, ch) {
|
|
}
|
|
}()
|
|
|
|
// Block until the target provider is explicitly canceled.
|
|
<-ctx.Done()
|
|
}
|
|
|
|
func (e *EndpointSlice) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool {
|
|
keyObj, quit := e.queue.Get()
|
|
if quit {
|
|
return false
|
|
}
|
|
defer e.queue.Done(keyObj)
|
|
key := keyObj.(string)
|
|
|
|
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
|
if err != nil {
|
|
level.Error(e.logger).Log("msg", "splitting key failed", "key", key)
|
|
return true
|
|
}
|
|
|
|
o, exists, err := e.endpointSliceStore.GetByKey(key)
|
|
if err != nil {
|
|
level.Error(e.logger).Log("msg", "getting object from store failed", "key", key)
|
|
return true
|
|
}
|
|
if !exists {
|
|
send(ctx, ch, &targetgroup.Group{Source: endpointSliceSourceFromNamespaceAndName(namespace, name)})
|
|
return true
|
|
}
|
|
|
|
esa, err := e.getEndpointSliceAdaptor(o)
|
|
if err != nil {
|
|
level.Error(e.logger).Log("msg", "converting to EndpointSlice object failed", "err", err)
|
|
return true
|
|
}
|
|
|
|
send(ctx, ch, e.buildEndpointSlice(esa))
|
|
return true
|
|
}
|
|
|
|
func (e *EndpointSlice) getEndpointSliceAdaptor(o interface{}) (endpointSliceAdaptor, error) {
|
|
switch endpointSlice := o.(type) {
|
|
case *v1.EndpointSlice:
|
|
return newEndpointSliceAdaptorFromV1(endpointSlice), nil
|
|
case *v1beta1.EndpointSlice:
|
|
return newEndpointSliceAdaptorFromV1beta1(endpointSlice), nil
|
|
default:
|
|
return nil, fmt.Errorf("received unexpected object: %v", o)
|
|
}
|
|
}
|
|
|
|
func endpointSliceSource(ep endpointSliceAdaptor) string {
|
|
return endpointSliceSourceFromNamespaceAndName(ep.namespace(), ep.name())
|
|
}
|
|
|
|
func endpointSliceSourceFromNamespaceAndName(namespace, name string) string {
|
|
return "endpointslice/" + namespace + "/" + name
|
|
}
|
|
|
|
const (
|
|
endpointSliceNameLabel = metaLabelPrefix + "endpointslice_name"
|
|
endpointSliceAddressTypeLabel = metaLabelPrefix + "endpointslice_address_type"
|
|
endpointSlicePortNameLabel = metaLabelPrefix + "endpointslice_port_name"
|
|
endpointSlicePortProtocolLabel = metaLabelPrefix + "endpointslice_port_protocol"
|
|
endpointSlicePortLabel = metaLabelPrefix + "endpointslice_port"
|
|
endpointSlicePortAppProtocol = metaLabelPrefix + "endpointslice_port_app_protocol"
|
|
endpointSliceEndpointConditionsReadyLabel = metaLabelPrefix + "endpointslice_endpoint_conditions_ready"
|
|
endpointSliceEndpointHostnameLabel = metaLabelPrefix + "endpointslice_endpoint_hostname"
|
|
endpointSliceAddressTargetKindLabel = metaLabelPrefix + "endpointslice_address_target_kind"
|
|
endpointSliceAddressTargetNameLabel = metaLabelPrefix + "endpointslice_address_target_name"
|
|
endpointSliceEndpointTopologyLabelPrefix = metaLabelPrefix + "endpointslice_endpoint_topology_"
|
|
endpointSliceEndpointTopologyLabelPresentPrefix = metaLabelPrefix + "endpointslice_endpoint_topology_present_"
|
|
)
|
|
|
|
func (e *EndpointSlice) buildEndpointSlice(eps endpointSliceAdaptor) *targetgroup.Group {
|
|
tg := &targetgroup.Group{
|
|
Source: endpointSliceSource(eps),
|
|
}
|
|
tg.Labels = model.LabelSet{
|
|
namespaceLabel: lv(eps.namespace()),
|
|
endpointSliceNameLabel: lv(eps.name()),
|
|
endpointSliceAddressTypeLabel: lv(eps.addressType()),
|
|
}
|
|
e.addServiceLabels(eps, tg)
|
|
|
|
type podEntry struct {
|
|
pod *apiv1.Pod
|
|
servicePorts []endpointSlicePortAdaptor
|
|
}
|
|
seenPods := map[string]*podEntry{}
|
|
|
|
add := func(addr string, ep endpointSliceEndpointAdaptor, port endpointSlicePortAdaptor) {
|
|
a := addr
|
|
if port.port() != nil {
|
|
a = net.JoinHostPort(addr, strconv.FormatUint(uint64(*port.port()), 10))
|
|
}
|
|
|
|
target := model.LabelSet{
|
|
model.AddressLabel: lv(a),
|
|
}
|
|
|
|
if port.name() != nil {
|
|
target[endpointSlicePortNameLabel] = lv(*port.name())
|
|
}
|
|
|
|
if port.protocol() != nil {
|
|
target[endpointSlicePortProtocolLabel] = lv(string(*port.protocol()))
|
|
}
|
|
|
|
if port.port() != nil {
|
|
target[endpointSlicePortLabel] = lv(strconv.FormatUint(uint64(*port.port()), 10))
|
|
}
|
|
|
|
if port.appProtocol() != nil {
|
|
target[endpointSlicePortAppProtocol] = lv(*port.appProtocol())
|
|
}
|
|
|
|
if ep.conditions().ready() != nil {
|
|
target[endpointSliceEndpointConditionsReadyLabel] = lv(strconv.FormatBool(*ep.conditions().ready()))
|
|
}
|
|
|
|
if ep.hostname() != nil {
|
|
target[endpointSliceEndpointHostnameLabel] = lv(*ep.hostname())
|
|
}
|
|
|
|
if ep.targetRef() != nil {
|
|
target[model.LabelName(endpointSliceAddressTargetKindLabel)] = lv(ep.targetRef().Kind)
|
|
target[model.LabelName(endpointSliceAddressTargetNameLabel)] = lv(ep.targetRef().Name)
|
|
}
|
|
|
|
for k, v := range ep.topology() {
|
|
ln := strutil.SanitizeLabelName(k)
|
|
target[model.LabelName(endpointSliceEndpointTopologyLabelPrefix+ln)] = lv(v)
|
|
target[model.LabelName(endpointSliceEndpointTopologyLabelPresentPrefix+ln)] = presentValue
|
|
}
|
|
|
|
if e.withNodeMetadata {
|
|
target = addNodeLabels(target, e.nodeInf, e.logger, ep.nodename())
|
|
}
|
|
|
|
pod := e.resolvePodRef(ep.targetRef())
|
|
if pod == nil {
|
|
// This target is not a Pod, so don't continue with Pod specific logic.
|
|
tg.Targets = append(tg.Targets, target)
|
|
return
|
|
}
|
|
s := pod.Namespace + "/" + pod.Name
|
|
|
|
sp, ok := seenPods[s]
|
|
if !ok {
|
|
sp = &podEntry{pod: pod}
|
|
seenPods[s] = sp
|
|
}
|
|
|
|
// Attach standard pod labels.
|
|
target = target.Merge(podLabels(pod))
|
|
|
|
// Attach potential container port labels matching the endpoint port.
|
|
for _, c := range pod.Spec.Containers {
|
|
for _, cport := range c.Ports {
|
|
if port.port() == nil {
|
|
continue
|
|
}
|
|
if *port.port() == cport.ContainerPort {
|
|
ports := strconv.FormatUint(uint64(*port.port()), 10)
|
|
|
|
target[podContainerNameLabel] = lv(c.Name)
|
|
target[podContainerImageLabel] = lv(c.Image)
|
|
target[podContainerPortNameLabel] = lv(cport.Name)
|
|
target[podContainerPortNumberLabel] = lv(ports)
|
|
target[podContainerPortProtocolLabel] = lv(string(cport.Protocol))
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Add service port so we know that we have already generated a target
|
|
// for it.
|
|
sp.servicePorts = append(sp.servicePorts, port)
|
|
tg.Targets = append(tg.Targets, target)
|
|
}
|
|
|
|
for _, ep := range eps.endpoints() {
|
|
for _, port := range eps.ports() {
|
|
for _, addr := range ep.addresses() {
|
|
add(addr, ep, port)
|
|
}
|
|
}
|
|
}
|
|
|
|
// For all seen pods, check all container ports. If they were not covered
|
|
// by one of the service endpoints, generate targets for them.
|
|
for _, pe := range seenPods {
|
|
for _, c := range pe.pod.Spec.Containers {
|
|
for _, cport := range c.Ports {
|
|
hasSeenPort := func() bool {
|
|
for _, eport := range pe.servicePorts {
|
|
if eport.port() == nil {
|
|
continue
|
|
}
|
|
if cport.ContainerPort == *eport.port() {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
if hasSeenPort() {
|
|
continue
|
|
}
|
|
|
|
a := net.JoinHostPort(pe.pod.Status.PodIP, strconv.FormatUint(uint64(cport.ContainerPort), 10))
|
|
ports := strconv.FormatUint(uint64(cport.ContainerPort), 10)
|
|
|
|
target := model.LabelSet{
|
|
model.AddressLabel: lv(a),
|
|
podContainerNameLabel: lv(c.Name),
|
|
podContainerImageLabel: lv(c.Image),
|
|
podContainerPortNameLabel: lv(cport.Name),
|
|
podContainerPortNumberLabel: lv(ports),
|
|
podContainerPortProtocolLabel: lv(string(cport.Protocol)),
|
|
}
|
|
tg.Targets = append(tg.Targets, target.Merge(podLabels(pe.pod)))
|
|
}
|
|
}
|
|
}
|
|
|
|
return tg
|
|
}
|
|
|
|
func (e *EndpointSlice) resolvePodRef(ref *apiv1.ObjectReference) *apiv1.Pod {
|
|
if ref == nil || ref.Kind != "Pod" {
|
|
return nil
|
|
}
|
|
p := &apiv1.Pod{}
|
|
p.Namespace = ref.Namespace
|
|
p.Name = ref.Name
|
|
|
|
obj, exists, err := e.podStore.Get(p)
|
|
if err != nil {
|
|
level.Error(e.logger).Log("msg", "resolving pod ref failed", "err", err)
|
|
return nil
|
|
}
|
|
if !exists {
|
|
return nil
|
|
}
|
|
return obj.(*apiv1.Pod)
|
|
}
|
|
|
|
func (e *EndpointSlice) addServiceLabels(esa endpointSliceAdaptor, tg *targetgroup.Group) {
|
|
var (
|
|
svc = &apiv1.Service{}
|
|
found bool
|
|
)
|
|
svc.Namespace = esa.namespace()
|
|
|
|
// Every EndpointSlice object has the Service they belong to in the
|
|
// kubernetes.io/service-name label.
|
|
svc.Name, found = esa.labels()[esa.labelServiceName()]
|
|
if !found {
|
|
return
|
|
}
|
|
|
|
obj, exists, err := e.serviceStore.Get(svc)
|
|
if err != nil {
|
|
level.Error(e.logger).Log("msg", "retrieving service failed", "err", err)
|
|
return
|
|
}
|
|
if !exists {
|
|
return
|
|
}
|
|
svc = obj.(*apiv1.Service)
|
|
|
|
tg.Labels = tg.Labels.Merge(serviceLabels(svc))
|
|
}
|