From cb04fe83868a65fbe64ad1aaea456a0f4e9cde95 Mon Sep 17 00:00:00 2001 From: Takashi Kusumi Date: Mon, 16 Aug 2021 08:34:36 +0900 Subject: [PATCH] Kubernetes SD: Support networking.k8s.io/v1 Ingress Signed-off-by: Takashi Kusumi --- discovery/kubernetes/ingress.go | 105 ++++++++++++++++++--- discovery/kubernetes/ingress_test.go | 118 +++++++++++++++++++++++- discovery/kubernetes/kubernetes.go | 93 ++++++++++++++++--- discovery/kubernetes/kubernetes_test.go | 59 ++++++++++++ 4 files changed, 346 insertions(+), 29 deletions(-) diff --git a/discovery/kubernetes/ingress.go b/discovery/kubernetes/ingress.go index 042fa8870..f91cf85a2 100644 --- a/discovery/kubernetes/ingress.go +++ b/discovery/kubernetes/ingress.go @@ -20,6 +20,7 @@ import ( "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/common/model" + v1 "k8s.io/api/networking/v1" "k8s.io/api/networking/v1beta1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -112,25 +113,26 @@ func (i *Ingress) process(ctx context.Context, ch chan<- []*targetgroup.Group) b send(ctx, ch, &targetgroup.Group{Source: ingressSourceFromNamespaceAndName(namespace, name)}) return true } - eps, err := convertToIngress(o) - if err != nil { - level.Error(i.logger).Log("msg", "converting to Ingress object failed", "err", err) + + switch ingress := o.(type) { + case *v1.Ingress: + send(ctx, ch, i.buildIngress(ingress)) + return true + case *v1beta1.Ingress: + send(ctx, ch, i.buildIngressV1beta1(ingress)) return true } - send(ctx, ch, i.buildIngress(eps)) + + level.Error(i.logger).Log("msg", "converting to Ingress object failed", "err", + errors.Errorf("received unexpected object: %v", o)) return true } -func convertToIngress(o interface{}) (*v1beta1.Ingress, error) { - ingress, ok := o.(*v1beta1.Ingress) - if ok { - return ingress, nil - } - - return nil, errors.Errorf("received unexpected object: %v", o) +func ingressSource(s *v1.Ingress) string { + return ingressSourceFromNamespaceAndName(s.Namespace, s.Name) } -func ingressSource(s *v1beta1.Ingress) string { +func ingressSourceV1beta1(s *v1beta1.Ingress) string { return ingressSourceFromNamespaceAndName(s.Namespace, s.Name) } @@ -150,7 +152,7 @@ const ( ingressClassNameLabel = metaLabelPrefix + "ingress_class_name" ) -func ingressLabels(ingress *v1beta1.Ingress) model.LabelSet { +func ingressLabels(ingress *v1.Ingress) model.LabelSet { // Each label and annotation will create two key-value pairs in the map. ls := make(model.LabelSet, 2*(len(ingress.Labels)+len(ingress.Annotations))+2) ls[ingressNameLabel] = lv(ingress.Name) @@ -173,7 +175,30 @@ func ingressLabels(ingress *v1beta1.Ingress) model.LabelSet { return ls } -func pathsFromIngressRule(rv *v1beta1.IngressRuleValue) []string { +func ingressLabelsV1beta1(ingress *v1beta1.Ingress) model.LabelSet { + // Each label and annotation will create two key-value pairs in the map. + ls := make(model.LabelSet, 2*(len(ingress.Labels)+len(ingress.Annotations))+2) + ls[ingressNameLabel] = lv(ingress.Name) + ls[namespaceLabel] = lv(ingress.Namespace) + if ingress.Spec.IngressClassName != nil { + ls[ingressClassNameLabel] = lv(*ingress.Spec.IngressClassName) + } + + for k, v := range ingress.Labels { + ln := strutil.SanitizeLabelName(k) + ls[model.LabelName(ingressLabelPrefix+ln)] = lv(v) + ls[model.LabelName(ingressLabelPresentPrefix+ln)] = presentValue + } + + for k, v := range ingress.Annotations { + ln := strutil.SanitizeLabelName(k) + ls[model.LabelName(ingressAnnotationPrefix+ln)] = lv(v) + ls[model.LabelName(ingressAnnotationPresentPrefix+ln)] = presentValue + } + return ls +} + +func pathsFromIngressRule(rv *v1.IngressRuleValue) []string { if rv.HTTP == nil { return []string{"/"} } @@ -188,7 +213,22 @@ func pathsFromIngressRule(rv *v1beta1.IngressRuleValue) []string { return paths } -func (i *Ingress) buildIngress(ingress *v1beta1.Ingress) *targetgroup.Group { +func pathsFromIngressRuleV1beta1(rv *v1beta1.IngressRuleValue) []string { + if rv.HTTP == nil { + return []string{"/"} + } + paths := make([]string, len(rv.HTTP.Paths)) + for n, p := range rv.HTTP.Paths { + path := p.Path + if path == "" { + path = "/" + } + paths[n] = path + } + return paths +} + +func (i *Ingress) buildIngress(ingress *v1.Ingress) *targetgroup.Group { tg := &targetgroup.Group{ Source: ingressSource(ingress), } @@ -222,3 +262,38 @@ func (i *Ingress) buildIngress(ingress *v1beta1.Ingress) *targetgroup.Group { return tg } + +func (i *Ingress) buildIngressV1beta1(ingress *v1beta1.Ingress) *targetgroup.Group { + tg := &targetgroup.Group{ + Source: ingressSourceV1beta1(ingress), + } + tg.Labels = ingressLabelsV1beta1(ingress) + + tlsHosts := make(map[string]struct{}) + for _, tls := range ingress.Spec.TLS { + for _, host := range tls.Hosts { + tlsHosts[host] = struct{}{} + } + } + + for _, rule := range ingress.Spec.Rules { + paths := pathsFromIngressRuleV1beta1(&rule.IngressRuleValue) + + scheme := "http" + _, isTLS := tlsHosts[rule.Host] + if isTLS { + scheme = "https" + } + + for _, path := range paths { + tg.Targets = append(tg.Targets, model.LabelSet{ + model.AddressLabel: lv(rule.Host), + ingressSchemeLabel: lv(scheme), + ingressHostLabel: lv(rule.Host), + ingressPathLabel: lv(path), + }) + } + } + + return tg +} diff --git a/discovery/kubernetes/ingress_test.go b/discovery/kubernetes/ingress_test.go index 5ae5d4980..5e383a480 100644 --- a/discovery/kubernetes/ingress_test.go +++ b/discovery/kubernetes/ingress_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/prometheus/common/model" + v1 "k8s.io/api/networking/v1" "k8s.io/api/networking/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,7 +34,59 @@ const ( TLSMixed ) -func makeIngress(tls TLSMode) *v1beta1.Ingress { +func makeIngress(tls TLSMode) *v1.Ingress { + ret := &v1.Ingress{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testingress", + Namespace: "default", + Labels: map[string]string{"test/label": "testvalue"}, + Annotations: map[string]string{"test/annotation": "testannotationvalue"}, + }, + Spec: v1.IngressSpec{ + IngressClassName: classString("testclass"), + TLS: nil, + Rules: []v1.IngressRule{ + { + Host: "example.com", + IngressRuleValue: v1.IngressRuleValue{ + HTTP: &v1.HTTPIngressRuleValue{ + Paths: []v1.HTTPIngressPath{ + {Path: "/"}, + {Path: "/foo"}, + }, + }, + }, + }, + { + // No backend config, ignored + Host: "nobackend.example.com", + IngressRuleValue: v1.IngressRuleValue{ + HTTP: &v1.HTTPIngressRuleValue{}, + }, + }, + { + Host: "test.example.com", + IngressRuleValue: v1.IngressRuleValue{ + HTTP: &v1.HTTPIngressRuleValue{ + Paths: []v1.HTTPIngressPath{{}}, + }, + }, + }, + }, + }, + } + + switch tls { + case TLSYes: + ret.Spec.TLS = []v1.IngressTLS{{Hosts: []string{"example.com", "test.example.com"}}} + case TLSMixed: + ret.Spec.TLS = []v1.IngressTLS{{Hosts: []string{"example.com"}}} + } + + return ret +} + +func makeIngressV1beta1(tls TLSMode) *v1beta1.Ingress { ret := &v1beta1.Ingress{ ObjectMeta: metav1.ObjectMeta{ Name: "testingress", @@ -145,6 +198,20 @@ func TestIngressDiscoveryAdd(t *testing.T) { discovery: n, afterStart: func() { obj := makeIngress(TLSNo) + c.NetworkingV1().Ingresses("default").Create(context.Background(), obj, metav1.CreateOptions{}) + }, + expectedMaxItems: 1, + expectedRes: expectedTargetGroups("default", TLSNo), + }.Run(t) +} + +func TestIngressDiscoveryAddV1beta1(t *testing.T) { + n, c := makeDiscoveryWithVersion(RoleIngress, NamespaceDiscovery{Names: []string{"default"}}, "v1.18.0") + + k8sDiscoveryTest{ + discovery: n, + afterStart: func() { + obj := makeIngressV1beta1(TLSNo) c.NetworkingV1beta1().Ingresses("default").Create(context.Background(), obj, metav1.CreateOptions{}) }, expectedMaxItems: 1, @@ -159,6 +226,20 @@ func TestIngressDiscoveryAddTLS(t *testing.T) { discovery: n, afterStart: func() { obj := makeIngress(TLSYes) + c.NetworkingV1().Ingresses("default").Create(context.Background(), obj, metav1.CreateOptions{}) + }, + expectedMaxItems: 1, + expectedRes: expectedTargetGroups("default", TLSYes), + }.Run(t) +} + +func TestIngressDiscoveryAddTLSV1beta1(t *testing.T) { + n, c := makeDiscoveryWithVersion(RoleIngress, NamespaceDiscovery{Names: []string{"default"}}, "v1.18.0") + + k8sDiscoveryTest{ + discovery: n, + afterStart: func() { + obj := makeIngressV1beta1(TLSYes) c.NetworkingV1beta1().Ingresses("default").Create(context.Background(), obj, metav1.CreateOptions{}) }, expectedMaxItems: 1, @@ -173,6 +254,20 @@ func TestIngressDiscoveryAddMixed(t *testing.T) { discovery: n, afterStart: func() { obj := makeIngress(TLSMixed) + c.NetworkingV1().Ingresses("default").Create(context.Background(), obj, metav1.CreateOptions{}) + }, + expectedMaxItems: 1, + expectedRes: expectedTargetGroups("default", TLSMixed), + }.Run(t) +} + +func TestIngressDiscoveryAddMixedV1beta1(t *testing.T) { + n, c := makeDiscoveryWithVersion(RoleIngress, NamespaceDiscovery{Names: []string{"default"}}, "v1.18.0") + + k8sDiscoveryTest{ + discovery: n, + afterStart: func() { + obj := makeIngressV1beta1(TLSMixed) c.NetworkingV1beta1().Ingresses("default").Create(context.Background(), obj, metav1.CreateOptions{}) }, expectedMaxItems: 1, @@ -193,6 +288,27 @@ func TestIngressDiscoveryNamespaces(t *testing.T) { for _, ns := range []string{"ns1", "ns2"} { obj := makeIngress(TLSNo) obj.Namespace = ns + c.NetworkingV1().Ingresses(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{}) + } + }, + expectedMaxItems: 2, + expectedRes: expected, + }.Run(t) +} + +func TestIngressDiscoveryNamespacesV1beta1(t *testing.T) { + n, c := makeDiscoveryWithVersion(RoleIngress, NamespaceDiscovery{Names: []string{"ns1", "ns2"}}, "v1.18.0") + + expected := expectedTargetGroups("ns1", TLSNo) + for k, v := range expectedTargetGroups("ns2", TLSNo) { + expected[k] = v + } + k8sDiscoveryTest{ + discovery: n, + afterStart: func() { + for _, ns := range []string{"ns1", "ns2"} { + obj := makeIngressV1beta1(TLSNo) + obj.Namespace = ns c.NetworkingV1beta1().Ingresses(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{}) } }, diff --git a/discovery/kubernetes/kubernetes.go b/discovery/kubernetes/kubernetes.go index ef3e8e6a1..3a02922a4 100644 --- a/discovery/kubernetes/kubernetes.go +++ b/discovery/kubernetes/kubernetes.go @@ -30,11 +30,13 @@ import ( "github.com/prometheus/common/version" apiv1 "k8s.io/api/core/v1" disv1beta1 "k8s.io/api/discovery/v1beta1" + networkv1 "k8s.io/api/networking/v1" "k8s.io/api/networking/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + utilversion "k8s.io/apimachinery/pkg/util/version" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -491,23 +493,58 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { go svc.informer.Run(ctx.Done()) } case RoleIngress: + // Check "networking.k8s.io/v1" availability with retries. + // If "v1" is not avaiable, use "networking.k8s.io/v1beta1" for backward compatibility + var v1Supported bool + if retryOnError(ctx, 10*time.Second, + func() (err error) { + v1Supported, err = checkNetworkingV1Supported(d.client) + if err != nil { + level.Error(d.logger).Log("msg", "Failed to check networking.k8s.io/v1 availability", "err", err) + } + return err + }, + ) { + d.Unlock() + return + } + for _, namespace := range namespaces { - i := d.client.NetworkingV1beta1().Ingresses(namespace) - ilw := &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - options.FieldSelector = d.selectors.ingress.field - options.LabelSelector = d.selectors.ingress.label - return i.List(ctx, options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - options.FieldSelector = d.selectors.ingress.field - options.LabelSelector = d.selectors.ingress.label - return i.Watch(ctx, options) - }, + var informer cache.SharedInformer + if v1Supported { + i := d.client.NetworkingV1().Ingresses(namespace) + ilw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = d.selectors.ingress.field + options.LabelSelector = d.selectors.ingress.label + return i.List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.FieldSelector = d.selectors.ingress.field + options.LabelSelector = d.selectors.ingress.label + return i.Watch(ctx, options) + }, + } + informer = cache.NewSharedInformer(ilw, &networkv1.Ingress{}, resyncPeriod) + } else { + i := d.client.NetworkingV1beta1().Ingresses(namespace) + ilw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = d.selectors.ingress.field + options.LabelSelector = d.selectors.ingress.label + return i.List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.FieldSelector = d.selectors.ingress.field + options.LabelSelector = d.selectors.ingress.label + return i.Watch(ctx, options) + }, + } + informer = cache.NewSharedInformer(ilw, &v1beta1.Ingress{}, resyncPeriod) } ingress := NewIngress( log.With(d.logger, "role", "ingress"), - cache.NewSharedInformer(ilw, &v1beta1.Ingress{}, resyncPeriod), + informer, ) d.discoverers = append(d.discoverers, ingress) go ingress.informer.Run(ctx.Done()) @@ -563,3 +600,33 @@ func send(ctx context.Context, ch chan<- []*targetgroup.Group, tg *targetgroup.G case ch <- []*targetgroup.Group{tg}: } } + +func retryOnError(ctx context.Context, interval time.Duration, f func() error) (canceled bool) { + var err error + err = f() + for { + if err == nil { + return false + } + select { + case <-ctx.Done(): + return true + case <-time.After(interval): + err = f() + } + } +} + +func checkNetworkingV1Supported(client kubernetes.Interface) (bool, error) { + k8sVer, err := client.Discovery().ServerVersion() + if err != nil { + return false, err + } + semVer, err := utilversion.ParseSemantic(k8sVer.String()) + if err != nil { + return false, err + } + // networking.k8s.io/v1 is available since Kubernetes v1.19 + // https://github.com/kubernetes/kubernetes/blob/master/CHANGELOG/CHANGELOG-1.19.md + return semVer.Major() >= 1 && semVer.Minor() >= 19, nil +} diff --git a/discovery/kubernetes/kubernetes_test.go b/discovery/kubernetes/kubernetes_test.go index 44a6c1b29..e1ca23402 100644 --- a/discovery/kubernetes/kubernetes_test.go +++ b/discovery/kubernetes/kubernetes_test.go @@ -20,8 +20,11 @@ import ( "time" "github.com/go-kit/log" + "github.com/pkg/errors" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/version" + fakediscovery "k8s.io/client-go/discovery/fake" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" @@ -37,7 +40,14 @@ func TestMain(m *testing.M) { // makeDiscovery creates a kubernetes.Discovery instance for testing. func makeDiscovery(role Role, nsDiscovery NamespaceDiscovery, objects ...runtime.Object) (*Discovery, kubernetes.Interface) { + return makeDiscoveryWithVersion(role, nsDiscovery, "v1.22.0", objects...) +} + +// makeDiscoveryWithVersion creates a kubernetes.Discovery instance with the specified kubernetes version for testing. +func makeDiscoveryWithVersion(role Role, nsDiscovery NamespaceDiscovery, k8sVer string, objects ...runtime.Object) (*Discovery, kubernetes.Interface) { clientset := fake.NewSimpleClientset(objects...) + fakeDiscovery, _ := clientset.Discovery().(*fakediscovery.FakeDiscovery) + fakeDiscovery.FakedServerVersion = &version.Info{GitVersion: k8sVer} return &Discovery{ client: clientset, @@ -205,3 +215,52 @@ func (p *Pod) hasSynced() bool { func (s *Service) hasSynced() bool { return s.informer.HasSynced() } + +func TestRetryOnError(t *testing.T) { + for _, successAt := range []int{1, 2, 3} { + var called int + f := func() error { + called++ + if called >= successAt { + return nil + } + return errors.New("dummy") + } + retryOnError(context.TODO(), 0, f) + require.Equal(t, successAt, called) + } +} + +func TestCheckNetworkingV1Supported(t *testing.T) { + tests := []struct { + version string + wantSupported bool + wantErr bool + }{ + {version: "v1.18.0", wantSupported: false, wantErr: false}, + {version: "v1.18.1", wantSupported: false, wantErr: false}, + // networking v1 is supported since Kubernetes v1.19 + {version: "v1.19.0", wantSupported: true, wantErr: false}, + {version: "v1.20.0-beta.2", wantSupported: true, wantErr: false}, + // error patterns + {version: "", wantSupported: false, wantErr: true}, + {version: "<>", wantSupported: false, wantErr: true}, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.version, func(t *testing.T) { + clientset := fake.NewSimpleClientset() + fakeDiscovery, _ := clientset.Discovery().(*fakediscovery.FakeDiscovery) + fakeDiscovery.FakedServerVersion = &version.Info{GitVersion: tc.version} + supported, err := checkNetworkingV1Supported(clientset) + + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.wantSupported, supported) + }) + } +}