retrieval/discovery/kubernetes: fix cache state unknown behavior (#2180)

* retrieval/discovery/kubernetes: fix cache state unknown behavior

* retrieval/discovery/kubernetes: extract type casting

* retrieval/discovery/kubernetes: add tests for possible regressions
This commit is contained in:
Frederic Branczyk 2016-11-14 16:21:38 +01:00 committed by Fabian Reinartz
parent 036715370f
commit 0fcea6e9fb
8 changed files with 266 additions and 20 deletions

View file

@ -83,17 +83,38 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
e.endpointsInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ e.endpointsInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) { AddFunc: func(o interface{}) {
send(e.buildEndpoints(o.(*apiv1.Endpoints))) eps, err := convertToEndpoints(o)
if err != nil {
e.logger.With("err", err).Errorln("converting to Endpoints object failed")
return
}
send(e.buildEndpoints(eps))
}, },
UpdateFunc: func(_, o interface{}) { UpdateFunc: func(_, o interface{}) {
send(e.buildEndpoints(o.(*apiv1.Endpoints))) eps, err := convertToEndpoints(o)
if err != nil {
e.logger.With("err", err).Errorln("converting to Endpoints object failed")
return
}
send(e.buildEndpoints(eps))
}, },
DeleteFunc: func(o interface{}) { DeleteFunc: func(o interface{}) {
send(&config.TargetGroup{Source: endpointsSource(o.(*apiv1.Endpoints).ObjectMeta)}) eps, err := convertToEndpoints(o)
if err != nil {
e.logger.With("err", err).Errorln("converting to Endpoints object failed")
return
}
send(&config.TargetGroup{Source: endpointsSource(eps)})
}, },
}) })
serviceUpdate := func(svc *apiv1.Service) { serviceUpdate := func(o interface{}) {
svc, err := convertToService(o)
if err != nil {
e.logger.With("err", err).Errorln("converting to Service object failed")
return
}
ep := &apiv1.Endpoints{} ep := &apiv1.Endpoints{}
ep.Namespace = svc.Namespace ep.Namespace = svc.Namespace
ep.Name = svc.Name ep.Name = svc.Name
@ -108,17 +129,33 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
e.serviceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ e.serviceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
// TODO(fabxc): potentially remove add and delete event handlers. Those should // TODO(fabxc): potentially remove add and delete event handlers. Those should
// be triggered via the endpoint handlers already. // be triggered via the endpoint handlers already.
AddFunc: func(o interface{}) { serviceUpdate(o.(*apiv1.Service)) }, AddFunc: func(o interface{}) { serviceUpdate(o) },
UpdateFunc: func(_, o interface{}) { serviceUpdate(o.(*apiv1.Service)) }, UpdateFunc: func(_, o interface{}) { serviceUpdate(o) },
DeleteFunc: func(o interface{}) { serviceUpdate(o.(*apiv1.Service)) }, DeleteFunc: func(o interface{}) { serviceUpdate(o) },
}) })
// Block until the target provider is explicitly canceled. // Block until the target provider is explicitly canceled.
<-ctx.Done() <-ctx.Done()
} }
func endpointsSource(ep apiv1.ObjectMeta) string { func convertToEndpoints(o interface{}) (*apiv1.Endpoints, error) {
return "endpoints/" + ep.Namespace + "/" + ep.Name endpoints, isEndpoints := o.(*apiv1.Endpoints)
if !isEndpoints {
deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
endpoints, ok = deletedState.Obj.(*apiv1.Endpoints)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Endpoints object: %v", deletedState.Obj)
}
}
return endpoints, nil
}
func endpointsSource(ep *apiv1.Endpoints) string {
return "endpoints/" + ep.ObjectMeta.Namespace + "/" + ep.ObjectMeta.Name
} }
const ( const (
@ -134,7 +171,7 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *config.TargetGroup {
} }
tg := &config.TargetGroup{ tg := &config.TargetGroup{
Source: endpointsSource(eps.ObjectMeta), Source: endpointsSource(eps),
} }
tg.Labels = model.LabelSet{ tg.Labels = model.LabelSet{
namespaceLabel: lv(eps.Namespace), namespaceLabel: lv(eps.Namespace),

View file

@ -20,6 +20,7 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"k8s.io/client-go/1.5/pkg/api/v1" "k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/client-go/1.5/tools/cache"
) )
func endpointsStoreKeyFunc(obj interface{}) (string, error) { func endpointsStoreKeyFunc(obj interface{}) (string, error) {
@ -248,6 +249,21 @@ func TestEndpointsDiscoveryDelete(t *testing.T) {
}.Run(t) }.Run(t)
} }
func TestEndpointsDiscoveryDeleteUnknownCacheState(t *testing.T) {
n, _, eps, _ := makeTestEndpointsDiscovery()
eps.GetStore().Add(makeEndpoints())
k8sDiscoveryTest{
discovery: n,
afterStart: func() { go func() { eps.Delete(cache.DeletedFinalStateUnknown{Obj: makeEndpoints()}) }() },
expectedRes: []*config.TargetGroup{
&config.TargetGroup{
Source: "endpoints/default/testendpoints",
},
},
}.Run(t)
}
func TestEndpointsDiscoveryUpdate(t *testing.T) { func TestEndpointsDiscoveryUpdate(t *testing.T) {
n, _, eps, _ := makeTestEndpointsDiscovery() n, _, eps, _ := makeTestEndpointsDiscovery()
eps.GetStore().Add(makeEndpoints()) eps.GetStore().Add(makeEndpoints())

View file

@ -63,13 +63,28 @@ func (n *Node) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
} }
n.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ n.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) { AddFunc: func(o interface{}) {
send(n.buildNode(o.(*apiv1.Node))) node, err := convertToNode(o)
if err != nil {
n.logger.With("err", err).Errorln("converting to Node object failed")
return
}
send(n.buildNode(node))
}, },
DeleteFunc: func(o interface{}) { DeleteFunc: func(o interface{}) {
send(&config.TargetGroup{Source: nodeSource(o.(*apiv1.Node))}) node, err := convertToNode(o)
if err != nil {
n.logger.With("err", err).Errorln("converting to Node object failed")
return
}
send(&config.TargetGroup{Source: nodeSource(node)})
}, },
UpdateFunc: func(_, o interface{}) { UpdateFunc: func(_, o interface{}) {
send(n.buildNode(o.(*apiv1.Node))) node, err := convertToNode(o)
if err != nil {
n.logger.With("err", err).Errorln("converting to Node object failed")
return
}
send(n.buildNode(node))
}, },
}) })
@ -77,6 +92,22 @@ func (n *Node) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
<-ctx.Done() <-ctx.Done()
} }
func convertToNode(o interface{}) (*apiv1.Node, error) {
node, isNode := o.(*apiv1.Node)
if !isNode {
deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
node, ok = deletedState.Obj.(*apiv1.Node)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
}
}
return node, nil
}
func nodeSource(n *apiv1.Node) string { func nodeSource(n *apiv1.Node) string {
return "node/" + n.Name return "node/" + n.Name
} }

