kubernetes_sd: fix namespace filtering (#4273)

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
This commit is contained in:
Simon Pasquier 2018-06-14 16:49:43 +02:00 committed by Brian Brazil
parent e7cfc7dae5
commit 6eab4bbca1
5 changed files with 282 additions and 64 deletions

View file

@ -19,6 +19,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery/targetgroup"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/pkg/api/v1"
)
@ -468,3 +469,137 @@ func TestEndpointsDiscoveryWithServiceUpdate(t *testing.T) {
},
}.Run(t)
}
func TestEndpointsDiscoveryNamespaces(t *testing.T) {
epOne := makeEndpoints()
epOne.Namespace = "ns1"
objs := []runtime.Object{
epOne,
&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "ns2",
},
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "4.3.2.1",
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "testpod",
Namespace: "ns2",
},
},
},
Ports: []v1.EndpointPort{
{
Name: "testport",
Port: 9000,
Protocol: v1.ProtocolTCP,
},
},
},
},
},
&v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "ns1",
Labels: map[string]string{
"app": "app1",
},
},
},
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "testpod",
Namespace: "ns2",
UID: types.UID("deadbeef"),
},
Spec: v1.PodSpec{
NodeName: "testnode",
Containers: []v1.Container{
{
Name: "c1",
Ports: []v1.ContainerPort{
{
Name: "mainport",
ContainerPort: 9000,
Protocol: v1.ProtocolTCP,
},
},
},
},
},
Status: v1.PodStatus{
HostIP: "2.3.4.5",
PodIP: "4.3.2.1",
},
},
}
n, _, _ := makeDiscovery(RoleEndpoint, NamespaceDiscovery{Names: []string{"ns1", "ns2"}}, objs...)
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 2,
expectedRes: map[string]*targetgroup.Group{
"endpoints/ns1/testendpoints": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "false",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": "ns1",
"__meta_kubernetes_endpoints_name": "testendpoints",
"__meta_kubernetes_service_label_app": "app1",
"__meta_kubernetes_service_name": "testendpoints",
},
Source: "endpoints/ns1/testendpoints",
},
"endpoints/ns2/testendpoints": {
Targets: []model.LabelSet{
{
"__address__": "4.3.2.1:9000",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
"__meta_kubernetes_endpoint_address_target_kind": "Pod",
"__meta_kubernetes_endpoint_address_target_name": "testpod",
"__meta_kubernetes_pod_name": "testpod",
"__meta_kubernetes_pod_ip": "4.3.2.1",
"__meta_kubernetes_pod_ready": "unknown",
"__meta_kubernetes_pod_node_name": "testnode",
"__meta_kubernetes_pod_host_ip": "2.3.4.5",
"__meta_kubernetes_pod_container_name": "c1",
"__meta_kubernetes_pod_container_port_name": "mainport",
"__meta_kubernetes_pod_container_port_number": "9000",
"__meta_kubernetes_pod_container_port_protocol": "TCP",
"__meta_kubernetes_pod_uid": "deadbeef",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": "ns2",
"__meta_kubernetes_endpoints_name": "testendpoints",
},
Source: "endpoints/ns2/testendpoints",
},
},
}.Run(t)
}

View file

