kubernetes_sd: Allow attaching node labels for endpoint role

The Kubernetes service discovery can only add node labels to
targets from the pod role.

This commit extends this functionality to the endpoints and
endpointslices roles.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
This commit is contained in:
Filip Petkovski 2022-05-27 11:45:09 +02:00
parent 80fbe1de96
commit 05da373dcb
No known key found for this signature in database
GPG key ID: 431B0F2E85E42402
8 changed files with 478 additions and 45 deletions

View file

@ -41,9 +41,11 @@ var (
type Endpoints struct { type Endpoints struct {
logger log.Logger logger log.Logger
endpointsInf cache.SharedInformer endpointsInf cache.SharedIndexInformer
serviceInf cache.SharedInformer serviceInf cache.SharedInformer
podInf cache.SharedInformer podInf cache.SharedInformer
nodeInf cache.SharedInformer
withNodeMetadata bool
podStore cache.Store podStore cache.Store
endpointsStore cache.Store endpointsStore cache.Store
@ -53,7 +55,7 @@ type Endpoints struct {
} }
// NewEndpoints returns a new endpoints discovery. // NewEndpoints returns a new endpoints discovery.
func NewEndpoints(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoints { func NewEndpoints(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer) *Endpoints {
if l == nil { if l == nil {
l = log.NewNopLogger() l = log.NewNopLogger()
} }
@ -65,6 +67,8 @@ func NewEndpoints(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoints {
serviceStore: svc.GetStore(), serviceStore: svc.GetStore(),
podInf: pod, podInf: pod,
podStore: pod.GetStore(), podStore: pod.GetStore(),
nodeInf: node,
withNodeMetadata: node != nil,
queue: workqueue.NewNamed("endpoints"), queue: workqueue.NewNamed("endpoints"),
} }
@ -118,10 +122,38 @@ func NewEndpoints(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoints {
serviceUpdate(o) serviceUpdate(o)
}, },
}) })
if e.withNodeMetadata {
e.nodeInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
},
UpdateFunc: func(_, o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
},
DeleteFunc: func(o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
},
})
}
return e return e
} }
func (e *Endpoints) enqueueNode(nodeName string) {
endpoints, err := e.endpointsInf.GetIndexer().ByIndex(nodeIndex, nodeName)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting endpoints for node", "node", nodeName, "err", err)
return
}
for _, endpoint := range endpoints {
e.enqueue(endpoint)
}
}
func (e *Endpoints) enqueue(obj interface{}) { func (e *Endpoints) enqueue(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil { if err != nil {
@ -135,7 +167,12 @@ func (e *Endpoints) enqueue(obj interface{}) {
func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
defer e.queue.ShutDown() defer e.queue.ShutDown()
if !cache.WaitForCacheSync(ctx.Done(), e.endpointsInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced) { cacheSyncs := []cache.InformerSynced{e.endpointsInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced}
if e.withNodeMetadata {
cacheSyncs = append(cacheSyncs, e.nodeInf.HasSynced)
}
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
if !errors.Is(ctx.Err(), context.Canceled) { if !errors.Is(ctx.Err(), context.Canceled) {
level.Error(e.logger).Log("msg", "endpoints informer unable to sync cache") level.Error(e.logger).Log("msg", "endpoints informer unable to sync cache")
} }
@ -257,6 +294,10 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *targetgroup.Group {
target[model.LabelName(endpointHostname)] = lv(addr.Hostname) target[model.LabelName(endpointHostname)] = lv(addr.Hostname)
} }
if e.withNodeMetadata {
target = addNodeLabels(target, e.nodeInf, e.logger, addr.NodeName)
}
pod := e.resolvePodRef(addr.TargetRef) pod := e.resolvePodRef(addr.TargetRef)
if pod == nil { if pod == nil {
// This target is not a Pod, so don't continue with Pod specific logic. // This target is not a Pod, so don't continue with Pod specific logic.
@ -387,3 +428,31 @@ func (e *Endpoints) addServiceLabels(ns, name string, tg *targetgroup.Group) {
tg.Labels = tg.Labels.Merge(serviceLabels(svc)) tg.Labels = tg.Labels.Merge(serviceLabels(svc))
} }
func addNodeLabels(tg model.LabelSet, nodeInf cache.SharedInformer, logger log.Logger, nodeName *string) model.LabelSet {
if nodeName == nil {
return tg
}
obj, exists, err := nodeInf.GetStore().GetByKey(*nodeName)
if err != nil {
level.Error(logger).Log("msg", "Error getting node", "node", *nodeName, "err", err)
return tg
}
if !exists {
return tg
}
node := obj.(*apiv1.Node)
// Allocate one target label for the node name,
// and two target labels for each node label.
nodeLabelset := make(model.LabelSet, 1+2*len(node.GetLabels()))
nodeLabelset[nodeNameLabel] = lv(*nodeName)
for k, v := range node.GetLabels() {
ln := strutil.SanitizeLabelName(k)
nodeLabelset[model.LabelName(nodeLabelPrefix+ln)] = lv(v)
nodeLabelset[model.LabelName(nodeLabelPresentPrefix+ln)] = presentValue
}
return tg.Merge(nodeLabelset)
}

View file

@ -478,6 +478,126 @@ func TestEndpointsDiscoveryWithServiceUpdate(t *testing.T) {
}.Run(t) }.Run(t)
} }
func TestEndpointsDiscoveryWithNodeMetadata(t *testing.T) {
metadataConfig := AttachMetadataConfig{Node: true}
nodeLabels := map[string]string{"az": "us-east1"}
node := makeNode("foobar", "", "", nodeLabels, nil)
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
Labels: map[string]string{
"app/name": "test",
},
},
}
n, _ := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints(), svc, node)
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
"endpoints/default/testendpoints": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpoint_hostname": "testendpoint1",
"__meta_kubernetes_endpoint_node_name": "foobar",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
"__meta_kubernetes_node_label_az": "us-east1",
"__meta_kubernetes_node_labelpresent_az": "true",
"__meta_kubernetes_node_name": "foobar",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "false",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_endpoints_name": "testendpoints",
"__meta_kubernetes_service_label_app_name": "test",
"__meta_kubernetes_service_labelpresent_app_name": "true",
"__meta_kubernetes_service_name": "testendpoints",
},
Source: "endpoints/default/testendpoints",
},
},
}.Run(t)
}
func TestEndpointsDiscoveryWithUpdatedNodeMetadata(t *testing.T) {
nodeLabels := map[string]string{"az": "us-east1"}
nodes := makeNode("foobar", "", "", nodeLabels, nil)
metadataConfig := AttachMetadataConfig{Node: true}
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
Labels: map[string]string{
"app/name": "test",
},
},
}
n, c := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints(), nodes, svc)
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
nodes.Labels["az"] = "eu-central1"
c.CoreV1().Nodes().Update(context.Background(), nodes, metav1.UpdateOptions{})
},
expectedMaxItems: 2,
expectedRes: map[string]*targetgroup.Group{
"endpoints/default/testendpoints": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpoint_hostname": "testendpoint1",
"__meta_kubernetes_endpoint_node_name": "foobar",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
"__meta_kubernetes_node_label_az": "eu-central1",
"__meta_kubernetes_node_labelpresent_az": "true",
"__meta_kubernetes_node_name": "foobar",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "false",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_endpoints_name": "testendpoints",
"__meta_kubernetes_service_label_app_name": "test",
"__meta_kubernetes_service_labelpresent_app_name": "true",
"__meta_kubernetes_service_name": "testendpoints",
},
Source: "endpoints/default/testendpoints",
},
},
}.Run(t)
}
func TestEndpointsDiscoveryNamespaces(t *testing.T) { func TestEndpointsDiscoveryNamespaces(t *testing.T) {
epOne := makeEndpoints() epOne := makeEndpoints()
epOne.Namespace = "ns1" epOne.Namespace = "ns1"

View file

@ -42,9 +42,11 @@ var (
type EndpointSlice struct { type EndpointSlice struct {
logger log.Logger logger log.Logger
endpointSliceInf cache.SharedInformer endpointSliceInf cache.SharedIndexInformer
serviceInf cache.SharedInformer serviceInf cache.SharedInformer
podInf cache.SharedInformer podInf cache.SharedInformer
nodeInf cache.SharedInformer
withNodeMetadata bool
podStore cache.Store podStore cache.Store
endpointSliceStore cache.Store endpointSliceStore cache.Store
@ -54,7 +56,7 @@ type EndpointSlice struct {
} }
// NewEndpointSlice returns a new endpointslice discovery. // NewEndpointSlice returns a new endpointslice discovery.
func NewEndpointSlice(l log.Logger, svc, eps, pod cache.SharedInformer) *EndpointSlice { func NewEndpointSlice(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer) *EndpointSlice {
if l == nil { if l == nil {
l = log.NewNopLogger() l = log.NewNopLogger()
} }
@ -66,6 +68,8 @@ func NewEndpointSlice(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoin
serviceStore: svc.GetStore(), serviceStore: svc.GetStore(),
podInf: pod, podInf: pod,
podStore: pod.GetStore(), podStore: pod.GetStore(),
nodeInf: node,
withNodeMetadata: node != nil,
queue: workqueue.NewNamed("endpointSlice"), queue: workqueue.NewNamed("endpointSlice"),
} }
@ -120,9 +124,38 @@ func NewEndpointSlice(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoin
}, },
}) })
if e.withNodeMetadata {
e.nodeInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
},
UpdateFunc: func(_, o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
},
DeleteFunc: func(o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
},
})
}
return e return e
} }
func (e *EndpointSlice) enqueueNode(nodeName string) {
endpoints, err := e.endpointSliceInf.GetIndexer().ByIndex(nodeIndex, nodeName)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting endpoints for node", "node", nodeName, "err", err)
return
}
for _, endpoint := range endpoints {
e.enqueue(endpoint)
}
}
func (e *EndpointSlice) enqueue(obj interface{}) { func (e *EndpointSlice) enqueue(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil { if err != nil {
@ -136,7 +169,11 @@ func (e *EndpointSlice) enqueue(obj interface{}) {
func (e *EndpointSlice) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { func (e *EndpointSlice) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
defer e.queue.ShutDown() defer e.queue.ShutDown()
if !cache.WaitForCacheSync(ctx.Done(), e.endpointSliceInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced) { cacheSyncs := []cache.InformerSynced{e.endpointSliceInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced}
if e.withNodeMetadata {
cacheSyncs = append(cacheSyncs, e.nodeInf.HasSynced)
}
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
if ctx.Err() != context.Canceled { if ctx.Err() != context.Canceled {
level.Error(e.logger).Log("msg", "endpointslice informer unable to sync cache") level.Error(e.logger).Log("msg", "endpointslice informer unable to sync cache")
} }
@ -282,6 +319,10 @@ func (e *EndpointSlice) buildEndpointSlice(eps endpointSliceAdaptor) *targetgrou
target[model.LabelName(endpointSliceEndpointTopologyLabelPresentPrefix+ln)] = presentValue target[model.LabelName(endpointSliceEndpointTopologyLabelPresentPrefix+ln)] = presentValue
} }
if e.withNodeMetadata {
target = addNodeLabels(target, e.nodeInf, e.logger, ep.nodename())
}
pod := e.resolvePodRef(ep.targetRef()) pod := e.resolvePodRef(ep.targetRef())
if pod == nil { if pod == nil {
// This target is not a Pod, so don't continue with Pod specific logic. // This target is not a Pod, so don't continue with Pod specific logic.

View file

@ -41,6 +41,7 @@ type endpointSlicePortAdaptor interface {
type endpointSliceEndpointAdaptor interface { type endpointSliceEndpointAdaptor interface {
addresses() []string addresses() []string
hostname() *string hostname() *string
nodename() *string
conditions() endpointSliceEndpointConditionsAdaptor conditions() endpointSliceEndpointConditionsAdaptor
targetRef() *corev1.ObjectReference targetRef() *corev1.ObjectReference
topology() map[string]string topology() map[string]string
@ -164,6 +165,10 @@ func (e *endpointSliceEndpointAdaptorV1) hostname() *string {
return e.endpoint.Hostname return e.endpoint.Hostname
} }
func (e *endpointSliceEndpointAdaptorV1) nodename() *string {
return e.endpoint.NodeName
}
func (e *endpointSliceEndpointAdaptorV1) conditions() endpointSliceEndpointConditionsAdaptor { func (e *endpointSliceEndpointAdaptorV1) conditions() endpointSliceEndpointConditionsAdaptor {
return newEndpointSliceEndpointConditionsAdaptorFromV1(e.endpoint.Conditions) return newEndpointSliceEndpointConditionsAdaptorFromV1(e.endpoint.Conditions)
} }
@ -204,6 +209,10 @@ func (e *endpointSliceEndpointAdaptorV1beta1) hostname() *string {
return e.endpoint.Hostname return e.endpoint.Hostname
} }
func (e *endpointSliceEndpointAdaptorV1beta1) nodename() *string {
return e.endpoint.NodeName
}
func (e *endpointSliceEndpointAdaptorV1beta1) conditions() endpointSliceEndpointConditionsAdaptor { func (e *endpointSliceEndpointAdaptorV1beta1) conditions() endpointSliceEndpointConditionsAdaptor {
return newEndpointSliceEndpointConditionsAdaptorFromV1beta1(e.endpoint.Conditions) return newEndpointSliceEndpointConditionsAdaptorFromV1beta1(e.endpoint.Conditions)
} }

View file

@ -68,6 +68,7 @@ func makeEndpointSliceV1() *v1.EndpointSlice {
Conditions: v1.EndpointConditions{Ready: boolptr(true)}, Conditions: v1.EndpointConditions{Ready: boolptr(true)},
Hostname: strptr("testendpoint1"), Hostname: strptr("testendpoint1"),
TargetRef: &corev1.ObjectReference{}, TargetRef: &corev1.ObjectReference{},
NodeName: strptr("foobar"),
DeprecatedTopology: map[string]string{ DeprecatedTopology: map[string]string{
"topology": "value", "topology": "value",
}, },
@ -688,6 +689,147 @@ func TestEndpointSliceDiscoveryWithServiceUpdate(t *testing.T) {
}.Run(t) }.Run(t)
} }
func TestEndpointsSlicesDiscoveryWithNodeMetadata(t *testing.T) {
metadataConfig := AttachMetadataConfig{Node: true}
nodeLabels := map[string]string{"az": "us-east1"}
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
Labels: map[string]string{
"app/name": "test",
},
},
}
objs := []runtime.Object{makeEndpointSliceV1(), makeNode("foobar", "", "", nodeLabels, nil), svc}
n, _ := makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, metadataConfig, objs...)
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
"endpointslice/default/testendpoints": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpointslice_address_target_kind": "",
"__meta_kubernetes_endpointslice_address_target_name": "",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1",
"__meta_kubernetes_endpointslice_endpoint_topology_present_topology": "true",
"__meta_kubernetes_endpointslice_endpoint_topology_topology": "value",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
"__meta_kubernetes_node_label_az": "us-east1",
"__meta_kubernetes_node_labelpresent_az": "true",
"__meta_kubernetes_node_name": "foobar",
},
{
"__address__": "2.3.4.5:9000",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
{
"__address__": "3.4.5.6:9000",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_endpointslice_address_type": "IPv4",
"__meta_kubernetes_endpointslice_name": "testendpoints",
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_service_label_app_name": "test",
"__meta_kubernetes_service_labelpresent_app_name": "true",
"__meta_kubernetes_service_name": "testendpoints",
},
Source: "endpointslice/default/testendpoints",
},
},
}.Run(t)
}
func TestEndpointsSlicesDiscoveryWithUpdatedNodeMetadata(t *testing.T) {
metadataConfig := AttachMetadataConfig{Node: true}
nodeLabels := map[string]string{"az": "us-east1"}
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
Labels: map[string]string{
"app/name": "test",
},
},
}
node := makeNode("foobar", "", "", nodeLabels, nil)
objs := []runtime.Object{makeEndpointSliceV1(), node, svc}
n, c := makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, metadataConfig, objs...)
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 2,
afterStart: func() {
node.Labels["az"] = "us-central1"
c.CoreV1().Nodes().Update(context.Background(), node, metav1.UpdateOptions{})
},
expectedRes: map[string]*targetgroup.Group{
"endpointslice/default/testendpoints": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpointslice_address_target_kind": "",
"__meta_kubernetes_endpointslice_address_target_name": "",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1",
"__meta_kubernetes_endpointslice_endpoint_topology_present_topology": "true",
"__meta_kubernetes_endpointslice_endpoint_topology_topology": "value",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
"__meta_kubernetes_node_label_az": "us-central1",
"__meta_kubernetes_node_labelpresent_az": "true",
"__meta_kubernetes_node_name": "foobar",
},
{
"__address__": "2.3.4.5:9000",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
{
"__address__": "3.4.5.6:9000",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_endpointslice_address_type": "IPv4",
"__meta_kubernetes_endpointslice_name": "testendpoints",
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_service_label_app_name": "test",
"__meta_kubernetes_service_labelpresent_app_name": "true",
"__meta_kubernetes_service_name": "testendpoints",
},
Source: "endpointslice/default/testendpoints",
},
},
}.Run(t)
}
func TestEndpointSliceDiscoveryNamespaces(t *testing.T) { func TestEndpointSliceDiscoveryNamespaces(t *testing.T) {
epOne := makeEndpointSliceV1() epOne := makeEndpointSliceV1()
epOne.Namespace = "ns1" epOne.Namespace = "ns1"

View file

@ -23,6 +23,8 @@ import (
"sync" "sync"
"time" "time"
disv1beta1 "k8s.io/api/discovery/v1beta1"
"github.com/go-kit/log" "github.com/go-kit/log"
"github.com/go-kit/log/level" "github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -31,7 +33,6 @@ import (
"github.com/prometheus/common/version" "github.com/prometheus/common/version"
apiv1 "k8s.io/api/core/v1" apiv1 "k8s.io/api/core/v1"
disv1 "k8s.io/api/discovery/v1" disv1 "k8s.io/api/discovery/v1"
disv1beta1 "k8s.io/api/discovery/v1beta1"
networkv1 "k8s.io/api/networking/v1" networkv1 "k8s.io/api/networking/v1"
"k8s.io/api/networking/v1beta1" "k8s.io/api/networking/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -406,7 +407,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
} }
for _, namespace := range namespaces { for _, namespace := range namespaces {
var informer cache.SharedInformer var informer cache.SharedIndexInformer
if v1Supported { if v1Supported {
e := d.client.DiscoveryV1().EndpointSlices(namespace) e := d.client.DiscoveryV1().EndpointSlices(namespace)
elw := &cache.ListWatch{ elw := &cache.ListWatch{
@ -421,7 +422,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return e.Watch(ctx, options) return e.Watch(ctx, options)
}, },
} }
informer = cache.NewSharedInformer(elw, &disv1.EndpointSlice{}, resyncPeriod) informer = d.newEndpointSlicesByNodeInformer(elw, &disv1.EndpointSlice{})
} else { } else {
e := d.client.DiscoveryV1beta1().EndpointSlices(namespace) e := d.client.DiscoveryV1beta1().EndpointSlices(namespace)
elw := &cache.ListWatch{ elw := &cache.ListWatch{
@ -436,7 +437,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return e.Watch(ctx, options) return e.Watch(ctx, options)
}, },
} }
informer = cache.NewSharedInformer(elw, &disv1beta1.EndpointSlice{}, resyncPeriod) informer = d.newEndpointSlicesByNodeInformer(elw, &disv1beta1.EndpointSlice{})
} }
s := d.client.CoreV1().Services(namespace) s := d.client.CoreV1().Services(namespace)
@ -465,11 +466,17 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return p.Watch(ctx, options) return p.Watch(ctx, options)
}, },
} }
var nodeInf cache.SharedInformer
if d.attachMetadata.Node {
nodeInf = d.newNodeInformer(context.Background())
go nodeInf.Run(ctx.Done())
}
eps := NewEndpointSlice( eps := NewEndpointSlice(
log.With(d.logger, "role", "endpointslice"), log.With(d.logger, "role", "endpointslice"),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
informer, informer,
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
nodeInf,
) )
d.discoverers = append(d.discoverers, eps) d.discoverers = append(d.discoverers, eps)
go eps.endpointSliceInf.Run(ctx.Done()) go eps.endpointSliceInf.Run(ctx.Done())
@ -517,11 +524,18 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return p.Watch(ctx, options) return p.Watch(ctx, options)
}, },
} }
var nodeInf cache.SharedInformer
if d.attachMetadata.Node {
nodeInf = d.newNodeInformer(ctx)
go nodeInf.Run(ctx.Done())
}
eps := NewEndpoints( eps := NewEndpoints(
log.With(d.logger, "role", "endpoint"), log.With(d.logger, "role", "endpoint"),
d.newEndpointsByNodeInformer(elw),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
cache.NewSharedInformer(elw, &apiv1.Endpoints{}, resyncPeriod),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
nodeInf,
) )
d.discoverers = append(d.discoverers, eps) d.discoverers = append(d.discoverers, eps)
go eps.endpointsInf.Run(ctx.Done()) go eps.endpointsInf.Run(ctx.Done())
@ -735,6 +749,65 @@ func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedInde
return cache.NewSharedIndexInformer(plw, &apiv1.Pod{}, resyncPeriod, indexers) return cache.NewSharedIndexInformer(plw, &apiv1.Pod{}, resyncPeriod, indexers)
} }
func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
indexers := make(map[string]cache.IndexFunc)
if !d.attachMetadata.Node {
return cache.NewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncPeriod, indexers)
}
indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
e, ok := obj.(*apiv1.Endpoints)
if !ok {
return nil, fmt.Errorf("object is not a pod")
}
var nodes []string
for _, target := range e.Subsets {
for _, addr := range target.Addresses {
if addr.NodeName == nil {
continue
}
nodes = append(nodes, *addr.NodeName)
}
}
return nodes, nil
}
return cache.NewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncPeriod, indexers)
}
func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object runtime.Object) cache.SharedIndexInformer {
indexers := make(map[string]cache.IndexFunc)
if !d.attachMetadata.Node {
cache.NewSharedIndexInformer(plw, &disv1.EndpointSlice{}, resyncPeriod, indexers)
}
indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
var nodes []string
switch e := obj.(type) {
case *disv1.EndpointSlice:
for _, target := range e.Endpoints {
if target.NodeName == nil {
continue
}
nodes = append(nodes, *target.NodeName)
}
case *disv1beta1.EndpointSlice:
for _, target := range e.Endpoints {
if target.NodeName == nil {
continue
}
nodes = append(nodes, *target.NodeName)
}
default:
return nil, fmt.Errorf("object is not an endpointslice")
}
return nodes, nil
}
return cache.NewSharedIndexInformer(plw, object, resyncPeriod, indexers)
}
func checkDiscoveryV1Supported(client kubernetes.Interface) (bool, error) { func checkDiscoveryV1Supported(client kubernetes.Interface) (bool, error) {
k8sVer, err := client.Discovery().ServerVersion() k8sVer, err := client.Discovery().ServerVersion()
if err != nil { if err != nil {

View file

@ -253,7 +253,7 @@ func (p *Pod) buildPod(pod *apiv1.Pod) *targetgroup.Group {
tg.Labels = podLabels(pod) tg.Labels = podLabels(pod)
tg.Labels[namespaceLabel] = lv(pod.Namespace) tg.Labels[namespaceLabel] = lv(pod.Namespace)
if p.withNodeMetadata { if p.withNodeMetadata {
p.attachNodeMetadata(tg, pod) tg.Labels = addNodeLabels(tg.Labels, p.nodeInf, p.logger, &pod.Spec.NodeName)
} }
containers := append(pod.Spec.Containers, pod.Spec.InitContainers...) containers := append(pod.Spec.Containers, pod.Spec.InitContainers...)
@ -291,27 +291,6 @@ func (p *Pod) buildPod(pod *apiv1.Pod) *targetgroup.Group {
return tg return tg
} }
func (p *Pod) attachNodeMetadata(tg *targetgroup.Group, pod *apiv1.Pod) {
tg.Labels[nodeNameLabel] = lv(pod.Spec.NodeName)
obj, exists, err := p.nodeInf.GetStore().GetByKey(pod.Spec.NodeName)
if err != nil {
level.Error(p.logger).Log("msg", "Error getting node", "node", pod.Spec.NodeName, "err", err)
return
}
if !exists {
return
}
node := obj.(*apiv1.Node)
for k, v := range node.GetLabels() {
ln := strutil.SanitizeLabelName(k)
tg.Labels[model.LabelName(nodeLabelPrefix+ln)] = lv(v)
tg.Labels[model.LabelName(nodeLabelPresentPrefix+ln)] = presentValue
}
}
func (p *Pod) enqueuePodsForNode(nodeName string) { func (p *Pod) enqueuePodsForNode(nodeName string) {
pods, err := p.podInf.GetIndexer().ByIndex(nodeIndex, nodeName) pods, err := p.podInf.GetIndexer().ByIndex(nodeIndex, nodeName)
if err != nil { if err != nil {

View file

@ -1851,7 +1851,7 @@ namespaces:
# Optional metadata to attach to discovered targets. If omitted, no additional metadata is attached. # Optional metadata to attach to discovered targets. If omitted, no additional metadata is attached.
attach_metadata: attach_metadata:
# Attaches node metadata to discovered targets. Only valid for role: pod. # Attaches node metadata to discovered targets. Valid for roles: pod, endpoints, endpointslice.
# When set to true, Prometheus must have permissions to get Nodes. # When set to true, Prometheus must have permissions to get Nodes.
[ node: <boolean> | default = false ] [ node: <boolean> | default = false ]
``` ```