From fa798d30429db29586f36c2c8d343821ccd87a2a Mon Sep 17 00:00:00 2001
From: fpetkovski
Date: Thu, 23 Dec 2021 10:50:00 +0100
Subject: [PATCH] Allow attaching node metadata

Signed-off-by: fpetkovski
---
 discovery/kubernetes/kubernetes.go      | 52 +++++++++++-----
 discovery/kubernetes/kubernetes_test.go |  9 ++-
 discovery/kubernetes/pod.go             | 82 +++++++++++++++++++++----
 discovery/kubernetes/pod_test.go        | 56 +++++++++++++++++
 docs/configuration/configuration.md     |  6 ++
 5 files changed, 176 insertions(+), 29 deletions(-)

diff --git a/discovery/kubernetes/kubernetes.go b/discovery/kubernetes/kubernetes.go
index d7a864c00..084c0e7ed 100644
--- a/discovery/kubernetes/kubernetes.go
+++ b/discovery/kubernetes/kubernetes.go
@@ -122,6 +122,7 @@ type SDConfig struct {
 	HTTPClientConfig   config.HTTPClientConfig `yaml:",inline"`
 	NamespaceDiscovery NamespaceDiscovery      `yaml:"namespaces,omitempty"`
 	Selectors          []SelectorConfig        `yaml:"selectors,omitempty"`
+	AttachMetadata     AttachMetadataConfig    `yaml:"attach_metadata,omitempty"`
 }
 
 // Name returns the name of the Config.
@@ -158,6 +159,12 @@ type resourceSelector struct {
 	field string
 }
 
+// AttachMetadataConfig is the configuration for attaching additional metadata
+// coming from nodes on which the targets are scheduled.
+type AttachMetadataConfig struct {
+	Node bool `yaml:"node"`
+}
+
 // UnmarshalYAML implements the yaml.Unmarshaler interface.
 func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
 	*c = DefaultSDConfig
@@ -259,6 +266,7 @@ type Discovery struct {
 	discoverers    []discovery.Discoverer
 	selectors      roleSelector
 	ownNamespace   string
+	attachMetadata AttachMetadataConfig
 }
 
 func (d *Discovery) getNamespaces() []string {
@@ -337,6 +345,7 @@ func New(l log.Logger, conf *SDConfig) (*Discovery, error) {
 		discoverers:    make([]discovery.Discoverer, 0),
 		selectors:      mapSelector(conf.Selectors),
 		ownNamespace:   ownNamespace,
+		attachMetadata: conf.AttachMetadata,
 	}, nil
 }
 
@@ -480,6 +489,12 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
 				go eps.podInf.Run(ctx.Done())
 			}
 		case RolePod:
+			var nodeInformer cache.SharedInformer
+			if d.attachMetadata.Node {
+				nodeInformer = d.newNodeInformer(ctx)
+				go nodeInformer.Run(ctx.Done())
+			}
+
 			for _, namespace := range namespaces {
 				p := d.client.CoreV1().Pods(namespace)
 				plw := &cache.ListWatch{
@@ -497,9 +512,10 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
 				pod := NewPod(
 					log.With(d.logger, "role", "pod"),
 					cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
+					nodeInformer,
 				)
 				d.discoverers = append(d.discoverers, pod)
-				go pod.informer.Run(ctx.Done())
+				go pod.podInf.Run(ctx.Done())
 			}
 		case RoleService:
 			for _, namespace := range namespaces {
@@ -581,22 +597,8 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
 				go ingress.informer.Run(ctx.Done())
 			}
 		case RoleNode:
-			nlw := &cache.ListWatch{
-				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-					options.FieldSelector = d.selectors.node.field
-					options.LabelSelector = d.selectors.node.label
-					return d.client.CoreV1().Nodes().List(ctx, options)
-				},
-				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-					options.FieldSelector = d.selectors.node.field
-					options.LabelSelector = d.selectors.node.label
-					return d.client.CoreV1().Nodes().Watch(ctx, options)
-				},
-			}
-			node := NewNode(
-				log.With(d.logger, "role", "node"),
-				cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncPeriod),
-			)
+			nodeInformer := d.newNodeInformer(ctx)
+			node := NewNode(log.With(d.logger, "role", "node"), nodeInformer)
 			d.discoverers = append(d.discoverers, node)
 			go node.informer.Run(ctx.Done())
 		default:
@@ -661,3 +663,19 @@ func checkNetworkingV1Supported(client kubernetes.Interface) (bool, error) {
 	// https://github.com/kubernetes/kubernetes/blob/master/CHANGELOG/CHANGELOG-1.19.md
 	return semVer.Major() >= 1 && semVer.Minor() >= 19, nil
 }
+
+func (d *Discovery) newNodeInformer(ctx context.Context) cache.SharedInformer {
+	nlw := &cache.ListWatch{
+		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+			options.FieldSelector = d.selectors.node.field
+			options.LabelSelector = d.selectors.node.label
+			return d.client.CoreV1().Nodes().List(ctx, options)
+		},
+		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+			options.FieldSelector = d.selectors.node.field
+			options.LabelSelector = d.selectors.node.label
+			return d.client.CoreV1().Nodes().Watch(ctx, options)
+		},
+	}
+	return cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncPeriod)
+}
diff --git a/discovery/kubernetes/kubernetes_test.go b/discovery/kubernetes/kubernetes_test.go
index 927c36d2d..3e41c6b58 100644
--- a/discovery/kubernetes/kubernetes_test.go
+++ b/discovery/kubernetes/kubernetes_test.go
@@ -58,6 +58,13 @@ func makeDiscoveryWithVersion(role Role, nsDiscovery NamespaceDiscovery, k8sVer
 	}, clientset
 }
 
+// makeDiscoveryWithMetadata creates a kubernetes.Discovery instance with the specified metadata config.
+func makeDiscoveryWithMetadata(role Role, nsDiscovery NamespaceDiscovery, attachMetadata AttachMetadataConfig, objects ...runtime.Object) (*Discovery, kubernetes.Interface) {
+	d, k8s := makeDiscovery(role, nsDiscovery, objects...)
+	d.attachMetadata = attachMetadata
+	return d, k8s
+}
+
 type k8sDiscoveryTest struct {
 	// discovery is instance of discovery.Discoverer
 	discovery discovery.Discoverer
@@ -215,7 +222,7 @@ func (i *Ingress) hasSynced() bool {
 }
 
 func (p *Pod) hasSynced() bool {
-	return p.informer.HasSynced()
+	return p.podInf.HasSynced()
 }
 
 func (s *Service) hasSynced() bool {
diff --git a/discovery/kubernetes/pod.go b/discovery/kubernetes/pod.go
index b5c94862a..a670be542 100644
--- a/discovery/kubernetes/pod.go
+++ b/discovery/kubernetes/pod.go
@@ -40,24 +40,28 @@ var (
 
 // Pod discovers new pod targets.
 type Pod struct {
-	informer cache.SharedInformer
-	store    cache.Store
-	logger   log.Logger
-	queue    *workqueue.Type
+	podInf           cache.SharedInformer
+	nodeInf          cache.SharedInformer
+	withNodeMetadata bool
+	store            cache.Store
+	logger           log.Logger
+	queue            *workqueue.Type
 }
 
 // NewPod creates a new pod discovery.
-func NewPod(l log.Logger, pods cache.SharedInformer) *Pod { +func NewPod(l log.Logger, pods, nodes cache.SharedInformer) *Pod { if l == nil { l = log.NewNopLogger() } p := &Pod{ - informer: pods, - store: pods.GetStore(), - logger: l, - queue: workqueue.NewNamed("pod"), + podInf: pods, + nodeInf: nodes, + withNodeMetadata: nodes != nil, + store: pods.GetStore(), + logger: l, + queue: workqueue.NewNamed("pod"), } - p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + p.podInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(o interface{}) { podAddCount.Inc() p.enqueue(o) @@ -71,6 +75,24 @@ func NewPod(l log.Logger, pods cache.SharedInformer) *Pod { p.enqueue(o) }, }) + + if p.withNodeMetadata { + p.nodeInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(o interface{}) { + node := o.(*apiv1.Node) + p.enqueuePodsForNode(node.Name) + }, + UpdateFunc: func(_, o interface{}) { + node := o.(*apiv1.Node) + p.enqueuePodsForNode(node.Name) + }, + DeleteFunc: func(o interface{}) { + node := o.(*apiv1.Node) + p.enqueuePodsForNode(node.Name) + }, + }) + } + return p } @@ -87,7 +109,12 @@ func (p *Pod) enqueue(obj interface{}) { func (p *Pod) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { defer p.queue.ShutDown() - if !cache.WaitForCacheSync(ctx.Done(), p.informer.HasSynced) { + cacheSyncs := []cache.InformerSynced{p.podInf.HasSynced} + if p.withNodeMetadata { + cacheSyncs = append(cacheSyncs, p.nodeInf.HasSynced) + } + + if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) { if ctx.Err() != context.Canceled { level.Error(p.logger).Log("msg", "pod informer unable to sync cache") } @@ -221,6 +248,9 @@ func (p *Pod) buildPod(pod *apiv1.Pod) *targetgroup.Group { tg.Labels = podLabels(pod) tg.Labels[namespaceLabel] = lv(pod.Namespace) + if p.withNodeMetadata { + p.attachNodeMetadata(tg, pod) + } containers := append(pod.Spec.Containers, pod.Spec.InitContainers...) 
for i, c := range containers { @@ -257,6 +287,36 @@ func (p *Pod) buildPod(pod *apiv1.Pod) *targetgroup.Group { return tg } +func (p *Pod) attachNodeMetadata(tg *targetgroup.Group, pod *apiv1.Pod) { + tg.Labels[nodeNameLabel] = lv(pod.Spec.NodeName) + + obj, exists, err := p.nodeInf.GetStore().GetByKey(pod.Spec.NodeName) + if err != nil { + level.Error(p.logger).Log("msg", "Error getting node", "node", pod.Spec.NodeName, "err", err) + return + } + + if !exists { + return + } + + node := obj.(*apiv1.Node) + for k, v := range node.GetLabels() { + ln := strutil.SanitizeLabelName(k) + tg.Labels[model.LabelName(nodeLabelPrefix+ln)] = lv(v) + tg.Labels[model.LabelName(nodeLabelPresentPrefix+ln)] = presentValue + } +} + +func (p *Pod) enqueuePodsForNode(nodeName string) { + for _, pod := range p.store.List() { + pod := pod.(*apiv1.Pod) + if pod.Spec.NodeName == nodeName { + p.enqueue(pod) + } + } +} + func podSource(pod *apiv1.Pod) string { return podSourceFromNamespaceAndName(pod.Namespace, pod.Name) } diff --git a/discovery/kubernetes/pod_test.go b/discovery/kubernetes/pod_test.go index d884ad760..29655d022 100644 --- a/discovery/kubernetes/pod_test.go +++ b/discovery/kubernetes/pod_test.go @@ -190,6 +190,19 @@ func expectedPodTargetGroups(ns string) map[string]*targetgroup.Group { } } +func expectedPodTargetGroupsWithNodeMeta(ns, nodeName string, nodeLabels map[string]string) map[string]*targetgroup.Group { + result := expectedPodTargetGroups(ns) + for _, tg := range result { + tg.Labels["__meta_kubernetes_node_name"] = lv(nodeName) + for k, v := range nodeLabels { + tg.Labels[model.LabelName("__meta_kubernetes_node_label_"+k)] = lv(v) + tg.Labels[model.LabelName("__meta_kubernetes_node_labelpresent_"+k)] = lv("true") + } + } + + return result +} + func TestPodDiscoveryBeforeRun(t *testing.T) { n, c := makeDiscovery(RolePod, NamespaceDiscovery{}) @@ -407,3 +420,46 @@ func TestPodDiscoveryOwnNamespace(t *testing.T) { expectedRes: expected, }.Run(t) } + +func TestPodDiscoveryWithNodeMetadata(t *testing.T) { + attachMetadata := AttachMetadataConfig{Node: true} + n, c := makeDiscoveryWithMetadata(RolePod, NamespaceDiscovery{}, attachMetadata) + nodeLbls := map[string]string{"l1": "v1"} + + k8sDiscoveryTest{ + discovery: n, + afterStart: func() { + nodes := makeNode("testnode", "", "", nodeLbls, nil) + c.CoreV1().Nodes().Create(context.Background(), nodes, metav1.CreateOptions{}) + + pods := makePods() + c.CoreV1().Pods(pods.Namespace).Create(context.Background(), pods, metav1.CreateOptions{}) + }, + expectedMaxItems: 2, + expectedRes: expectedPodTargetGroupsWithNodeMeta("default", "testnode", nodeLbls), + }.Run(t) +} + +func TestPodDiscoveryWithNodeMetadataUpdateNode(t *testing.T) { + nodeLbls := map[string]string{"l2": "v2"} + attachMetadata := AttachMetadataConfig{Node: true} + n, c := makeDiscoveryWithMetadata(RolePod, NamespaceDiscovery{}, attachMetadata) + + k8sDiscoveryTest{ + discovery: n, + beforeRun: func() { + oldNodeLbls := map[string]string{"l1": "v1"} + nodes := makeNode("testnode", "", "", oldNodeLbls, nil) + c.CoreV1().Nodes().Create(context.Background(), nodes, metav1.CreateOptions{}) + }, + afterStart: func() { + pods := makePods() + c.CoreV1().Pods(pods.Namespace).Create(context.Background(), pods, metav1.CreateOptions{}) + + nodes := makeNode("testnode", "", "", nodeLbls, nil) + c.CoreV1().Nodes().Update(context.Background(), nodes, metav1.UpdateOptions{}) + }, + expectedMaxItems: 2, + expectedRes: expectedPodTargetGroupsWithNodeMeta("default", "testnode", nodeLbls), + }.Run(t) 
+}
diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md
index 7b11afb0a..510769257 100644
--- a/docs/configuration/configuration.md
+++ b/docs/configuration/configuration.md
@@ -1720,6 +1720,12 @@ namespaces:
 [ - role: <string>
     [ label: <string> ]
     [ field: <string> ] ]]
+
+# Optional metadata to attach to discovered targets. If omitted, no additional metadata is attached.
+attach_metadata:
+# Attaches node metadata to discovered targets. Only valid for role: pod.
+# When set to true, Prometheus must have permissions to get Nodes.
+  [ node: <boolean> | default = false ]
 ```
 See [this example Prometheus configuration file](/documentation/examples/prometheus-kubernetes.yml)
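
For anyone trying the patch out, below is a minimal sketch of a scrape configuration exercising the new `attach_metadata` block. The job name and the relabeling rules are illustrative additions, not part of this change; the `__meta_kubernetes_node_name` and `__meta_kubernetes_node_label_*` / `__meta_kubernetes_node_labelpresent_*` label names follow the expectations encoded in pod_test.go above.

```yaml
# Hypothetical scrape config exercising the new option; only role: pod honors it.
scrape_configs:
  - job_name: kubernetes-pods          # illustrative job name
    kubernetes_sd_configs:
      - role: pod
        attach_metadata:
          node: true                   # attach labels of the node each pod runs on
    relabel_configs:
      # Node labels surface as __meta_kubernetes_node_label_<name>, with the name
      # sanitized, e.g. topology.kubernetes.io/zone -> topology_kubernetes_io_zone.
      - source_labels: [__meta_kubernetes_node_label_topology_kubernetes_io_zone]
        target_label: zone
      - source_labels: [__meta_kubernetes_node_name]
        target_label: node
```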
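Because the documentation change notes that Prometheus must have permission to read Nodes when `node: true` is set, deployments running under RBAC will likely need a rule along the lines of the fragment below. This is an assumed example, not part of the patch: the ClusterRole name is a placeholder, and the verbs mirror the List and Watch calls issued by `newNodeInformer`.

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: prometheus                     # placeholder; match your deployment's ClusterRole
rules:
  - apiGroups: [""]
    resources: ["nodes"]
    verbs: ["get", "list", "watch"]    # newNodeInformer lists and watches Nodes
```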