@ -14,6 +14,7 @@
package kubernetes
import (
"fmt"
"testing"
"github.com/prometheus/common/model"
@ -64,13 +65,14 @@ func makeIngress(tls []v1beta1.IngressTLS) *v1beta1.Ingress {
}
}
func expectedTargetGroups(tls bool) map[string]*targetgroup.Group {
func expectedTargetGroups(ns string, tls bool) map[string]*targetgroup.Group {
scheme := "http"
if tls {
scheme = "https"
}
key := fmt.Sprintf("ingress/%s/testingress", ns)
return map[string]*targetgroup.Group{
"ingress/default/testingress": {
key: {
Targets: []model.LabelSet{
{
"__meta_kubernetes_ingress_scheme": lv(scheme),
@ -93,11 +95,11 @@ func expectedTargetGroups(tls bool) map[string]*targetgroup.Group {
},
Labels: model.LabelSet{
"__meta_kubernetes_ingress_name": "testingress",
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_namespace": lv(ns),
"__meta_kubernetes_ingress_label_testlabel": "testvalue",
"__meta_kubernetes_ingress_annotation_testannotation": "testannotationvalue",
},
Source: "ingress/default/testingress",
Source: key,
},
}
}
@ -113,7 +115,7 @@ func TestIngressDiscoveryAdd(t *testing.T) {
w.Ingresses().Add(obj)
},
expectedMaxItems: 1,
expectedRes: expectedTargetGroups(false),
expectedRes: expectedTargetGroups("default", false),
}.Run(t)
}
@ -128,6 +130,28 @@ func TestIngressDiscoveryAddTLS(t *testing.T) {
w.Ingresses().Add(obj)
},
expectedMaxItems: 1,
expectedRes: expectedTargetGroups(true),
expectedRes: expectedTargetGroups("default", true),
}.Run(t)
}
func TestIngressDiscoveryNamespaces(t *testing.T) {
n, c, w := makeDiscovery(RoleIngress, NamespaceDiscovery{Names: []string{"ns1", "ns2"}})
expected := expectedTargetGroups("ns1", false)
for k, v := range expectedTargetGroups("ns2", false) {
expected[k] = v
}
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
for _, ns := range []string{"ns1", "ns2"} {
obj := makeIngress(nil)
obj.Namespace = ns
c.ExtensionsV1beta1().Ingresses(obj.Namespace).Create(obj)
w.Ingresses().Add(obj)
}
},
expectedMaxItems: 2,
expectedRes: expected,
}.Run(t)
}

View file

@ -250,28 +250,31 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
switch d.role {
case RoleEndpoint:
for _, namespace := range namespaces {
e := d.client.CoreV1().Endpoints(namespace)
elw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return d.client.CoreV1().Endpoints(namespace).List(options)
return e.List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return d.client.CoreV1().Endpoints(namespace).Watch(options)
return e.Watch(options)
},
}
s := d.client.CoreV1().Services(namespace)
slw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return d.client.CoreV1().Services(namespace).List(options)
return s.List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return d.client.CoreV1().Services(namespace).Watch(options)
return s.Watch(options)
},
}
p := d.client.CoreV1().Pods(namespace)
plw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return d.client.CoreV1().Pods(namespace).List(options)
return p.List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return d.client.CoreV1().Pods(namespace).Watch(options)
return p.Watch(options)
},
}
eps := NewEndpoints(
@ -287,12 +290,13 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
}
case RolePod:
for _, namespace := range namespaces {
p := d.client.CoreV1().Pods(namespace)
plw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return d.client.CoreV1().Pods(namespace).List(options)
return p.List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return d.client.CoreV1().Pods(namespace).Watch(options)
return p.Watch(options)
},
}
pod := NewPod(
@ -304,12 +308,13 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
}
case RoleService:
for _, namespace := range namespaces {
s := d.client.CoreV1().Services(namespace)
slw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return d.client.CoreV1().Services(namespace).List(options)
return s.List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return d.client.CoreV1().Services(namespace).Watch(options)
return s.Watch(options)
},
}
svc := NewService(
@ -321,12 +326,13 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
}
case RoleIngress:
for _, namespace := range namespaces {
i := d.client.ExtensionsV1beta1().Ingresses(namespace)
ilw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return d.client.ExtensionsV1beta1().Ingresses(namespace).List(options)
return i.List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return d.client.ExtensionsV1beta1().Ingresses(namespace).Watch(options)
return i.Watch(options)
},
}
ingress := NewIngress(

View file

@ -14,6 +14,7 @@
package kubernetes
import (
"fmt"
"testing"
"github.com/prometheus/common/model"
@ -114,6 +115,33 @@ func makePods() *v1.Pod {
}
}
func expectedPodTargetGroups(ns string) map[string]*targetgroup.Group {
key := fmt.Sprintf("pod/%s/testpod", ns)
return map[string]*targetgroup.Group{
key: {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_pod_container_name": "testcontainer",
"__meta_kubernetes_pod_container_port_name": "testport",
"__meta_kubernetes_pod_container_port_number": "9000",
"__meta_kubernetes_pod_container_port_protocol": "TCP",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_pod_name": "testpod",
"__meta_kubernetes_namespace": lv(ns),
"__meta_kubernetes_pod_node_name": "testnode",
"__meta_kubernetes_pod_ip": "1.2.3.4",
"__meta_kubernetes_pod_host_ip": "2.3.4.5",
"__meta_kubernetes_pod_ready": "true",
"__meta_kubernetes_pod_uid": "abc123",
},
Source: key,
},
}
}
func TestPodDiscoveryBeforeRun(t *testing.T) {
n, c, w := makeDiscovery(RolePod, NamespaceDiscovery{})
@ -177,29 +205,7 @@ func TestPodDiscoveryAdd(t *testing.T) {
w.Pods().Add(obj)
},
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
"pod/default/testpod": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_pod_container_name": "testcontainer",
"__meta_kubernetes_pod_container_port_name": "testport",
"__meta_kubernetes_pod_container_port_number": "9000",
"__meta_kubernetes_pod_container_port_protocol": "TCP",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_pod_name": "testpod",
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_pod_node_name": "testnode",
"__meta_kubernetes_pod_ip": "1.2.3.4",
"__meta_kubernetes_pod_host_ip": "2.3.4.5",
"__meta_kubernetes_pod_ready": "true",
"__meta_kubernetes_pod_uid": "abc123",
},
Source: "pod/default/testpod",
},
},
expectedRes: expectedPodTargetGroups("default"),
}.Run(t)
}
@ -260,29 +266,7 @@ func TestPodDiscoveryUpdate(t *testing.T) {
w.Pods().Modify(obj)
},
expectedMaxItems: 2,
expectedRes: map[string]*targetgroup.Group{
"pod/default/testpod": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_pod_container_name": "testcontainer",
"__meta_kubernetes_pod_container_port_name": "testport",
"__meta_kubernetes_pod_container_port_number": "9000",
"__meta_kubernetes_pod_container_port_protocol": "TCP",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_pod_name": "testpod",
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_pod_node_name": "testnode",
"__meta_kubernetes_pod_ip": "1.2.3.4",
"__meta_kubernetes_pod_host_ip": "2.3.4.5",
"__meta_kubernetes_pod_ready": "true",
"__meta_kubernetes_pod_uid": "abc123",
},
Source: "pod/default/testpod",
},
},
expectedRes: expectedPodTargetGroups("default"),
}.Run(t)
}
@ -311,3 +295,25 @@ func TestPodDiscoveryUpdateEmptyPodIP(t *testing.T) {
},
}.Run(t)
}
func TestPodDiscoveryNamespaces(t *testing.T) {
n, c, w := makeDiscovery(RolePod, NamespaceDiscovery{Names: []string{"ns1", "ns2"}})
expected := expectedPodTargetGroups("ns1")
for k, v := range expectedPodTargetGroups("ns2") {
expected[k] = v
}
k8sDiscoveryTest{
discovery: n,
beforeRun: func() {
for _, ns := range []string{"ns1", "ns2"} {
pod := makePods()
pod.Namespace = ns
c.CoreV1().Pods(pod.Namespace).Create(pod)
w.Pods().Add(pod)
}
},
expectedMaxItems: 2,
expectedRes: expected,
}.Run(t)
}