View file

@ -270,6 +270,36 @@ func TestNodeDiscoveryDelete(t *testing.T) {
}.Run(t) }.Run(t)
} }
func TestNodeDiscoveryDeleteUnknownCacheState(t *testing.T) {
n, i := makeTestNodeDiscovery()
i.GetStore().Add(makeEnumeratedNode(0))
k8sDiscoveryTest{
discovery: n,
afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makeEnumeratedNode(0)}) }() },
expectedInitial: []*config.TargetGroup{
&config.TargetGroup{
Targets: []model.LabelSet{
model.LabelSet{
"__address__": "1.2.3.4:10250",
"instance": "test0",
"__meta_kubernetes_node_address_InternalIP": "1.2.3.4",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_node_name": "test0",
},
Source: "node/test0",
},
},
expectedRes: []*config.TargetGroup{
&config.TargetGroup{
Source: "node/test0",
},
},
}.Run(t)
}
func TestNodeDiscoveryUpdate(t *testing.T) { func TestNodeDiscoveryUpdate(t *testing.T) {
n, i := makeTestNodeDiscovery() n, i := makeTestNodeDiscovery()
i.GetStore().Add(makeEnumeratedNode(0)) i.GetStore().Add(makeEnumeratedNode(0))

View file

@ -71,13 +71,28 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
} }
p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) { AddFunc: func(o interface{}) {
send(p.buildPod(o.(*apiv1.Pod))) pod, err := convertToPod(o)
if err != nil {
p.logger.With("err", err).Errorln("converting to Pod object failed")
return
}
send(p.buildPod(pod))
}, },
DeleteFunc: func(o interface{}) { DeleteFunc: func(o interface{}) {
send(&config.TargetGroup{Source: podSource(o.(*apiv1.Pod))}) pod, err := convertToPod(o)
if err != nil {
p.logger.With("err", err).Errorln("converting to Pod object failed")
return
}
send(&config.TargetGroup{Source: podSource(pod)})
}, },
UpdateFunc: func(_, o interface{}) { UpdateFunc: func(_, o interface{}) {
send(p.buildPod(o.(*apiv1.Pod))) pod, err := convertToPod(o)
if err != nil {
p.logger.With("err", err).Errorln("converting to Pod object failed")
return
}
send(p.buildPod(pod))
}, },
}) })
@ -85,6 +100,22 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
<-ctx.Done() <-ctx.Done()
} }
func convertToPod(o interface{}) (*apiv1.Pod, error) {
pod, isPod := o.(*apiv1.Pod)
if !isPod {
deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
pod, ok = deletedState.Obj.(*apiv1.Pod)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Pod object: %v", deletedState.Obj)
}
}
return pod, nil
}
const ( const (
podNameLabel = metaLabelPrefix + "pod_name" podNameLabel = metaLabelPrefix + "pod_name"
podIPLabel = metaLabelPrefix + "pod_ip" podIPLabel = metaLabelPrefix + "pod_ip"

View file

@ -14,13 +14,13 @@
package kubernetes package kubernetes
import ( import (
//"fmt"
"testing" "testing"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"k8s.io/client-go/1.5/pkg/api/v1" "k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/client-go/1.5/tools/cache"
) )
func podStoreKeyFunc(obj interface{}) (string, error) { func podStoreKeyFunc(obj interface{}) (string, error) {
@ -226,6 +226,43 @@ func TestPodDiscoveryDelete(t *testing.T) {
}.Run(t) }.Run(t)
} }
func TestPodDiscoveryDeleteUnknownCacheState(t *testing.T) {
n, i := makeTestPodDiscovery()
i.GetStore().Add(makePod())
k8sDiscoveryTest{
discovery: n,
afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makePod()}) }() },
expectedInitial: []*config.TargetGroup{
&config.TargetGroup{
Targets: []model.LabelSet{
model.LabelSet{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_pod_container_name": "testcontainer",
"__meta_kubernetes_pod_container_port_name": "testport",
"__meta_kubernetes_pod_container_port_number": "9000",
"__meta_kubernetes_pod_container_port_protocol": "TCP",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_pod_name": "testpod",
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_pod_node_name": "testnode",
"__meta_kubernetes_pod_ip": "1.2.3.4",
"__meta_kubernetes_pod_host_ip": "2.3.4.5",
"__meta_kubernetes_pod_ready": "true",
},
Source: "pod/default/testpod",
},
},
expectedRes: []*config.TargetGroup{
&config.TargetGroup{
Source: "pod/default/testpod",
},
},
}.Run(t)
}
func TestPodDiscoveryUpdate(t *testing.T) { func TestPodDiscoveryUpdate(t *testing.T) {
n, i := makeTestPodDiscovery() n, i := makeTestPodDiscovery()
i.GetStore().Add(&v1.Pod{ i.GetStore().Add(&v1.Pod{

View file

@ -14,6 +14,7 @@
package kubernetes package kubernetes
import ( import (
"fmt"
"net" "net"
"strconv" "strconv"
@ -61,13 +62,28 @@ func (s *Service) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
} }
s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) { AddFunc: func(o interface{}) {
send(s.buildService(o.(*apiv1.Service))) svc, err := convertToService(o)
if err != nil {
s.logger.With("err", err).Errorln("converting to Service object failed")
return
}
send(s.buildService(svc))
}, },
DeleteFunc: func(o interface{}) { DeleteFunc: func(o interface{}) {
send(&config.TargetGroup{Source: serviceSource(o.(*apiv1.Service))}) svc, err := convertToService(o)
if err != nil {
s.logger.With("err", err).Errorln("converting to Service object failed")
return
}
send(&config.TargetGroup{Source: serviceSource(svc)})
}, },
UpdateFunc: func(_, o interface{}) { UpdateFunc: func(_, o interface{}) {
send(s.buildService(o.(*apiv1.Service))) svc, err := convertToService(o)
if err != nil {
s.logger.With("err", err).Errorln("converting to Service object failed")
return
}
send(s.buildService(svc))
}, },
}) })
@ -75,6 +91,22 @@ func (s *Service) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
<-ctx.Done() <-ctx.Done()
} }
func convertToService(o interface{}) (*apiv1.Service, error) {
service, isService := o.(*apiv1.Service)
if !isService {
deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
service, ok = deletedState.Obj.(*apiv1.Service)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Service object: %v", deletedState.Obj)
}
}
return service, nil
}
func serviceSource(s *apiv1.Service) string { func serviceSource(s *apiv1.Service) string {
return "svc/" + s.Namespace + "/" + s.Name return "svc/" + s.Namespace + "/" + s.Name
} }

