Merge pull request #10759 from fpetkovski/endpoints-node-meta

kubernetes_sd: Allow attaching node labels for endpoint role
This commit is contained in:
Frederic Branczyk 2022-06-16 17:09:32 +02:00 committed by GitHub
commit 0d0e44d7e6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 478 additions and 45 deletions

View file

@ -41,9 +41,11 @@ var (
type Endpoints struct {
logger log.Logger
endpointsInf cache.SharedInformer
serviceInf cache.SharedInformer
podInf cache.SharedInformer
endpointsInf cache.SharedIndexInformer
serviceInf cache.SharedInformer
podInf cache.SharedInformer
nodeInf cache.SharedInformer
withNodeMetadata bool
podStore cache.Store
endpointsStore cache.Store
@ -53,19 +55,21 @@ type Endpoints struct {
}
// NewEndpoints returns a new endpoints discovery.
func NewEndpoints(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoints {
func NewEndpoints(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer) *Endpoints {
if l == nil {
l = log.NewNopLogger()
}
e := &Endpoints{
logger: l,
endpointsInf: eps,
endpointsStore: eps.GetStore(),
serviceInf: svc,
serviceStore: svc.GetStore(),
podInf: pod,
podStore: pod.GetStore(),
queue: workqueue.NewNamed("endpoints"),
logger: l,
endpointsInf: eps,
endpointsStore: eps.GetStore(),
serviceInf: svc,
serviceStore: svc.GetStore(),
podInf: pod,
podStore: pod.GetStore(),
nodeInf: node,
withNodeMetadata: node != nil,
queue: workqueue.NewNamed("endpoints"),
}
e.endpointsInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -118,10 +122,38 @@ func NewEndpoints(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoints {
serviceUpdate(o)
},
})
if e.withNodeMetadata {
e.nodeInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
},
UpdateFunc: func(_, o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
},
DeleteFunc: func(o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
},
})
}
return e
}
func (e *Endpoints) enqueueNode(nodeName string) {
endpoints, err := e.endpointsInf.GetIndexer().ByIndex(nodeIndex, nodeName)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting endpoints for node", "node", nodeName, "err", err)
return
}
for _, endpoint := range endpoints {
e.enqueue(endpoint)
}
}
func (e *Endpoints) enqueue(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
@ -135,7 +167,12 @@ func (e *Endpoints) enqueue(obj interface{}) {
func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
defer e.queue.ShutDown()
if !cache.WaitForCacheSync(ctx.Done(), e.endpointsInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced) {
cacheSyncs := []cache.InformerSynced{e.endpointsInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced}
if e.withNodeMetadata {
cacheSyncs = append(cacheSyncs, e.nodeInf.HasSynced)
}
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
if !errors.Is(ctx.Err(), context.Canceled) {
level.Error(e.logger).Log("msg", "endpoints informer unable to sync cache")
}
@ -257,6 +294,10 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *targetgroup.Group {
target[model.LabelName(endpointHostname)] = lv(addr.Hostname)
}
if e.withNodeMetadata {
target = addNodeLabels(target, e.nodeInf, e.logger, addr.NodeName)
}
pod := e.resolvePodRef(addr.TargetRef)
if pod == nil {
// This target is not a Pod, so don't continue with Pod specific logic.
@ -387,3 +428,31 @@ func (e *Endpoints) addServiceLabels(ns, name string, tg *targetgroup.Group) {
tg.Labels = tg.Labels.Merge(serviceLabels(svc))
}
func addNodeLabels(tg model.LabelSet, nodeInf cache.SharedInformer, logger log.Logger, nodeName *string) model.LabelSet {
if nodeName == nil {
return tg
}
obj, exists, err := nodeInf.GetStore().GetByKey(*nodeName)
if err != nil {
level.Error(logger).Log("msg", "Error getting node", "node", *nodeName, "err", err)
return tg
}
if !exists {
return tg
}
node := obj.(*apiv1.Node)
// Allocate one target label for the node name,
// and two target labels for each node label.
nodeLabelset := make(model.LabelSet, 1+2*len(node.GetLabels()))
nodeLabelset[nodeNameLabel] = lv(*nodeName)
for k, v := range node.GetLabels() {
ln := strutil.SanitizeLabelName(k)
nodeLabelset[model.LabelName(nodeLabelPrefix+ln)] = lv(v)
nodeLabelset[model.LabelName(nodeLabelPresentPrefix+ln)] = presentValue
}
return tg.Merge(nodeLabelset)
}

View file

@ -478,6 +478,126 @@ func TestEndpointsDiscoveryWithServiceUpdate(t *testing.T) {
}.Run(t)
}
func TestEndpointsDiscoveryWithNodeMetadata(t *testing.T) {
metadataConfig := AttachMetadataConfig{Node: true}
nodeLabels := map[string]string{"az": "us-east1"}
node := makeNode("foobar", "", "", nodeLabels, nil)
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
Labels: map[string]string{
"app/name": "test",
},
},
}
n, _ := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints(), svc, node)
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
"endpoints/default/testendpoints": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpoint_hostname": "testendpoint1",
"__meta_kubernetes_endpoint_node_name": "foobar",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
"__meta_kubernetes_node_label_az": "us-east1",
"__meta_kubernetes_node_labelpresent_az": "true",
"__meta_kubernetes_node_name": "foobar",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "false",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_endpoints_name": "testendpoints",
"__meta_kubernetes_service_label_app_name": "test",
"__meta_kubernetes_service_labelpresent_app_name": "true",
"__meta_kubernetes_service_name": "testendpoints",
},
Source: "endpoints/default/testendpoints",
},
},
}.Run(t)
}
func TestEndpointsDiscoveryWithUpdatedNodeMetadata(t *testing.T) {
nodeLabels := map[string]string{"az": "us-east1"}
nodes := makeNode("foobar", "", "", nodeLabels, nil)
metadataConfig := AttachMetadataConfig{Node: true}
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
Labels: map[string]string{
"app/name": "test",
},
},
}
n, c := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints(), nodes, svc)
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
nodes.Labels["az"] = "eu-central1"
c.CoreV1().Nodes().Update(context.Background(), nodes, metav1.UpdateOptions{})
},
expectedMaxItems: 2,
expectedRes: map[string]*targetgroup.Group{
"endpoints/default/testendpoints": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpoint_hostname": "testendpoint1",
"__meta_kubernetes_endpoint_node_name": "foobar",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
"__meta_kubernetes_node_label_az": "eu-central1",
"__meta_kubernetes_node_labelpresent_az": "true",
"__meta_kubernetes_node_name": "foobar",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "false",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_endpoints_name": "testendpoints",
"__meta_kubernetes_service_label_app_name": "test",
"__meta_kubernetes_service_labelpresent_app_name": "true",
"__meta_kubernetes_service_name": "testendpoints",
},
Source: "endpoints/default/testendpoints",
},
},
}.Run(t)
}
func TestEndpointsDiscoveryNamespaces(t *testing.T) {
epOne := makeEndpoints()
epOne.Namespace = "ns1"

View file

@ -42,9 +42,11 @@ var (
type EndpointSlice struct {
logger log.Logger
endpointSliceInf cache.SharedInformer
endpointSliceInf cache.SharedIndexInformer
serviceInf cache.SharedInformer
podInf cache.SharedInformer
nodeInf cache.SharedInformer
withNodeMetadata bool
podStore cache.Store
endpointSliceStore cache.Store
@ -54,7 +56,7 @@ type EndpointSlice struct {
}
// NewEndpointSlice returns a new endpointslice discovery.
func NewEndpointSlice(l log.Logger, svc, eps, pod cache.SharedInformer) *EndpointSlice {
func NewEndpointSlice(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer) *EndpointSlice {
if l == nil {
l = log.NewNopLogger()
}
@ -66,6 +68,8 @@ func NewEndpointSlice(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoin
serviceStore: svc.GetStore(),
podInf: pod,
podStore: pod.GetStore(),
nodeInf: node,
withNodeMetadata: node != nil,
queue: workqueue.NewNamed("endpointSlice"),
}
@ -120,9 +124,38 @@ func NewEndpointSlice(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoin
},
})
if e.withNodeMetadata {
e.nodeInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
},
UpdateFunc: func(_, o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
},
DeleteFunc: func(o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
},
})
}
return e
}
func (e *EndpointSlice) enqueueNode(nodeName string) {
endpoints, err := e.endpointSliceInf.GetIndexer().ByIndex(nodeIndex, nodeName)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting endpoints for node", "node", nodeName, "err", err)
return
}
for _, endpoint := range endpoints {
e.enqueue(endpoint)
}
}
func (e *EndpointSlice) enqueue(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
@ -136,7 +169,11 @@ func (e *EndpointSlice) enqueue(obj interface{}) {
func (e *EndpointSlice) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
defer e.queue.ShutDown()
if !cache.WaitForCacheSync(ctx.Done(), e.endpointSliceInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced) {
cacheSyncs := []cache.InformerSynced{e.endpointSliceInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced}
if e.withNodeMetadata {
cacheSyncs = append(cacheSyncs, e.nodeInf.HasSynced)
}
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
if ctx.Err() != context.Canceled {
level.Error(e.logger).Log("msg", "endpointslice informer unable to sync cache")
}
@ -282,6 +319,10 @@ func (e *EndpointSlice) buildEndpointSlice(eps endpointSliceAdaptor) *targetgrou
target[model.LabelName(endpointSliceEndpointTopologyLabelPresentPrefix+ln)] = presentValue
}
if e.withNodeMetadata {
target = addNodeLabels(target, e.nodeInf, e.logger, ep.nodename())
}
pod := e.resolvePodRef(ep.targetRef())
if pod == nil {
// This target is not a Pod, so don't continue with Pod specific logic.

View file

@ -41,6 +41,7 @@ type endpointSlicePortAdaptor interface {
type endpointSliceEndpointAdaptor interface {
addresses() []string
hostname() *string
nodename() *string
conditions() endpointSliceEndpointConditionsAdaptor
targetRef() *corev1.ObjectReference
topology() map[string]string
@ -164,6 +165,10 @@ func (e *endpointSliceEndpointAdaptorV1) hostname() *string {
return e.endpoint.Hostname
}
func (e *endpointSliceEndpointAdaptorV1) nodename() *string {
return e.endpoint.NodeName
}
func (e *endpointSliceEndpointAdaptorV1) conditions() endpointSliceEndpointConditionsAdaptor {
return newEndpointSliceEndpointConditionsAdaptorFromV1(e.endpoint.Conditions)
}
@ -204,6 +209,10 @@ func (e *endpointSliceEndpointAdaptorV1beta1) hostname() *string {
return e.endpoint.Hostname
}
func (e *endpointSliceEndpointAdaptorV1beta1) nodename() *string {
return e.endpoint.NodeName
}
func (e *endpointSliceEndpointAdaptorV1beta1) conditions() endpointSliceEndpointConditionsAdaptor {
return newEndpointSliceEndpointConditionsAdaptorFromV1beta1(e.endpoint.Conditions)
}

View file

@ -68,6 +68,7 @@ func makeEndpointSliceV1() *v1.EndpointSlice {
Conditions: v1.EndpointConditions{Ready: boolptr(true)},
Hostname: strptr("testendpoint1"),
TargetRef: &corev1.ObjectReference{},
NodeName: strptr("foobar"),
DeprecatedTopology: map[string]string{
"topology": "value",
},
@ -688,6 +689,147 @@ func TestEndpointSliceDiscoveryWithServiceUpdate(t *testing.T) {
}.Run(t)
}
func TestEndpointsSlicesDiscoveryWithNodeMetadata(t *testing.T) {
metadataConfig := AttachMetadataConfig{Node: true}
nodeLabels := map[string]string{"az": "us-east1"}
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
Labels: map[string]string{
"app/name": "test",
},
},
}
objs := []runtime.Object{makeEndpointSliceV1(), makeNode("foobar", "", "", nodeLabels, nil), svc}
n, _ := makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, metadataConfig, objs...)
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
"endpointslice/default/testendpoints": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpointslice_address_target_kind": "",
"__meta_kubernetes_endpointslice_address_target_name": "",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1",
"__meta_kubernetes_endpointslice_endpoint_topology_present_topology": "true",
"__meta_kubernetes_endpointslice_endpoint_topology_topology": "value",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
"__meta_kubernetes_node_label_az": "us-east1",
"__meta_kubernetes_node_labelpresent_az": "true",
"__meta_kubernetes_node_name": "foobar",
},
{
"__address__": "2.3.4.5:9000",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
{
"__address__": "3.4.5.6:9000",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_endpointslice_address_type": "IPv4",
"__meta_kubernetes_endpointslice_name": "testendpoints",
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_service_label_app_name": "test",
"__meta_kubernetes_service_labelpresent_app_name": "true",
"__meta_kubernetes_service_name": "testendpoints",
},
Source: "endpointslice/default/testendpoints",
},
},
}.Run(t)
}
func TestEndpointsSlicesDiscoveryWithUpdatedNodeMetadata(t *testing.T) {
metadataConfig := AttachMetadataConfig{Node: true}
nodeLabels := map[string]string{"az": "us-east1"}
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
Labels: map[string]string{
"app/name": "test",
},
},
}
node := makeNode("foobar", "", "", nodeLabels, nil)
objs := []runtime.Object{makeEndpointSliceV1(), node, svc}
n, c := makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, metadataConfig, objs...)
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 2,
afterStart: func() {
node.Labels["az"] = "us-central1"
c.CoreV1().Nodes().Update(context.Background(), node, metav1.UpdateOptions{})
},
expectedRes: map[string]*targetgroup.Group{
"endpointslice/default/testendpoints": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpointslice_address_target_kind": "",
"__meta_kubernetes_endpointslice_address_target_name": "",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1",
"__meta_kubernetes_endpointslice_endpoint_topology_present_topology": "true",
"__meta_kubernetes_endpointslice_endpoint_topology_topology": "value",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
"__meta_kubernetes_node_label_az": "us-central1",
"__meta_kubernetes_node_labelpresent_az": "true",
"__meta_kubernetes_node_name": "foobar",
},
{
"__address__": "2.3.4.5:9000",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
{
"__address__": "3.4.5.6:9000",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_endpointslice_address_type": "IPv4",
"__meta_kubernetes_endpointslice_name": "testendpoints",
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_service_label_app_name": "test",
"__meta_kubernetes_service_labelpresent_app_name": "true",
"__meta_kubernetes_service_name": "testendpoints",
},
Source: "endpointslice/default/testendpoints",
},
},
}.Run(t)
}
func TestEndpointSliceDiscoveryNamespaces(t *testing.T) {
epOne := makeEndpointSliceV1()
epOne.Namespace = "ns1"

View file

@ -23,6 +23,8 @@ import (
"sync"
"time"
disv1beta1 "k8s.io/api/discovery/v1beta1"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
@ -31,7 +33,6 @@ import (
"github.com/prometheus/common/version"
apiv1 "k8s.io/api/core/v1"
disv1 "k8s.io/api/discovery/v1"
disv1beta1 "k8s.io/api/discovery/v1beta1"
networkv1 "k8s.io/api/networking/v1"
"k8s.io/api/networking/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -406,7 +407,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
}
for _, namespace := range namespaces {
var informer cache.SharedInformer
var informer cache.SharedIndexInformer
if v1Supported {
e := d.client.DiscoveryV1().EndpointSlices(namespace)
elw := &cache.ListWatch{
@ -421,7 +422,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return e.Watch(ctx, options)
},
}
informer = cache.NewSharedInformer(elw, &disv1.EndpointSlice{}, resyncPeriod)
informer = d.newEndpointSlicesByNodeInformer(elw, &disv1.EndpointSlice{})
} else {
e := d.client.DiscoveryV1beta1().EndpointSlices(namespace)
elw := &cache.ListWatch{
@ -436,7 +437,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return e.Watch(ctx, options)
},
}
informer = cache.NewSharedInformer(elw, &disv1beta1.EndpointSlice{}, resyncPeriod)
informer = d.newEndpointSlicesByNodeInformer(elw, &disv1beta1.EndpointSlice{})
}
s := d.client.CoreV1().Services(namespace)
@ -465,11 +466,17 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return p.Watch(ctx, options)
},
}
var nodeInf cache.SharedInformer
if d.attachMetadata.Node {
nodeInf = d.newNodeInformer(context.Background())
go nodeInf.Run(ctx.Done())
}
eps := NewEndpointSlice(
log.With(d.logger, "role", "endpointslice"),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
informer,
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
nodeInf,
)
d.discoverers = append(d.discoverers, eps)
go eps.endpointSliceInf.Run(ctx.Done())
@ -517,11 +524,18 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return p.Watch(ctx, options)
},
}
var nodeInf cache.SharedInformer
if d.attachMetadata.Node {
nodeInf = d.newNodeInformer(ctx)
go nodeInf.Run(ctx.Done())
}
eps := NewEndpoints(
log.With(d.logger, "role", "endpoint"),
d.newEndpointsByNodeInformer(elw),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
cache.NewSharedInformer(elw, &apiv1.Endpoints{}, resyncPeriod),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
nodeInf,
)
d.discoverers = append(d.discoverers, eps)
go eps.endpointsInf.Run(ctx.Done())
@ -735,6 +749,65 @@ func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedInde
return cache.NewSharedIndexInformer(plw, &apiv1.Pod{}, resyncPeriod, indexers)
}
func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
indexers := make(map[string]cache.IndexFunc)
if !d.attachMetadata.Node {
return cache.NewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncPeriod, indexers)
}
indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
e, ok := obj.(*apiv1.Endpoints)
if !ok {
return nil, fmt.Errorf("object is not a pod")
}
var nodes []string
for _, target := range e.Subsets {
for _, addr := range target.Addresses {
if addr.NodeName == nil {
continue
}
nodes = append(nodes, *addr.NodeName)
}
}
return nodes, nil
}
return cache.NewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncPeriod, indexers)
}
func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object runtime.Object) cache.SharedIndexInformer {
indexers := make(map[string]cache.IndexFunc)
if !d.attachMetadata.Node {
cache.NewSharedIndexInformer(plw, &disv1.EndpointSlice{}, resyncPeriod, indexers)
}
indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
var nodes []string
switch e := obj.(type) {
case *disv1.EndpointSlice:
for _, target := range e.Endpoints {
if target.NodeName == nil {
continue
}
nodes = append(nodes, *target.NodeName)
}
case *disv1beta1.EndpointSlice:
for _, target := range e.Endpoints {
if target.NodeName == nil {
continue
}
nodes = append(nodes, *target.NodeName)
}
default:
return nil, fmt.Errorf("object is not an endpointslice")
}
return nodes, nil
}
return cache.NewSharedIndexInformer(plw, object, resyncPeriod, indexers)
}
func checkDiscoveryV1Supported(client kubernetes.Interface) (bool, error) {
k8sVer, err := client.Discovery().ServerVersion()
if err != nil {

View file

@ -253,7 +253,7 @@ func (p *Pod) buildPod(pod *apiv1.Pod) *targetgroup.Group {
tg.Labels = podLabels(pod)
tg.Labels[namespaceLabel] = lv(pod.Namespace)
if p.withNodeMetadata {
p.attachNodeMetadata(tg, pod)
tg.Labels = addNodeLabels(tg.Labels, p.nodeInf, p.logger, &pod.Spec.NodeName)
}
containers := append(pod.Spec.Containers, pod.Spec.InitContainers...)
@ -291,27 +291,6 @@ func (p *Pod) buildPod(pod *apiv1.Pod) *targetgroup.Group {
return tg
}
func (p *Pod) attachNodeMetadata(tg *targetgroup.Group, pod *apiv1.Pod) {
tg.Labels[nodeNameLabel] = lv(pod.Spec.NodeName)
obj, exists, err := p.nodeInf.GetStore().GetByKey(pod.Spec.NodeName)
if err != nil {
level.Error(p.logger).Log("msg", "Error getting node", "node", pod.Spec.NodeName, "err", err)
return
}
if !exists {
return
}
node := obj.(*apiv1.Node)
for k, v := range node.GetLabels() {
ln := strutil.SanitizeLabelName(k)
tg.Labels[model.LabelName(nodeLabelPrefix+ln)] = lv(v)
tg.Labels[model.LabelName(nodeLabelPresentPrefix+ln)] = presentValue
}
}
func (p *Pod) enqueuePodsForNode(nodeName string) {
pods, err := p.podInf.GetIndexer().ByIndex(nodeIndex, nodeName)
if err != nil {

View file

@ -1851,7 +1851,7 @@ namespaces:
# Optional metadata to attach to discovered targets. If omitted, no additional metadata is attached.
attach_metadata:
# Attaches node metadata to discovered targets. Only valid for role: pod.
# Attaches node metadata to discovered targets. Valid for roles: pod, endpoints, endpointslice.
# When set to true, Prometheus must have permissions to get Nodes.
[ node: <boolean> | default = false ]
```