Merge pull request #11756 from mercedes-benz/kubernetes-sd-monitoring

Kubernetes SD: disable resync timer
This commit is contained in:
Frederic Branczyk 2022-12-23 11:53:38 +01:00 committed by GitHub
commit d7f0276d88
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -382,7 +382,8 @@ func mapSelector(rawSelector []SelectorConfig) roleSelector {
return rs return rs
} }
const resyncPeriod = 10 * time.Minute // Disable the informer's resync, which just periodically resends already processed updates and distort SD metrics.
const resyncDisabled = 0
// Run implements the discoverer interface. // Run implements the discoverer interface.
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
@ -475,8 +476,8 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
eps := NewEndpointSlice( eps := NewEndpointSlice(
log.With(d.logger, "role", "endpointslice"), log.With(d.logger, "role", "endpointslice"),
informer, informer,
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), cache.NewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled),
nodeInf, nodeInf,
) )
d.discoverers = append(d.discoverers, eps) d.discoverers = append(d.discoverers, eps)
@ -534,8 +535,8 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
eps := NewEndpoints( eps := NewEndpoints(
log.With(d.logger, "role", "endpoint"), log.With(d.logger, "role", "endpoint"),
d.newEndpointsByNodeInformer(elw), d.newEndpointsByNodeInformer(elw),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), cache.NewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled),
nodeInf, nodeInf,
) )
d.discoverers = append(d.discoverers, eps) d.discoverers = append(d.discoverers, eps)
@ -589,7 +590,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
} }
svc := NewService( svc := NewService(
log.With(d.logger, "role", "service"), log.With(d.logger, "role", "service"),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), cache.NewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
) )
d.discoverers = append(d.discoverers, svc) d.discoverers = append(d.discoverers, svc)
go svc.informer.Run(ctx.Done()) go svc.informer.Run(ctx.Done())
@ -627,7 +628,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return i.Watch(ctx, options) return i.Watch(ctx, options)
}, },
} }
informer = cache.NewSharedInformer(ilw, &networkv1.Ingress{}, resyncPeriod) informer = cache.NewSharedInformer(ilw, &networkv1.Ingress{}, resyncDisabled)
} else { } else {
i := d.client.NetworkingV1beta1().Ingresses(namespace) i := d.client.NetworkingV1beta1().Ingresses(namespace)
ilw := &cache.ListWatch{ ilw := &cache.ListWatch{
@ -642,7 +643,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return i.Watch(ctx, options) return i.Watch(ctx, options)
}, },
} }
informer = cache.NewSharedInformer(ilw, &v1beta1.Ingress{}, resyncPeriod) informer = cache.NewSharedInformer(ilw, &v1beta1.Ingress{}, resyncDisabled)
} }
ingress := NewIngress( ingress := NewIngress(
log.With(d.logger, "role", "ingress"), log.With(d.logger, "role", "ingress"),
@ -732,7 +733,7 @@ func (d *Discovery) newNodeInformer(ctx context.Context) cache.SharedInformer {
return d.client.CoreV1().Nodes().Watch(ctx, options) return d.client.CoreV1().Nodes().Watch(ctx, options)
}, },
} }
return cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncPeriod) return cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncDisabled)
} }
func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer { func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
@ -747,13 +748,13 @@ func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedInde
} }
} }
return cache.NewSharedIndexInformer(plw, &apiv1.Pod{}, resyncPeriod, indexers) return cache.NewSharedIndexInformer(plw, &apiv1.Pod{}, resyncDisabled, indexers)
} }
func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer { func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
indexers := make(map[string]cache.IndexFunc) indexers := make(map[string]cache.IndexFunc)
if !d.attachMetadata.Node { if !d.attachMetadata.Node {
return cache.NewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncPeriod, indexers) return cache.NewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers)
} }
indexers[nodeIndex] = func(obj interface{}) ([]string, error) { indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
@ -773,13 +774,13 @@ func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.Share
return nodes, nil return nodes, nil
} }
return cache.NewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncPeriod, indexers) return cache.NewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers)
} }
func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object runtime.Object) cache.SharedIndexInformer { func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object runtime.Object) cache.SharedIndexInformer {
indexers := make(map[string]cache.IndexFunc) indexers := make(map[string]cache.IndexFunc)
if !d.attachMetadata.Node { if !d.attachMetadata.Node {
cache.NewSharedIndexInformer(plw, &disv1.EndpointSlice{}, resyncPeriod, indexers) cache.NewSharedIndexInformer(plw, &disv1.EndpointSlice{}, resyncDisabled, indexers)
} }
indexers[nodeIndex] = func(obj interface{}) ([]string, error) { indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
@ -806,7 +807,7 @@ func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object
return nodes, nil return nodes, nil
} }
return cache.NewSharedIndexInformer(plw, object, resyncPeriod, indexers) return cache.NewSharedIndexInformer(plw, object, resyncDisabled, indexers)
} }
func checkDiscoveryV1Supported(client kubernetes.Interface) (bool, error) { func checkDiscoveryV1Supported(client kubernetes.Interface) (bool, error) {