View file

@ -21,6 +21,7 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"k8s.io/client-go/1.5/pkg/api/v1" "k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/client-go/1.5/tools/cache"
) )
func serviceStoreKeyFunc(obj interface{}) (string, error) { func serviceStoreKeyFunc(obj interface{}) (string, error) {
@ -171,6 +172,37 @@ func TestServiceDiscoveryDelete(t *testing.T) {
}.Run(t) }.Run(t)
} }
func TestServiceDiscoveryDeleteUnknownCacheState(t *testing.T) {
n, i := makeTestServiceDiscovery()
i.GetStore().Add(makeService())
k8sDiscoveryTest{
discovery: n,
afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makeService()}) }() },
expectedInitial: []*config.TargetGroup{
&config.TargetGroup{
Targets: []model.LabelSet{
model.LabelSet{
"__meta_kubernetes_service_port_protocol": "TCP",
"__address__": "testservice.default.svc:30900",
"__meta_kubernetes_service_port_name": "testport",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_service_name": "testservice",
"__meta_kubernetes_namespace": "default",
},
Source: "svc/default/testservice",
},
},
expectedRes: []*config.TargetGroup{
&config.TargetGroup{
Source: "svc/default/testservice",
},
},
}.Run(t)
}
func TestServiceDiscoveryUpdate(t *testing.T) { func TestServiceDiscoveryUpdate(t *testing.T) {
n, i := makeTestServiceDiscovery() n, i := makeTestServiceDiscovery()
i.GetStore().Add(makeService()) i.GetStore().Add(makeService())