View file

@ -155,3 +155,50 @@ func TestServiceDiscoveryUpdate(t *testing.T) {
},
}.Run(t)
}
func TestServiceDiscoveryNamespaces(t *testing.T) {
n, c, w := makeDiscovery(RoleService, NamespaceDiscovery{Names: []string{"ns1", "ns2"}})
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
for _, ns := range []string{"ns1", "ns2"} {
obj := makeService()
obj.Namespace = ns
c.CoreV1().Services(obj.Namespace).Create(obj)
w.Services().Add(obj)
}
},
expectedMaxItems: 2,
expectedRes: map[string]*targetgroup.Group{
"svc/ns1/testservice": {
Targets: []model.LabelSet{
{
"__meta_kubernetes_service_port_protocol": "TCP",
"__address__": "testservice.ns1.svc:30900",
"__meta_kubernetes_service_port_name": "testport",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_service_name": "testservice",
"__meta_kubernetes_namespace": "ns1",
},
Source: "svc/ns1/testservice",
},
"svc/ns2/testservice": {
Targets: []model.LabelSet{
{
"__meta_kubernetes_service_port_protocol": "TCP",
"__address__": "testservice.ns2.svc:30900",
"__meta_kubernetes_service_port_name": "testport",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_service_name": "testservice",
"__meta_kubernetes_namespace": "ns2",
},
Source: "svc/ns2/testservice",
},
},
}.Run(t)
}