kubernetes: fix missing port labels

This commit fixes endpoint port labeling, adjusts tests accordingly
and enhances test delta printing
This commit is contained in:
Fabian Reinartz 2016-10-17 11:05:13 +02:00
parent 1df905ea42
commit ce45040e47
6 changed files with 49 additions and 45 deletions

View file

@ -106,6 +106,8 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
} }
} }
e.serviceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ e.serviceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
// TODO(fabxc): potentially remove add and delete event handlers. Those should
// be triggered via the endpoint handlers already.
AddFunc: func(o interface{}) { serviceUpdate(o.(*apiv1.Service)) }, AddFunc: func(o interface{}) { serviceUpdate(o.(*apiv1.Service)) },
UpdateFunc: func(_, o interface{}) { serviceUpdate(o.(*apiv1.Service)) }, UpdateFunc: func(_, o interface{}) { serviceUpdate(o.(*apiv1.Service)) },
DeleteFunc: func(o interface{}) { serviceUpdate(o.(*apiv1.Service)) }, DeleteFunc: func(o interface{}) { serviceUpdate(o.(*apiv1.Service)) },
@ -147,7 +149,7 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *config.TargetGroup {
seenPods := map[string]*podEntry{} seenPods := map[string]*podEntry{}
add := func(addr apiv1.EndpointAddress, port apiv1.EndpointPort, ready string) { add := func(addr apiv1.EndpointAddress, port apiv1.EndpointPort, ready string) {
a := net.JoinHostPort(addr.IP, strconv.FormatInt(int64(port.Port), 10)) a := net.JoinHostPort(addr.IP, strconv.FormatUint(uint64(port.Port), 10))
target := model.LabelSet{ target := model.LabelSet{
model.AddressLabel: lv(a), model.AddressLabel: lv(a),
@ -177,8 +179,11 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *config.TargetGroup {
for _, c := range pod.Spec.Containers { for _, c := range pod.Spec.Containers {
for _, cport := range c.Ports { for _, cport := range c.Ports {
if port.Port == cport.ContainerPort { if port.Port == cport.ContainerPort {
ports := strconv.FormatUint(uint64(port.Port), 10)
target[podContainerNameLabel] = lv(c.Name) target[podContainerNameLabel] = lv(c.Name)
target[podContainerPortNameLabel] = lv(port.Name) target[podContainerPortNameLabel] = lv(cport.Name)
target[podContainerPortNumberLabel] = lv(ports)
target[podContainerPortProtocolLabel] = lv(string(port.Protocol)) target[podContainerPortProtocolLabel] = lv(string(port.Protocol))
break break
} }
@ -221,12 +226,14 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *config.TargetGroup {
continue continue
} }
a := net.JoinHostPort(pe.pod.Status.PodIP, strconv.FormatInt(int64(cport.ContainerPort), 10)) a := net.JoinHostPort(pe.pod.Status.PodIP, strconv.FormatUint(uint64(cport.ContainerPort), 10))
ports := strconv.FormatUint(uint64(cport.ContainerPort), 10)
target := model.LabelSet{ target := model.LabelSet{
model.AddressLabel: lv(a), model.AddressLabel: lv(a),
podContainerNameLabel: lv(c.Name), podContainerNameLabel: lv(c.Name),
podContainerPortNameLabel: lv(cport.Name), podContainerPortNameLabel: lv(cport.Name),
podContainerPortNumberLabel: lv(ports),
podContainerPortProtocolLabel: lv(string(cport.Protocol)), podContainerPortProtocolLabel: lv(string(cport.Protocol)),
} }
tg.Targets = append(tg.Targets, target.Merge(podLabels(pe.pod))) tg.Targets = append(tg.Targets, target.Merge(podLabels(pe.pod)))

View file

@ -14,7 +14,6 @@
package kubernetes package kubernetes
import ( import (
//"fmt"
"testing" "testing"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
@ -23,22 +22,22 @@ import (
"k8s.io/client-go/1.5/pkg/api/v1" "k8s.io/client-go/1.5/pkg/api/v1"
) )
func endpointStoreKeyFunc(obj interface{}) (string, error) { func endpointsStoreKeyFunc(obj interface{}) (string, error) {
return obj.(*v1.Endpoints).ObjectMeta.Name, nil return obj.(*v1.Endpoints).ObjectMeta.Name, nil
} }
func newFakeEndpointInformer() *fakeInformer { func newFakeEndpointsInformer() *fakeInformer {
return newFakeInformer(endpointStoreKeyFunc) return newFakeInformer(endpointsStoreKeyFunc)
} }
func makeTestEndpointDiscovery() (*Endpoints, *fakeInformer, *fakeInformer, *fakeInformer) { func makeTestEndpointsDiscovery() (*Endpoints, *fakeInformer, *fakeInformer, *fakeInformer) {
svc := newFakeServiceInformer() svc := newFakeServiceInformer()
eps := newFakeEndpointInformer() eps := newFakeEndpointsInformer()
pod := newFakePodInformer() pod := newFakePodInformer()
return NewEndpoints(log.Base(), svc, eps, pod), svc, eps, pod return NewEndpoints(log.Base(), svc, eps, pod), svc, eps, pod
} }
func makeEndpoint() *v1.Endpoints { func makeEndpoints() *v1.Endpoints {
return &v1.Endpoints{ return &v1.Endpoints{
ObjectMeta: v1.ObjectMeta{ ObjectMeta: v1.ObjectMeta{
Name: "testendpoints", Name: "testendpoints",
@ -82,9 +81,9 @@ func makeEndpoint() *v1.Endpoints {
} }
} }
func TestEndpointDiscoveryInitial(t *testing.T) { func TestEndpointsDiscoveryInitial(t *testing.T) {
n, _, eps, _ := makeTestEndpointDiscovery() n, _, eps, _ := makeTestEndpointsDiscovery()
eps.GetStore().Add(makeEndpoint()) eps.GetStore().Add(makeEndpoints())
k8sDiscoveryTest{ k8sDiscoveryTest{
discovery: n, discovery: n,
@ -120,8 +119,8 @@ func TestEndpointDiscoveryInitial(t *testing.T) {
}.Run(t) }.Run(t)
} }
func TestEndpointDiscoveryAdd(t *testing.T) { func TestEndpointsDiscoveryAdd(t *testing.T) {
n, _, eps, pods := makeTestEndpointDiscovery() n, _, eps, pods := makeTestEndpointsDiscovery()
pods.GetStore().Add(&v1.Pod{ pods.GetStore().Add(&v1.Pod{
ObjectMeta: v1.ObjectMeta{ ObjectMeta: v1.ObjectMeta{
Name: "testpod", Name: "testpod",
@ -207,7 +206,8 @@ func TestEndpointDiscoveryAdd(t *testing.T) {
"__meta_kubernetes_pod_node_name": "testnode", "__meta_kubernetes_pod_node_name": "testnode",
"__meta_kubernetes_pod_host_ip": "2.3.4.5", "__meta_kubernetes_pod_host_ip": "2.3.4.5",
"__meta_kubernetes_pod_container_name": "c1", "__meta_kubernetes_pod_container_name": "c1",
"__meta_kubernetes_pod_container_port_name": "testport", "__meta_kubernetes_pod_container_port_name": "mainport",
"__meta_kubernetes_pod_container_port_number": "9000",
"__meta_kubernetes_pod_container_port_protocol": "TCP", "__meta_kubernetes_pod_container_port_protocol": "TCP",
}, },
model.LabelSet{ model.LabelSet{
@ -219,6 +219,7 @@ func TestEndpointDiscoveryAdd(t *testing.T) {
"__meta_kubernetes_pod_host_ip": "2.3.4.5", "__meta_kubernetes_pod_host_ip": "2.3.4.5",
"__meta_kubernetes_pod_container_name": "c2", "__meta_kubernetes_pod_container_name": "c2",
"__meta_kubernetes_pod_container_port_name": "sideport", "__meta_kubernetes_pod_container_port_name": "sideport",
"__meta_kubernetes_pod_container_port_number": "9001",
"__meta_kubernetes_pod_container_port_protocol": "TCP", "__meta_kubernetes_pod_container_port_protocol": "TCP",
}, },
}, },
@ -232,13 +233,13 @@ func TestEndpointDiscoveryAdd(t *testing.T) {
}.Run(t) }.Run(t)
} }
func TestEndpointDiscoveryDelete(t *testing.T) { func TestEndpointsDiscoveryDelete(t *testing.T) {
n, _, eps, _ := makeTestEndpointDiscovery() n, _, eps, _ := makeTestEndpointsDiscovery()
eps.GetStore().Add(makeEndpoint()) eps.GetStore().Add(makeEndpoints())
k8sDiscoveryTest{ k8sDiscoveryTest{
discovery: n, discovery: n,
afterStart: func() { go func() { eps.Delete(makeEndpoint()) }() }, afterStart: func() { go func() { eps.Delete(makeEndpoints()) }() },
expectedRes: []*config.TargetGroup{ expectedRes: []*config.TargetGroup{
&config.TargetGroup{ &config.TargetGroup{
Source: "endpoints/default/testendpoints", Source: "endpoints/default/testendpoints",
@ -247,9 +248,9 @@ func TestEndpointDiscoveryDelete(t *testing.T) {
}.Run(t) }.Run(t)
} }
func TestEndpointDiscoveryUpdate(t *testing.T) { func TestEndpointsDiscoveryUpdate(t *testing.T) {
n, _, eps, _ := makeTestEndpointDiscovery() n, _, eps, _ := makeTestEndpointsDiscovery()
eps.GetStore().Add(makeEndpoint()) eps.GetStore().Add(makeEndpoints())
k8sDiscoveryTest{ k8sDiscoveryTest{
discovery: n, discovery: n,

View file

@ -137,7 +137,7 @@ func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
case "pod": case "pod":
plw := cache.NewListWatchFromClient(rclient, "pods", api.NamespaceAll, nil) plw := cache.NewListWatchFromClient(rclient, "pods", api.NamespaceAll, nil)
pod := NewPods( pod := NewPod(
k.logger.With("kubernetes_sd", "pod"), k.logger.With("kubernetes_sd", "pod"),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
) )

View file

@ -14,8 +14,8 @@
package kubernetes package kubernetes
import ( import (
"encoding/json"
"fmt" "fmt"
"reflect"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -23,6 +23,7 @@ import (
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/stretchr/testify/require"
"golang.org/x/net/context" "golang.org/x/net/context"
"k8s.io/client-go/1.5/pkg/api/v1" "k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/client-go/1.5/tools/cache" "k8s.io/client-go/1.5/tools/cache"
@ -120,33 +121,28 @@ func (d k8sDiscoveryTest) Run(t *testing.T) {
initialRes := <-ch initialRes := <-ch
if d.expectedInitial != nil { if d.expectedInitial != nil {
if !reflect.DeepEqual(d.expectedInitial, initialRes) { requireTargetGroups(t, d.expectedInitial, initialRes)
printExpected(d.expectedInitial, initialRes)
t.Fatal("Initial result target group not generated as expected")
}
} }
if d.afterStart != nil && d.expectedRes != nil { if d.afterStart != nil && d.expectedRes != nil {
d.afterStart() d.afterStart()
res := <-ch res := <-ch
if !reflect.DeepEqual(d.expectedRes, res) { requireTargetGroups(t, d.expectedRes, res)
printExpected(d.expectedRes, res)
t.Fatal("Result target group not generated as expected")
}
} }
} }
func printExpected(expected, res []*config.TargetGroup) { func requireTargetGroups(t *testing.T, expected, res []*config.TargetGroup) {
fmt.Printf("\nExpected %d TargetGroups:\n\n", len(expected)) b1, err := json.Marshal(expected)
for _, e := range expected { if err != nil {
fmt.Printf("%#v\n\n", e) panic(err)
}
b2, err := json.Marshal(res)
if err != nil {
panic(err)
} }
fmt.Printf("\nResult %d TargetGroups:\n\n%", len(res)) require.JSONEq(t, string(b1), string(b2))
for _, e := range res {
fmt.Printf("%#v\n\n", e)
}
} }
func nodeStoreKeyFunc(obj interface{}) (string, error) { func nodeStoreKeyFunc(obj interface{}) (string, error) {

View file

@ -36,8 +36,8 @@ type Pod struct {
logger log.Logger logger log.Logger
} }
// NewPods creates a new pod discovery. // NewPod creates a new pod discovery.
func NewPods(l log.Logger, pods cache.SharedInformer) *Pod { func NewPod(l log.Logger, pods cache.SharedInformer) *Pod {
return &Pod{ return &Pod{
informer: pods, informer: pods,
store: pods.GetStore(), store: pods.GetStore(),
@ -147,7 +147,7 @@ func (p *Pod) buildPod(pod *apiv1.Pod) *config.TargetGroup {
} }
// Otherwise create one target for each container/port combination. // Otherwise create one target for each container/port combination.
for _, port := range c.Ports { for _, port := range c.Ports {
ports := strconv.FormatInt(int64(port.ContainerPort), 10) ports := strconv.FormatUint(uint64(port.ContainerPort), 10)
addr := net.JoinHostPort(pod.Status.PodIP, ports) addr := net.JoinHostPort(pod.Status.PodIP, ports)
tg.Targets = append(tg.Targets, model.LabelSet{ tg.Targets = append(tg.Targets, model.LabelSet{

View file

@ -33,7 +33,7 @@ func newFakePodInformer() *fakeInformer {
func makeTestPodDiscovery() (*Pod, *fakeInformer) { func makeTestPodDiscovery() (*Pod, *fakeInformer) {
i := newFakePodInformer() i := newFakePodInformer()
return NewPods(log.Base(), i), i return NewPod(log.Base(), i), i
} }
func makeMultiPortPod() *v1.Pod { func makeMultiPortPod() *v1.Pod {