Refactor Kubernetes Discovery Part 2: Refactoring

- Doing the initial listing and syncing to the scrape manager first, and
  only registering event handlers afterwards, may lose events that happen
  during the listing and syncing (if it lasts a long time). We should
  register event handlers at the very beginning and, before processing,
  just wait until the informers have synced (the informer's sync lists
  all objects and calls the registered event handlers).
- Use a queue so that we don't block event callbacks, and so that an
  object added multiple times before being processed is processed only
  once (a condensed sketch of this pattern follows this list).
- Fix a bug in `serviceUpdate` in endpoints.go: we should build endpoints
  when `exists && err == nil`. Add `^TestEndpointsDiscoveryWithService`
  tests to cover this.
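Taken together, the first two points are the pattern every role
(endpoints, service, pod, node, ingress) now follows in the diff below.
A minimal sketch of that pattern, assuming the same client-go packages
the commit uses; the `Example` type and the "example" queue name are
illustrative only:

    package kubernetes

    import (
        "context"

        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/util/workqueue"
    )

    type Example struct {
        informer cache.SharedInformer
        queue    *workqueue.Type
    }

    func NewExample(inf cache.SharedInformer) *Example {
        e := &Example{informer: inf, queue: workqueue.NewNamed("example")}
        // Handlers are registered in the constructor, before Run, so no
        // event delivered during the initial list/sync can be lost.
        // Callbacks only enqueue a key and never block.
        e.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    func(o interface{}) { e.enqueue(o) },
            UpdateFunc: func(_, o interface{}) { e.enqueue(o) },
            DeleteFunc: func(o interface{}) { e.enqueue(o) },
        })
        return e
    }

    func (e *Example) enqueue(obj interface{}) {
        key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
        if err != nil {
            return
        }
        // Adding the same key again before it is processed does not create
        // a second queue item, so the object is processed only once.
        e.queue.Add(key)
    }

    func (e *Example) Run(ctx context.Context) {
        defer e.queue.ShutDown()
        // Just wait until the informer has synced; its sync lists all
        // objects and delivers them through the handlers registered above.
        if !cache.WaitForCacheSync(ctx.Done(), e.informer.HasSynced) {
            return
        }
        go func() {
            for e.process() {
            }
        }()
        <-ctx.Done()
    }

    func (e *Example) process() bool {
        keyObj, quit := e.queue.Get()
        if quit {
            return false
        }
        defer e.queue.Done(keyObj)
        key := keyObj.(string)
        // Look the object up by key. When it no longer exists in the
        // store, the key alone carries enough (namespace/name) to emit an
        // empty targetgroup.Group so the scrape manager drops its targets.
        if _, exists, err := e.informer.GetStore().GetByKey(key); err != nil || !exists {
            return true
        }
        // ...convert the object and build the target group here, then send it.
        return true
    }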

Testing:

- Use the `k8s.io/client-go` testing framework and fake implementations,
  which are more robust and reliable for testing (a sketch of a typical
  test follows this list).
- `Test\w+DiscoveryBeforeRun` tests cover objects created before the
  discoverer runs.
- `Test\w+DiscoveryAdd\w+` tests cover adding objects.
- `Test\w+DiscoveryDelete\w+` tests cover deleting objects.
- `Test\w+DiscoveryUpdate\w+` tests cover updating objects.
- `TestEndpointsDiscoveryWithService\w+` tests cover endpoints events
  triggered by services.
- `cache.DeletedFinalStateUnknown`-related code is removed: we don't care
  about the deleted object left in the store, we only need its name to
  send a special `targetgroup.Group` to the scrape manager.
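For shape, here is what a test under this scheme looks like. This is a
sketch based on the `makeDiscovery`/`k8sDiscoveryTest` harness added in
kubernetes_test.go below; the test name is hypothetical and the expected
group is abbreviated (a real test such as `TestNodeDiscoveryAdd` spells
out the full `Targets` and `Labels`):

    func TestNodeDiscoverySketch(t *testing.T) {
        // Discovery instance, fake clientset, and fake watchers.
        n, c, w := makeDiscovery(RoleNode, NamespaceDiscovery{})
        k8sDiscoveryTest{
            discovery: n,
            afterStart: func() {
                obj := makeEnumeratedNode(1)
                // Store the object in the fake clientset...
                c.CoreV1().Nodes().Create(obj)
                // ...and emit the watch event manually, because this
                // client-go version does not push events on writes.
                w.Nodes().Add(obj)
            },
            expectedMaxItems: 1,
            expectedRes: map[string]*targetgroup.Group{
                // Abbreviated; a real test lists the full target group.
                "node/test1": {Source: "node/test1"},
            },
        }.Run(t)
    }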

Signed-off-by: Yecheng Fu <cofyc.jackson@gmail.com>
Yecheng Fu 2018-04-10 00:35:14 +08:00 committed by beorn7
parent 9bc6ced55d
commit 8ceb8f2ae8
12 changed files with 1045 additions and 986 deletions

discovery/kubernetes/endpoints.go

@ -25,6 +25,7 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
apiv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
// Endpoints discovers new endpoint targets.
@ -38,6 +39,8 @@ type Endpoints struct {
podStore cache.Store
endpointsStore cache.Store
serviceStore cache.Store
queue *workqueue.Type
}
// NewEndpoints returns a new endpoints discovery.
@ -45,7 +48,7 @@ func NewEndpoints(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoints {
if l == nil {
l = log.NewNopLogger()
}
ep := &Endpoints{
e := &Endpoints{
logger: l,
endpointsInf: eps,
endpointsStore: eps.GetStore(),
@ -53,67 +56,21 @@ func NewEndpoints(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoints {
serviceStore: svc.GetStore(),
podInf: pod,
podStore: pod.GetStore(),
}
return ep
}
// Run implements the Discoverer interface.
func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
// Send full initial set of endpoint targets.
var initial []*targetgroup.Group
for _, o := range e.endpointsStore.List() {
tg := e.buildEndpoints(o.(*apiv1.Endpoints))
initial = append(initial, tg)
}
select {
case <-ctx.Done():
return
case ch <- initial:
}
// Send target groups for pod updates.
send := func(tg *targetgroup.Group) {
if tg == nil {
return
}
level.Debug(e.logger).Log("msg", "endpoints update", "tg", fmt.Sprintf("%#v", tg))
select {
case <-ctx.Done():
case ch <- []*targetgroup.Group{tg}:
}
queue: workqueue.NewNamed("endpoints"),
}
e.endpointsInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
eventCount.WithLabelValues("endpoints", "add").Inc()
eps, err := convertToEndpoints(o)
if err != nil {
level.Error(e.logger).Log("msg", "converting to Endpoints object failed", "err", err)
return
}
send(e.buildEndpoints(eps))
e.enqueue(o)
},
UpdateFunc: func(_, o interface{}) {
eventCount.WithLabelValues("endpoints", "update").Inc()
eps, err := convertToEndpoints(o)
if err != nil {
level.Error(e.logger).Log("msg", "converting to Endpoints object failed", "err", err)
return
}
send(e.buildEndpoints(eps))
e.enqueue(o)
},
DeleteFunc: func(o interface{}) {
eventCount.WithLabelValues("endpoints", "delete").Inc()
eps, err := convertToEndpoints(o)
if err != nil {
level.Error(e.logger).Log("msg", "converting to Endpoints object failed", "err", err)
return
}
send(&targetgroup.Group{Source: endpointsSource(eps)})
e.enqueue(o)
},
})
@ -128,9 +85,10 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
ep.Namespace = svc.Namespace
ep.Name = svc.Name
obj, exists, err := e.endpointsStore.Get(ep)
if exists && err != nil {
send(e.buildEndpoints(obj.(*apiv1.Endpoints)))
if exists && err == nil {
e.enqueue(obj.(*apiv1.Endpoints))
}
if err != nil {
level.Error(e.logger).Log("msg", "retrieving endpoints failed", "err", err)
}
@ -152,31 +110,102 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
},
})
return e
}
func (e *Endpoints) enqueue(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
return
}
e.queue.Add(key)
}
// Run implements the Discoverer interface.
func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
defer e.queue.ShutDown()
cacheSyncs := []cache.InformerSynced{
e.endpointsInf.HasSynced,
e.serviceInf.HasSynced,
e.podInf.HasSynced,
}
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
level.Error(e.logger).Log("msg", "endpoints informer unable to sync cache")
return
}
// Send target groups for pod updates.
send := func(tg *targetgroup.Group) {
if tg == nil {
return
}
level.Debug(e.logger).Log("msg", "endpoints update", "tg", fmt.Sprintf("%#v", tg))
select {
case <-ctx.Done():
case ch <- []*targetgroup.Group{tg}:
}
}
go func() {
for e.process(send) {
}
}()
// Block until the target provider is explicitly canceled.
<-ctx.Done()
}
func (e *Endpoints) process(send func(tg *targetgroup.Group)) bool {
keyObj, quit := e.queue.Get()
if quit {
return false
}
defer e.queue.Done(keyObj)
key := keyObj.(string)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
level.Error(e.logger).Log("msg", "spliting key failed", "key", key)
return true
}
o, exists, err := e.endpointsStore.GetByKey(key)
if err != nil {
level.Error(e.logger).Log("msg", "getting object from store failed", "key", key)
return true
}
if !exists {
send(&targetgroup.Group{Source: endpointsSourceFromNamespaceAndName(namespace, name)})
return true
}
eps, err := convertToEndpoints(o)
if err != nil {
level.Error(e.logger).Log("msg", "converting to Endpoints object failed", "err", err)
return true
}
send(e.buildEndpoints(eps))
return true
}
func convertToEndpoints(o interface{}) (*apiv1.Endpoints, error) {
endpoints, ok := o.(*apiv1.Endpoints)
if ok {
return endpoints, nil
}
deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
endpoints, ok = deletedState.Obj.(*apiv1.Endpoints)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Endpoints object: %v", deletedState.Obj)
}
return endpoints, nil
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
func endpointsSource(ep *apiv1.Endpoints) string {
return "endpoints/" + ep.ObjectMeta.Namespace + "/" + ep.ObjectMeta.Name
}
func endpointsSourceFromNamespaceAndName(namespace, name string) string {
return "endpoints/" + namespace + "/" + name
}
const (
endpointsNameLabel = metaLabelPrefix + "endpoints_name"
endpointReadyLabel = metaLabelPrefix + "endpoint_ready"

discovery/kubernetes/endpoints_test.go

@ -21,24 +21,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
)
func endpointsStoreKeyFunc(obj interface{}) (string, error) {
return obj.(*v1.Endpoints).ObjectMeta.Name, nil
}
func newFakeEndpointsInformer() *fakeInformer {
return newFakeInformer(endpointsStoreKeyFunc)
}
func makeTestEndpointsDiscovery() (*Endpoints, *fakeInformer, *fakeInformer, *fakeInformer) {
svc := newFakeServiceInformer()
eps := newFakeEndpointsInformer()
pod := newFakePodInformer()
return NewEndpoints(nil, svc, eps, pod), svc, eps, pod
}
func makeEndpoints() *v1.Endpoints {
return &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
@ -83,14 +67,19 @@ func makeEndpoints() *v1.Endpoints {
}
}
func TestEndpointsDiscoveryInitial(t *testing.T) {
n, _, eps, _ := makeTestEndpointsDiscovery()
eps.GetStore().Add(makeEndpoints())
func TestEndpointsDiscoveryBeforeRun(t *testing.T) {
n, c, w := makeDiscovery(RoleEndpoint, NamespaceDiscovery{})
k8sDiscoveryTest{
discovery: n,
expectedInitial: []*targetgroup.Group{
{
beforeRun: func() {
obj := makeEndpoints()
c.CoreV1().Endpoints(obj.Namespace).Create(obj)
w.Endpoints().Add(obj)
},
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
"endpoints/default/testendpoints": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
@ -122,8 +111,7 @@ func TestEndpointsDiscoveryInitial(t *testing.T) {
}
func TestEndpointsDiscoveryAdd(t *testing.T) {
n, _, eps, pods := makeTestEndpointsDiscovery()
pods.GetStore().Add(&v1.Pod{
obj := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "testpod",
Namespace: "default",
@ -158,45 +146,45 @@ func TestEndpointsDiscoveryAdd(t *testing.T) {
HostIP: "2.3.4.5",
PodIP: "1.2.3.4",
},
})
}
n, c, w := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, obj)
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
go func() {
eps.Add(
&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
},
Subsets: []v1.EndpointSubset{
obj := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
},
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
Addresses: []v1.EndpointAddress{
{
IP: "4.3.2.1",
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "testpod",
Namespace: "default",
},
},
},
Ports: []v1.EndpointPort{
{
Name: "testport",
Port: 9000,
Protocol: v1.ProtocolTCP,
},
IP: "4.3.2.1",
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "testpod",
Namespace: "default",
},
},
},
Ports: []v1.EndpointPort{
{
Name: "testport",
Port: 9000,
Protocol: v1.ProtocolTCP,
},
},
},
)
}()
},
}
c.CoreV1().Endpoints(obj.Namespace).Create(obj)
w.Endpoints().Add(obj)
},
expectedRes: []*targetgroup.Group{
{
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
"endpoints/default/testendpoints": {
Targets: []model.LabelSet{
{
"__address__": "4.3.2.1:9000",
@ -239,29 +227,18 @@ func TestEndpointsDiscoveryAdd(t *testing.T) {
}
func TestEndpointsDiscoveryDelete(t *testing.T) {
n, _, eps, _ := makeTestEndpointsDiscovery()
eps.GetStore().Add(makeEndpoints())
n, c, w := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints())
k8sDiscoveryTest{
discovery: n,
afterStart: func() { go func() { eps.Delete(makeEndpoints()) }() },
expectedRes: []*targetgroup.Group{
{
Source: "endpoints/default/testendpoints",
},
discovery: n,
afterStart: func() {
obj := makeEndpoints()
c.CoreV1().Endpoints(obj.Namespace).Delete(obj.Name, &metav1.DeleteOptions{})
w.Endpoints().Delete(obj)
},
}.Run(t)
}
func TestEndpointsDiscoveryDeleteUnknownCacheState(t *testing.T) {
n, _, eps, _ := makeTestEndpointsDiscovery()
eps.GetStore().Add(makeEndpoints())
k8sDiscoveryTest{
discovery: n,
afterStart: func() { go func() { eps.Delete(cache.DeletedFinalStateUnknown{Obj: makeEndpoints()}) }() },
expectedRes: []*targetgroup.Group{
{
expectedMaxItems: 2,
expectedRes: map[string]*targetgroup.Group{
"endpoints/default/testendpoints": {
Source: "endpoints/default/testendpoints",
},
},
@ -269,53 +246,53 @@ func TestEndpointsDiscoveryDeleteUnknownCacheState(t *testing.T) {
}
func TestEndpointsDiscoveryUpdate(t *testing.T) {
n, _, eps, _ := makeTestEndpointsDiscovery()
eps.GetStore().Add(makeEndpoints())
n, c, w := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints())
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
go func() {
eps.Update(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
},
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "1.2.3.4",
},
},
Ports: []v1.EndpointPort{
{
Name: "testport",
Port: 9000,
Protocol: v1.ProtocolTCP,
},
obj := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
},
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "1.2.3.4",
},
},
{
Addresses: []v1.EndpointAddress{
{
IP: "2.3.4.5",
},
},
Ports: []v1.EndpointPort{
{
Name: "testport",
Port: 9001,
Protocol: v1.ProtocolTCP,
},
Ports: []v1.EndpointPort{
{
Name: "testport",
Port: 9000,
Protocol: v1.ProtocolTCP,
},
},
},
})
}()
{
Addresses: []v1.EndpointAddress{
{
IP: "2.3.4.5",
},
},
Ports: []v1.EndpointPort{
{
Name: "testport",
Port: 9001,
Protocol: v1.ProtocolTCP,
},
},
},
},
}
c.CoreV1().Endpoints(obj.Namespace).Update(obj)
w.Endpoints().Modify(obj)
},
expectedRes: []*targetgroup.Group{
{
expectedMaxItems: 2,
expectedRes: map[string]*targetgroup.Group{
"endpoints/default/testendpoints": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
@ -341,24 +318,24 @@ func TestEndpointsDiscoveryUpdate(t *testing.T) {
}
func TestEndpointsDiscoveryEmptySubsets(t *testing.T) {
n, _, eps, _ := makeTestEndpointsDiscovery()
eps.GetStore().Add(makeEndpoints())
n, c, w := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints())
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
go func() {
eps.Update(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
},
Subsets: []v1.EndpointSubset{},
})
}()
obj := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
},
Subsets: []v1.EndpointSubset{},
}
c.CoreV1().Endpoints(obj.Namespace).Update(obj)
w.Endpoints().Modify(obj)
},
expectedRes: []*targetgroup.Group{
{
expectedMaxItems: 2,
expectedRes: map[string]*targetgroup.Group{
"endpoints/default/testendpoints": {
Labels: model.LabelSet{
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_endpoints_name": "testendpoints",
@ -368,3 +345,124 @@ func TestEndpointsDiscoveryEmptySubsets(t *testing.T) {
},
}.Run(t)
}
func TestEndpointsDiscoveryWithService(t *testing.T) {
n, c, w := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints())
k8sDiscoveryTest{
discovery: n,
beforeRun: func() {
obj := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
Labels: map[string]string{
"app": "test",
},
},
}
c.CoreV1().Services(obj.Namespace).Create(obj)
w.Services().Add(obj)
},
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
"endpoints/default/testendpoints": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "false",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_endpoints_name": "testendpoints",
"__meta_kubernetes_service_label_app": "test",
"__meta_kubernetes_service_name": "testendpoints",
},
Source: "endpoints/default/testendpoints",
},
},
}.Run(t)
}
func TestEndpointsDiscoveryWithServiceUpdate(t *testing.T) {
n, c, w := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints())
k8sDiscoveryTest{
discovery: n,
beforeRun: func() {
obj := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
Labels: map[string]string{
"app": "test",
},
},
}
c.CoreV1().Services(obj.Namespace).Create(obj)
w.Services().Add(obj)
},
afterStart: func() {
obj := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
Labels: map[string]string{
"app": "svc",
"component": "testing",
},
},
}
c.CoreV1().Services(obj.Namespace).Update(obj)
w.Services().Modify(obj)
},
expectedMaxItems: 2,
expectedRes: map[string]*targetgroup.Group{
"endpoints/default/testendpoints": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "false",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_endpoints_name": "testendpoints",
"__meta_kubernetes_service_label_app": "svc",
"__meta_kubernetes_service_name": "testendpoints",
"__meta_kubernetes_service_label_component": "testing",
},
Source: "endpoints/default/testendpoints",
},
},
}.Run(t)
}

discovery/kubernetes/ingress.go

@ -24,6 +24,7 @@ import (
"github.com/prometheus/prometheus/util/strutil"
"k8s.io/client-go/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
// Ingress implements discovery of Kubernetes ingresss.
@ -31,25 +32,45 @@ type Ingress struct {
logger log.Logger
informer cache.SharedInformer
store cache.Store
queue *workqueue.Type
}
// NewIngress returns a new ingress discovery.
func NewIngress(l log.Logger, inf cache.SharedInformer) *Ingress {
return &Ingress{logger: l, informer: inf, store: inf.GetStore()}
s := &Ingress{logger: l, informer: inf, store: inf.GetStore(), queue: workqueue.NewNamed("ingress")}
s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
eventCount.WithLabelValues("ingress", "add").Inc()
s.enqueue(o)
},
DeleteFunc: func(o interface{}) {
eventCount.WithLabelValues("ingress", "delete").Inc()
s.enqueue(o)
},
UpdateFunc: func(_, o interface{}) {
eventCount.WithLabelValues("ingress", "update").Inc()
s.enqueue(o)
},
})
return s
}
func (e *Ingress) enqueue(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
return
}
e.queue.Add(key)
}
// Run implements the Discoverer interface.
func (s *Ingress) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
// Send full initial set of pod targets.
var initial []*targetgroup.Group
for _, o := range s.store.List() {
tg := s.buildIngress(o.(*v1beta1.Ingress))
initial = append(initial, tg)
}
select {
case <-ctx.Done():
defer s.queue.ShutDown()
if !cache.WaitForCacheSync(ctx.Done(), s.informer.HasSynced) {
level.Error(s.logger).Log("msg", "ingress informer unable to sync cache")
return
case ch <- initial:
}
// Send target groups for ingress updates.
@ -59,64 +80,64 @@ func (s *Ingress) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
case ch <- []*targetgroup.Group{tg}:
}
}
s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
eventCount.WithLabelValues("ingress", "add").Inc()
ingress, err := convertToIngress(o)
if err != nil {
level.Error(s.logger).Log("msg", "converting to Ingress object failed", "err", err.Error())
return
}
send(s.buildIngress(ingress))
},
DeleteFunc: func(o interface{}) {
eventCount.WithLabelValues("ingress", "delete").Inc()
ingress, err := convertToIngress(o)
if err != nil {
level.Error(s.logger).Log("msg", "converting to Ingress object failed", "err", err.Error())
return
}
send(&targetgroup.Group{Source: ingressSource(ingress)})
},
UpdateFunc: func(_, o interface{}) {
eventCount.WithLabelValues("ingress", "update").Inc()
ingress, err := convertToIngress(o)
if err != nil {
level.Error(s.logger).Log("msg", "converting to Ingress object failed", "err", err.Error())
return
}
send(s.buildIngress(ingress))
},
})
go func() {
for s.process(send) {
}
}()
// Block until the target provider is explicitly canceled.
<-ctx.Done()
}
func (s *Ingress) process(send func(tg *targetgroup.Group)) bool {
keyObj, quit := s.queue.Get()
if quit {
return false
}
defer s.queue.Done(keyObj)
key := keyObj.(string)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return true
}
o, exists, err := s.store.GetByKey(key)
if err != nil {
return true
}
if !exists {
send(&targetgroup.Group{Source: ingressSourceFromNamespaceAndName(namespace, name)})
return true
}
eps, err := convertToIngress(o)
if err != nil {
level.Error(s.logger).Log("msg", "converting to Ingress object failed", "err", err)
return true
}
send(s.buildIngress(eps))
return true
}
func convertToIngress(o interface{}) (*v1beta1.Ingress, error) {
ingress, ok := o.(*v1beta1.Ingress)
if ok {
return ingress, nil
}
deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
ingress, ok = deletedState.Obj.(*v1beta1.Ingress)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Ingress object: %v", deletedState.Obj)
}
return ingress, nil
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
func ingressSource(s *v1beta1.Ingress) string {
return "ingress/" + s.Namespace + "/" + s.Name
}
func ingressSourceFromNamespaceAndName(namespace, name string) string {
return "ingress/" + namespace + "/" + name
}
const (
ingressNameLabel = metaLabelPrefix + "ingress_name"
ingressLabelPrefix = metaLabelPrefix + "ingress_label_"

discovery/kubernetes/ingress_test.go

@ -22,19 +22,6 @@ import (
"k8s.io/client-go/pkg/apis/extensions/v1beta1"
)
func ingressStoreKeyFunc(obj interface{}) (string, error) {
return obj.(*v1beta1.Ingress).ObjectMeta.Name, nil
}
func newFakeIngressInformer() *fakeInformer {
return newFakeInformer(ingressStoreKeyFunc)
}
func makeTestIngressDiscovery() (*Ingress, *fakeInformer) {
i := newFakeIngressInformer()
return NewIngress(nil, i), i
}
func makeIngress(tls []v1beta1.IngressTLS) *v1beta1.Ingress {
return &v1beta1.Ingress{
ObjectMeta: metav1.ObjectMeta{
@ -77,13 +64,13 @@ func makeIngress(tls []v1beta1.IngressTLS) *v1beta1.Ingress {
}
}
func expectedTargetGroups(tls bool) []*targetgroup.Group {
func expectedTargetGroups(tls bool) map[string]*targetgroup.Group {
scheme := "http"
if tls {
scheme = "https"
}
return []*targetgroup.Group{
{
return map[string]*targetgroup.Group{
"ingress/default/testingress": {
Targets: []model.LabelSet{
{
"__meta_kubernetes_ingress_scheme": lv(scheme),
@ -115,22 +102,32 @@ func expectedTargetGroups(tls bool) []*targetgroup.Group {
}
}
func TestIngressDiscoveryInitial(t *testing.T) {
n, i := makeTestIngressDiscovery()
i.GetStore().Add(makeIngress(nil))
func TestIngressDiscoveryAdd(t *testing.T) {
n, c, w := makeDiscovery(RoleIngress, NamespaceDiscovery{Names: []string{"default"}})
k8sDiscoveryTest{
discovery: n,
expectedInitial: expectedTargetGroups(false),
discovery: n,
afterStart: func() {
obj := makeIngress(nil)
c.ExtensionsV1beta1().Ingresses("default").Create(obj)
w.Ingresses().Add(obj)
},
expectedMaxItems: 1,
expectedRes: expectedTargetGroups(false),
}.Run(t)
}
func TestIngressDiscoveryInitialTLS(t *testing.T) {
n, i := makeTestIngressDiscovery()
i.GetStore().Add(makeIngress([]v1beta1.IngressTLS{{}}))
func TestIngressDiscoveryAddTLS(t *testing.T) {
n, c, w := makeDiscovery(RoleIngress, NamespaceDiscovery{Names: []string{"default"}})
k8sDiscoveryTest{
discovery: n,
expectedInitial: expectedTargetGroups(true),
discovery: n,
afterStart: func() {
obj := makeIngress([]v1beta1.IngressTLS{{}})
c.ExtensionsV1beta1().Ingresses("default").Create(obj)
w.Ingresses().Add(obj)
},
expectedMaxItems: 1,
expectedRes: expectedTargetGroups(true),
}.Run(t)
}

discovery/kubernetes/kubernetes.go

@ -28,6 +28,9 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
yaml_util "github.com/prometheus/prometheus/util/yaml"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api"
apiv1 "k8s.io/client-go/pkg/api/v1"
@ -152,13 +155,21 @@ func init() {
}
}
// Discovery implements the Discoverer interface for discovering
// Copy of discovery.Discoverer to avoid import cycle.
// This is only for internal use.
type discoverer interface {
Run(ctx context.Context, up chan<- []*targetgroup.Group)
}
// Discovery implements the discoverer interface for discovering
// targets from Kubernetes.
type Discovery struct {
sync.RWMutex
client kubernetes.Interface
role Role
logger log.Logger
namespaceDiscovery *NamespaceDiscovery
discoverers []discoverer
}
func (d *Discovery) getNamespaces() []string {
@ -239,129 +250,156 @@ func New(l log.Logger, conf *SDConfig) (*Discovery, error) {
logger: l,
role: conf.Role,
namespaceDiscovery: &conf.NamespaceDiscovery,
discoverers: make([]discoverer, 0),
}, nil
}
const resyncPeriod = 10 * time.Minute
// Run implements the Discoverer interface.
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
rclient := d.client.Core().RESTClient()
reclient := d.client.Extensions().RESTClient()
type hasSynced interface {
// hasSynced returns true if all informers' stores have synced.
// This is only used in testing to determine when the cache stores have synced.
hasSynced() bool
}
var _ hasSynced = &Discovery{}
func (d *Discovery) hasSynced() bool {
d.RLock()
defer d.RUnlock()
for _, discoverer := range d.discoverers {
if hasSynceddiscoverer, ok := discoverer.(hasSynced); ok {
if !hasSynceddiscoverer.hasSynced() {
return false
}
}
}
return true
}
// Run implements the discoverer interface.
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
d.Lock()
namespaces := d.getNamespaces()
switch d.role {
case "endpoints":
var wg sync.WaitGroup
for _, namespace := range namespaces {
elw := cache.NewListWatchFromClient(rclient, "endpoints", namespace, nil)
slw := cache.NewListWatchFromClient(rclient, "services", namespace, nil)
plw := cache.NewListWatchFromClient(rclient, "pods", namespace, nil)
elw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return d.client.CoreV1().Endpoints(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return d.client.CoreV1().Endpoints(namespace).Watch(options)
},
}
slw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return d.client.CoreV1().Services(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return d.client.CoreV1().Services(namespace).Watch(options)
},
}
plw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return d.client.CoreV1().Pods(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return d.client.CoreV1().Pods(namespace).Watch(options)
},
}
eps := NewEndpoints(
log.With(d.logger, "role", "endpoint"),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
cache.NewSharedInformer(elw, &apiv1.Endpoints{}, resyncPeriod),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
)
d.discoverers = append(d.discoverers, eps)
go eps.endpointsInf.Run(ctx.Done())
go eps.serviceInf.Run(ctx.Done())
go eps.podInf.Run(ctx.Done())
for !eps.serviceInf.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
for !eps.endpointsInf.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
for !eps.podInf.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
wg.Add(1)
go func() {
defer wg.Done()
eps.Run(ctx, ch)
}()
}
wg.Wait()
case "pod":
var wg sync.WaitGroup
for _, namespace := range namespaces {
plw := cache.NewListWatchFromClient(rclient, "pods", namespace, nil)
plw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return d.client.CoreV1().Pods(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return d.client.CoreV1().Pods(namespace).Watch(options)
},
}
pod := NewPod(
log.With(d.logger, "role", "pod"),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
)
d.discoverers = append(d.discoverers, pod)
go pod.informer.Run(ctx.Done())
for !pod.informer.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
wg.Add(1)
go func() {
defer wg.Done()
pod.Run(ctx, ch)
}()
}
wg.Wait()
case "service":
var wg sync.WaitGroup
for _, namespace := range namespaces {
slw := cache.NewListWatchFromClient(rclient, "services", namespace, nil)
slw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return d.client.CoreV1().Services(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return d.client.CoreV1().Services(namespace).Watch(options)
},
}
svc := NewService(
log.With(d.logger, "role", "service"),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
)
d.discoverers = append(d.discoverers, svc)
go svc.informer.Run(ctx.Done())
for !svc.informer.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
wg.Add(1)
go func() {
defer wg.Done()
svc.Run(ctx, ch)
}()
}
wg.Wait()
case "ingress":
var wg sync.WaitGroup
for _, namespace := range namespaces {
ilw := cache.NewListWatchFromClient(reclient, "ingresses", namespace, nil)
ilw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return d.client.ExtensionsV1beta1().Ingresses(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return d.client.ExtensionsV1beta1().Ingresses(namespace).Watch(options)
},
}
ingress := NewIngress(
log.With(d.logger, "role", "ingress"),
cache.NewSharedInformer(ilw, &extensionsv1beta1.Ingress{}, resyncPeriod),
)
d.discoverers = append(d.discoverers, ingress)
go ingress.informer.Run(ctx.Done())
for !ingress.informer.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
wg.Add(1)
go func() {
defer wg.Done()
ingress.Run(ctx, ch)
}()
}
wg.Wait()
case "node":
nlw := cache.NewListWatchFromClient(rclient, "nodes", api.NamespaceAll, nil)
nlw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return d.client.CoreV1().Nodes().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return d.client.CoreV1().Nodes().Watch(options)
},
}
node := NewNode(
log.With(d.logger, "role", "node"),
cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncPeriod),
)
d.discoverers = append(d.discoverers, node)
go node.informer.Run(ctx.Done())
for !node.informer.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
node.Run(ctx, ch)
default:
level.Error(d.logger).Log("msg", "unknown Kubernetes discovery kind", "role", d.role)
}
var wg sync.WaitGroup
for _, dd := range d.discoverers {
wg.Add(1)
go func(d discoverer) {
defer wg.Done()
d.Run(ctx, ch)
}(dd)
}
d.Unlock()
<-ctx.Done()
}

discovery/kubernetes/kubernetes_test.go

@ -0,0 +1,186 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"context"
"encoding/json"
"sync"
"testing"
"time"
"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
)
type watcherFactory struct {
sync.RWMutex
watchers map[schema.GroupVersionResource]*watch.FakeWatcher
}
func (wf *watcherFactory) watchFor(gvr schema.GroupVersionResource) *watch.FakeWatcher {
wf.Lock()
defer wf.Unlock()
var fakewatch *watch.FakeWatcher
fakewatch, ok := wf.watchers[gvr]
if !ok {
fakewatch = watch.NewFakeWithChanSize(128, true)
wf.watchers[gvr] = fakewatch
}
return fakewatch
}
func (wf *watcherFactory) Nodes() *watch.FakeWatcher {
return wf.watchFor(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "nodes"})
}
func (wf *watcherFactory) Ingresses() *watch.FakeWatcher {
return wf.watchFor(schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "ingresses"})
}
func (wf *watcherFactory) Endpoints() *watch.FakeWatcher {
return wf.watchFor(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "endpoints"})
}
func (wf *watcherFactory) Services() *watch.FakeWatcher {
return wf.watchFor(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"})
}
func (wf *watcherFactory) Pods() *watch.FakeWatcher {
return wf.watchFor(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"})
}
// makeDiscovery creates a kubernetes.Discovery instance for testing.
func makeDiscovery(role Role, nsDiscovery NamespaceDiscovery, objects ...runtime.Object) (*Discovery, kubernetes.Interface, *watcherFactory) {
clientset := fake.NewSimpleClientset(objects...)
// The client-go version we are using does not push watch events on
// Add/Update/Create, so we need to emit events manually.
// See https://github.com/kubernetes/kubernetes/issues/54075.
// TODO: update client-go and related packages to kubernetes-1.10.0+.
wf := &watcherFactory{
watchers: make(map[schema.GroupVersionResource]*watch.FakeWatcher),
}
clientset.PrependWatchReactor("*", func(action k8stesting.Action) (handled bool, ret watch.Interface, err error) {
gvr := action.GetResource()
return true, wf.watchFor(gvr), nil
})
return &Discovery{
client: clientset,
logger: log.NewNopLogger(),
role: role,
namespaceDiscovery: &nsDiscovery,
}, clientset, wf
}
type k8sDiscoveryTest struct {
// discovery is the discoverer instance under test
discovery discoverer
// beforeRun runs before the discoverer starts
beforeRun func()
// afterStart runs after the discoverer has synced
afterStart func()
// expectedMaxItems is the maximum number of items we may receive from the channel
expectedMaxItems int
// expectedRes is the expected final result
expectedRes map[string]*targetgroup.Group
}
func (d k8sDiscoveryTest) Run(t *testing.T) {
ch := make(chan []*targetgroup.Group)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
if d.beforeRun != nil {
d.beforeRun()
}
// Run discoverer and start a goroutine to read results.
go d.discovery.Run(ctx, ch)
resChan := make(chan map[string]*targetgroup.Group)
go readResultWithoutTimeout(t, ch, d.expectedMaxItems, time.Second, resChan)
if dd, ok := d.discovery.(hasSynced); ok {
if !cache.WaitForCacheSync(ctx.Done(), dd.hasSynced) {
t.Errorf("discoverer failed to sync: %v", dd)
return
}
}
if d.afterStart != nil {
d.afterStart()
}
if d.expectedRes != nil {
res := <-resChan
requireTargetGroups(t, d.expectedRes, res)
}
}
// readResultWithoutTimeout reads target groups from the channel until it has
// received max items or the timeout elapses. It merges target groups by
// source and sends the result to the result channel.
func readResultWithoutTimeout(t *testing.T, ch <-chan []*targetgroup.Group, max int, timeout time.Duration, resChan chan<- map[string]*targetgroup.Group) {
allTgs := make([][]*targetgroup.Group, 0)
Loop:
for {
select {
case tgs := <-ch:
allTgs = append(allTgs, tgs)
if len(allTgs) == max {
// Reached max target groups we may get, break fast.
break Loop
}
case <-time.After(timeout):
// Because we use a queue, an object that is created and then
// deleted or updated may be processed only once, so we may
// receive fewer events than max and time out here.
t.Logf("timed out, got %d (max: %d) items, some events were skipped", len(allTgs), max)
break Loop
}
}
// Merge by source and send the result to the channel.
res := make(map[string]*targetgroup.Group)
for _, tgs := range allTgs {
for _, tg := range tgs {
if tg == nil {
continue
}
res[tg.Source] = tg
}
}
resChan <- res
}
func requireTargetGroups(t *testing.T, expected, res map[string]*targetgroup.Group) {
b1, err := json.Marshal(expected)
if err != nil {
panic(err)
}
b2, err := json.Marshal(res)
if err != nil {
panic(err)
}
require.JSONEq(t, string(b1), string(b2))
}

discovery/kubernetes/node.go

@ -27,6 +27,7 @@ import (
"k8s.io/client-go/pkg/api"
apiv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
// Node discovers Kubernetes nodes.
@ -34,28 +35,55 @@ type Node struct {
logger log.Logger
informer cache.SharedInformer
store cache.Store
queue *workqueue.Type
}
var _ discoverer = &Node{}
var _ hasSynced = &Node{}
// NewNode returns a new node discovery.
func NewNode(l log.Logger, inf cache.SharedInformer) *Node {
if l == nil {
l = log.NewNopLogger()
}
return &Node{logger: l, informer: inf, store: inf.GetStore()}
n := &Node{logger: l, informer: inf, store: inf.GetStore(), queue: workqueue.NewNamed("node")}
n.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
eventCount.WithLabelValues("node", "add").Inc()
n.enqueue(o)
},
DeleteFunc: func(o interface{}) {
eventCount.WithLabelValues("node", "delete").Inc()
n.enqueue(o)
},
UpdateFunc: func(_, o interface{}) {
eventCount.WithLabelValues("node", "update").Inc()
n.enqueue(o)
},
})
return n
}
func (e *Node) enqueue(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
return
}
e.queue.Add(key)
}
func (n *Node) hasSynced() bool {
return n.informer.HasSynced()
}
// Run implements the Discoverer interface.
func (n *Node) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
// Send full initial set of pod targets.
var initial []*targetgroup.Group
for _, o := range n.store.List() {
tg := n.buildNode(o.(*apiv1.Node))
initial = append(initial, tg)
}
select {
case <-ctx.Done():
defer n.queue.ShutDown()
if !cache.WaitForCacheSync(ctx.Done(), n.informer.HasSynced) {
level.Error(n.logger).Log("msg", "node informer unable to sync cache")
return
case ch <- initial:
}
// Send target groups for service updates.
@ -68,64 +96,63 @@ func (n *Node) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
case ch <- []*targetgroup.Group{tg}:
}
}
n.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
eventCount.WithLabelValues("node", "add").Inc()
node, err := convertToNode(o)
if err != nil {
level.Error(n.logger).Log("msg", "converting to Node object failed", "err", err)
return
}
send(n.buildNode(node))
},
DeleteFunc: func(o interface{}) {
eventCount.WithLabelValues("node", "delete").Inc()
node, err := convertToNode(o)
if err != nil {
level.Error(n.logger).Log("msg", "converting to Node object failed", "err", err)
return
}
send(&targetgroup.Group{Source: nodeSource(node)})
},
UpdateFunc: func(_, o interface{}) {
eventCount.WithLabelValues("node", "update").Inc()
node, err := convertToNode(o)
if err != nil {
level.Error(n.logger).Log("msg", "converting to Node object failed", "err", err)
return
}
send(n.buildNode(node))
},
})
go func() {
for n.process(send) {
}
}()
// Block until the target provider is explicitly canceled.
<-ctx.Done()
}
func (n *Node) process(send func(tg *targetgroup.Group)) bool {
keyObj, quit := n.queue.Get()
if quit {
return false
}
defer n.queue.Done(keyObj)
key := keyObj.(string)
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return true
}
o, exists, err := n.store.GetByKey(key)
if err != nil {
return true
}
if !exists {
send(&targetgroup.Group{Source: nodeSourceFromName(name)})
return true
}
node, err := convertToNode(o)
if err != nil {
level.Error(n.logger).Log("msg", "converting to Node object failed", "err", err)
return true
}
send(n.buildNode(node))
return true
}
func convertToNode(o interface{}) (*apiv1.Node, error) {
node, ok := o.(*apiv1.Node)
if ok {
return node, nil
}
deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
node, ok = deletedState.Obj.(*apiv1.Node)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
}
return node, nil
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
func nodeSource(n *apiv1.Node) string {
return "node/" + n.Name
}
func nodeSourceFromName(name string) string {
return "node/" + name
}
const (
nodeNameLabel = metaLabelPrefix + "node_name"
nodeLabelPrefix = metaLabelPrefix + "node_label_"

discovery/kubernetes/node_test.go

@ -14,152 +14,15 @@
package kubernetes
import (
"context"
"encoding/json"
"fmt"
"sync"
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
)
type fakeInformer struct {
store cache.Store
handlers []cache.ResourceEventHandler
blockDeltas sync.Mutex
}
func newFakeInformer(f func(obj interface{}) (string, error)) *fakeInformer {
i := &fakeInformer{
store: cache.NewStore(f),
}
// We want to make sure that all delta events (Add/Update/Delete) are blocked
// until our handlers to test have been added.
i.blockDeltas.Lock()
return i
}
func (i *fakeInformer) AddEventHandler(h cache.ResourceEventHandler) {
i.handlers = append(i.handlers, h)
// Only now that there is a registered handler, we are able to handle deltas.
i.blockDeltas.Unlock()
}
func (i *fakeInformer) AddEventHandlerWithResyncPeriod(h cache.ResourceEventHandler, _ time.Duration) {
i.AddEventHandler(h)
}
func (i *fakeInformer) GetStore() cache.Store {
return i.store
}
func (i *fakeInformer) GetController() cache.Controller {
return nil
}
func (i *fakeInformer) Run(stopCh <-chan struct{}) {
}
func (i *fakeInformer) HasSynced() bool {
return true
}
func (i *fakeInformer) LastSyncResourceVersion() string {
return "0"
}
func (i *fakeInformer) Add(obj interface{}) {
i.blockDeltas.Lock()
defer i.blockDeltas.Unlock()
for _, h := range i.handlers {
h.OnAdd(obj)
}
}
func (i *fakeInformer) Delete(obj interface{}) {
i.blockDeltas.Lock()
defer i.blockDeltas.Unlock()
for _, h := range i.handlers {
h.OnDelete(obj)
}
}
func (i *fakeInformer) Update(obj interface{}) {
i.blockDeltas.Lock()
defer i.blockDeltas.Unlock()
for _, h := range i.handlers {
h.OnUpdate(nil, obj)
}
}
type discoverer interface {
Run(ctx context.Context, up chan<- []*targetgroup.Group)
}
type k8sDiscoveryTest struct {
discovery discoverer
afterStart func()
expectedInitial []*targetgroup.Group
expectedRes []*targetgroup.Group
}
func (d k8sDiscoveryTest) Run(t *testing.T) {
ch := make(chan []*targetgroup.Group)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
go func() {
d.discovery.Run(ctx, ch)
}()
initialRes := <-ch
if d.expectedInitial != nil {
requireTargetGroups(t, d.expectedInitial, initialRes)
}
if d.afterStart != nil && d.expectedRes != nil {
d.afterStart()
res := <-ch
requireTargetGroups(t, d.expectedRes, res)
}
}
func requireTargetGroups(t *testing.T, expected, res []*targetgroup.Group) {
b1, err := json.Marshal(expected)
if err != nil {
panic(err)
}
b2, err := json.Marshal(res)
if err != nil {
panic(err)
}
require.JSONEq(t, string(b1), string(b2))
}
func nodeStoreKeyFunc(obj interface{}) (string, error) {
return obj.(*v1.Node).ObjectMeta.Name, nil
}
func newFakeNodeInformer() *fakeInformer {
return newFakeInformer(nodeStoreKeyFunc)
}
func makeTestNodeDiscovery() (*Node, *fakeInformer) {
i := newFakeNodeInformer()
return NewNode(nil, i), i
}
func makeNode(name, address string, labels map[string]string, annotations map[string]string) *v1.Node {
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
@ -187,19 +50,24 @@ func makeEnumeratedNode(i int) *v1.Node {
return makeNode(fmt.Sprintf("test%d", i), "1.2.3.4", map[string]string{}, map[string]string{})
}
func TestNodeDiscoveryInitial(t *testing.T) {
n, i := makeTestNodeDiscovery()
i.GetStore().Add(makeNode(
"test",
"1.2.3.4",
map[string]string{"testlabel": "testvalue"},
map[string]string{"testannotation": "testannotationvalue"},
))
func TestNodeDiscoveryBeforeStart(t *testing.T) {
n, c, w := makeDiscovery(RoleNode, NamespaceDiscovery{})
k8sDiscoveryTest{
discovery: n,
expectedInitial: []*targetgroup.Group{
{
beforeRun: func() {
obj := makeNode(
"test",
"1.2.3.4",
map[string]string{"testlabel": "testvalue"},
map[string]string{"testannotation": "testannotationvalue"},
)
c.CoreV1().Nodes().Create(obj)
w.Nodes().Add(obj)
},
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
"node/test": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:10250",
@ -219,13 +87,18 @@ func TestNodeDiscoveryInitial(t *testing.T) {
}
func TestNodeDiscoveryAdd(t *testing.T) {
n, i := makeTestNodeDiscovery()
n, c, w := makeDiscovery(RoleNode, NamespaceDiscovery{})
k8sDiscoveryTest{
discovery: n,
afterStart: func() { go func() { i.Add(makeEnumeratedNode(1)) }() },
expectedRes: []*targetgroup.Group{
{
discovery: n,
afterStart: func() {
obj := makeEnumeratedNode(1)
c.CoreV1().Nodes().Create(obj)
w.Nodes().Add(obj)
},
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
"node/test1": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:10250",
@ -243,59 +116,18 @@ func TestNodeDiscoveryAdd(t *testing.T) {
}
func TestNodeDiscoveryDelete(t *testing.T) {
n, i := makeTestNodeDiscovery()
i.GetStore().Add(makeEnumeratedNode(0))
obj := makeEnumeratedNode(0)
n, c, w := makeDiscovery(RoleNode, NamespaceDiscovery{}, obj)
k8sDiscoveryTest{
discovery: n,
afterStart: func() { go func() { i.Delete(makeEnumeratedNode(0)) }() },
expectedInitial: []*targetgroup.Group{
{
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:10250",
"instance": "test0",
"__meta_kubernetes_node_address_InternalIP": "1.2.3.4",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_node_name": "test0",
},
Source: "node/test0",
},
discovery: n,
afterStart: func() {
c.CoreV1().Nodes().Delete(obj.Name, &metav1.DeleteOptions{})
w.Nodes().Delete(obj)
},
expectedRes: []*targetgroup.Group{
{
Source: "node/test0",
},
},
}.Run(t)
}
func TestNodeDiscoveryDeleteUnknownCacheState(t *testing.T) {
n, i := makeTestNodeDiscovery()
i.GetStore().Add(makeEnumeratedNode(0))
k8sDiscoveryTest{
discovery: n,
afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makeEnumeratedNode(0)}) }() },
expectedInitial: []*targetgroup.Group{
{
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:10250",
"instance": "test0",
"__meta_kubernetes_node_address_InternalIP": "1.2.3.4",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_node_name": "test0",
},
Source: "node/test0",
},
},
expectedRes: []*targetgroup.Group{
{
expectedMaxItems: 2,
expectedRes: map[string]*targetgroup.Group{
"node/test0": {
Source: "node/test0",
},
},
@ -303,40 +135,26 @@ func TestNodeDiscoveryDeleteUnknownCacheState(t *testing.T) {
}
func TestNodeDiscoveryUpdate(t *testing.T) {
n, i := makeTestNodeDiscovery()
i.GetStore().Add(makeEnumeratedNode(0))
n, c, w := makeDiscovery(RoleNode, NamespaceDiscovery{})
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
go func() {
i.Update(
makeNode(
"test0",
"1.2.3.4",
map[string]string{"Unschedulable": "true"},
map[string]string{},
),
)
}()
obj1 := makeEnumeratedNode(0)
c.CoreV1().Nodes().Create(obj1)
w.Nodes().Add(obj1)
obj2 := makeNode(
"test0",
"1.2.3.4",
map[string]string{"Unschedulable": "true"},
map[string]string{},
)
c.CoreV1().Nodes().Update(obj2)
w.Nodes().Modify(obj2)
},
expectedInitial: []*targetgroup.Group{
{
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:10250",
"instance": "test0",
"__meta_kubernetes_node_address_InternalIP": "1.2.3.4",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_node_name": "test0",
},
Source: "node/test0",
},
},
expectedRes: []*targetgroup.Group{
{
expectedMaxItems: 2,
expectedRes: map[string]*targetgroup.Group{
"node/test0": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:10250",

discovery/kubernetes/pod.go

@ -26,6 +26,7 @@ import (
"k8s.io/client-go/pkg/api"
apiv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/util/strutil"
@ -36,6 +37,7 @@ type Pod struct {
informer cache.SharedInformer
store cache.Store
logger log.Logger
queue *workqueue.Type
}
// NewPod creates a new pod discovery.
@ -43,27 +45,45 @@ func NewPod(l log.Logger, pods cache.SharedInformer) *Pod {
if l == nil {
l = log.NewNopLogger()
}
return &Pod{
p := &Pod{
informer: pods,
store: pods.GetStore(),
logger: l,
queue: workqueue.NewNamed("pod"),
}
p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
eventCount.WithLabelValues("pod", "add").Inc()
p.enqueue(o)
},
DeleteFunc: func(o interface{}) {
eventCount.WithLabelValues("pod", "delete").Inc()
p.enqueue(o)
},
UpdateFunc: func(_, o interface{}) {
eventCount.WithLabelValues("pod", "update").Inc()
p.enqueue(o)
},
})
return p
}
func (e *Pod) enqueue(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
return
}
e.queue.Add(key)
}
// Run implements the Discoverer interface.
func (p *Pod) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
// Send full initial set of pod targets.
var initial []*targetgroup.Group
for _, o := range p.store.List() {
tg := p.buildPod(o.(*apiv1.Pod))
initial = append(initial, tg)
defer p.queue.ShutDown()
level.Debug(p.logger).Log("msg", "initial pod", "tg", fmt.Sprintf("%#v", tg))
}
select {
case <-ctx.Done():
if !cache.WaitForCacheSync(ctx.Done(), p.informer.HasSynced) {
level.Error(p.logger).Log("msg", "pod informer unable to sync cache")
return
case ch <- initial:
}
// Send target groups for pod updates.
@ -77,58 +97,53 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
case ch <- []*targetgroup.Group{tg}:
}
}
p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
eventCount.WithLabelValues("pod", "add").Inc()
pod, err := convertToPod(o)
if err != nil {
level.Error(p.logger).Log("msg", "converting to Pod object failed", "err", err)
return
}
send(p.buildPod(pod))
},
DeleteFunc: func(o interface{}) {
eventCount.WithLabelValues("pod", "delete").Inc()
pod, err := convertToPod(o)
if err != nil {
level.Error(p.logger).Log("msg", "converting to Pod object failed", "err", err)
return
}
send(&targetgroup.Group{Source: podSource(pod)})
},
UpdateFunc: func(_, o interface{}) {
eventCount.WithLabelValues("pod", "update").Inc()
pod, err := convertToPod(o)
if err != nil {
level.Error(p.logger).Log("msg", "converting to Pod object failed", "err", err)
return
}
send(p.buildPod(pod))
},
})
go func() {
for p.process(send) {
}
}()
// Block until the target provider is explicitly canceled.
<-ctx.Done()
}
func (p *Pod) process(send func(tg *targetgroup.Group)) bool {
keyObj, quit := p.queue.Get()
if quit {
return false
}
defer p.queue.Done(keyObj)
key := keyObj.(string)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return true
}
o, exists, err := p.store.GetByKey(key)
if err != nil {
return true
}
if !exists {
send(&targetgroup.Group{Source: podSourceFromNamespaceAndName(namespace, name)})
return true
}
eps, err := convertToPod(o)
if err != nil {
level.Error(p.logger).Log("msg", "converting to Pod object failed", "err", err)
return true
}
send(p.buildPod(eps))
return true
}
func convertToPod(o interface{}) (*apiv1.Pod, error) {
pod, ok := o.(*apiv1.Pod)
if ok {
return pod, nil
}
deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
pod, ok = deletedState.Obj.(*apiv1.Pod)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Pod object: %v", deletedState.Obj)
}
return pod, nil
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
const (
@ -215,6 +230,10 @@ func podSource(pod *apiv1.Pod) string {
return "pod/" + pod.Namespace + "/" + pod.Name
}
func podSourceFromNamespaceAndName(namespace, name string) string {
return "pod/" + namespace + "/" + name
}
func podReady(pod *apiv1.Pod) model.LabelValue {
for _, cond := range pod.Status.Conditions {
if cond.Type == apiv1.PodReady {

discovery/kubernetes/pod_test.go

@ -21,23 +21,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
)
func podStoreKeyFunc(obj interface{}) (string, error) {
return obj.(*v1.Pod).ObjectMeta.Name, nil
}
func newFakePodInformer() *fakeInformer {
return newFakeInformer(podStoreKeyFunc)
}
func makeTestPodDiscovery() (*Pod, *fakeInformer) {
i := newFakePodInformer()
return NewPod(nil, i), i
}
func makeMultiPortPod() *v1.Pod {
func makeMultiPortPods() *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "testpod",
@ -82,7 +68,7 @@ func makeMultiPortPod() *v1.Pod {
}
}
func makePod() *v1.Pod {
func makePods() *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "testpod",
@ -117,14 +103,19 @@ func makePod() *v1.Pod {
}
}
func TestPodDiscoveryInitial(t *testing.T) {
n, i := makeTestPodDiscovery()
i.GetStore().Add(makeMultiPortPod())
func TestPodDiscoveryBeforeRun(t *testing.T) {
n, c, w := makeDiscovery(RolePod, NamespaceDiscovery{})
k8sDiscoveryTest{
discovery: n,
expectedInitial: []*targetgroup.Group{
{
beforeRun: func() {
obj := makeMultiPortPods()
c.CoreV1().Pods(obj.Namespace).Create(obj)
w.Pods().Add(obj)
},
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
"pod/default/testpod": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
@ -163,13 +154,17 @@ func TestPodDiscoveryInitial(t *testing.T) {
}
func TestPodDiscoveryAdd(t *testing.T) {
n, i := makeTestPodDiscovery()
n, c, w := makeDiscovery(RolePod, NamespaceDiscovery{})
k8sDiscoveryTest{
discovery: n,
afterStart: func() { go func() { i.Add(makePod()) }() },
expectedRes: []*targetgroup.Group{
{
discovery: n,
afterStart: func() {
obj := makePods()
c.CoreV1().Pods(obj.Namespace).Create(obj)
w.Pods().Add(obj)
},
expectedRes: map[string]*targetgroup.Group{
"pod/default/testpod": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
@ -195,75 +190,18 @@ func TestPodDiscoveryAdd(t *testing.T) {
}
func TestPodDiscoveryDelete(t *testing.T) {
n, i := makeTestPodDiscovery()
i.GetStore().Add(makePod())
obj := makePods()
n, c, w := makeDiscovery(RolePod, NamespaceDiscovery{}, obj)
k8sDiscoveryTest{
discovery: n,
afterStart: func() { go func() { i.Delete(makePod()) }() },
expectedInitial: []*targetgroup.Group{
{
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_pod_container_name": "testcontainer",
"__meta_kubernetes_pod_container_port_name": "testport",
"__meta_kubernetes_pod_container_port_number": "9000",
"__meta_kubernetes_pod_container_port_protocol": "TCP",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_pod_name": "testpod",
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_pod_node_name": "testnode",
"__meta_kubernetes_pod_ip": "1.2.3.4",
"__meta_kubernetes_pod_host_ip": "2.3.4.5",
"__meta_kubernetes_pod_ready": "true",
"__meta_kubernetes_pod_uid": "abc123",
},
Source: "pod/default/testpod",
},
discovery: n,
afterStart: func() {
obj := makePods()
c.CoreV1().Pods(obj.Namespace).Delete(obj.Name, &metav1.DeleteOptions{})
w.Pods().Delete(obj)
},
expectedRes: []*targetgroup.Group{
{
Source: "pod/default/testpod",
},
},
}.Run(t)
}
func TestPodDiscoveryDeleteUnknownCacheState(t *testing.T) {
n, i := makeTestPodDiscovery()
i.GetStore().Add(makePod())
k8sDiscoveryTest{
discovery: n,
afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makePod()}) }() },
expectedInitial: []*targetgroup.Group{
{
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_pod_container_name": "testcontainer",
"__meta_kubernetes_pod_container_port_name": "testport",
"__meta_kubernetes_pod_container_port_number": "9000",
"__meta_kubernetes_pod_container_port_protocol": "TCP",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_pod_name": "testpod",
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_pod_node_name": "testnode",
"__meta_kubernetes_pod_ip": "1.2.3.4",
"__meta_kubernetes_pod_host_ip": "2.3.4.5",
"__meta_kubernetes_pod_ready": "true",
"__meta_kubernetes_pod_uid": "abc123",
},
Source: "pod/default/testpod",
},
},
expectedRes: []*targetgroup.Group{
{
expectedRes: map[string]*targetgroup.Group{
"pod/default/testpod": {
Source: "pod/default/testpod",
},
},
@ -271,8 +209,7 @@ func TestPodDiscoveryDeleteUnknownCacheState(t *testing.T) {
}
func TestPodDiscoveryUpdate(t *testing.T) {
n, i := makeTestPodDiscovery()
i.GetStore().Add(&v1.Pod{
obj := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "testpod",
Namespace: "default",
@ -297,36 +234,18 @@ func TestPodDiscoveryUpdate(t *testing.T) {
PodIP: "1.2.3.4",
HostIP: "2.3.4.5",
},
})
}
n, c, w := makeDiscovery(RolePod, NamespaceDiscovery{}, obj)
k8sDiscoveryTest{
discovery: n,
afterStart: func() { go func() { i.Update(makePod()) }() },
expectedInitial: []*targetgroup.Group{
{
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_pod_container_name": "testcontainer",
"__meta_kubernetes_pod_container_port_name": "testport",
"__meta_kubernetes_pod_container_port_number": "9000",
"__meta_kubernetes_pod_container_port_protocol": "TCP",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_pod_name": "testpod",
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_pod_node_name": "testnode",
"__meta_kubernetes_pod_ip": "1.2.3.4",
"__meta_kubernetes_pod_host_ip": "2.3.4.5",
"__meta_kubernetes_pod_ready": "unknown",
"__meta_kubernetes_pod_uid": "xyz321",
},
Source: "pod/default/testpod",
},
discovery: n,
afterStart: func() {
obj := makePods()
c.CoreV1().Pods(obj.Namespace).Create(obj)
w.Pods().Modify(obj)
},
expectedRes: []*targetgroup.Group{
{
expectedRes: map[string]*targetgroup.Group{
"pod/default/testpod": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
@@ -352,42 +271,25 @@ func TestPodDiscoveryUpdate(t *testing.T) {
}
func TestPodDiscoveryUpdateEmptyPodIP(t *testing.T) {
n, i := makeTestPodDiscovery()
initialPod := makePod()
n, c, w := makeDiscovery(RolePod, NamespaceDiscovery{})
initialPod := makePods()
updatedPod := makePod()
updatedPod := makePods()
updatedPod.Status.PodIP = ""
i.GetStore().Add(initialPod)
k8sDiscoveryTest{
discovery: n,
afterStart: func() { go func() { i.Update(updatedPod) }() },
expectedInitial: []*targetgroup.Group{
{
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_pod_container_name": "testcontainer",
"__meta_kubernetes_pod_container_port_name": "testport",
"__meta_kubernetes_pod_container_port_number": "9000",
"__meta_kubernetes_pod_container_port_protocol": "TCP",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_pod_name": "testpod",
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_pod_node_name": "testnode",
"__meta_kubernetes_pod_ip": "1.2.3.4",
"__meta_kubernetes_pod_host_ip": "2.3.4.5",
"__meta_kubernetes_pod_ready": "true",
"__meta_kubernetes_pod_uid": "abc123",
},
Source: "pod/default/testpod",
},
discovery: n,
beforeRun: func() {
c.CoreV1().Pods(initialPod.Namespace).Create(initialPod)
w.Pods().Add(initialPod)
},
expectedRes: []*targetgroup.Group{
{
afterStart: func() {
c.CoreV1().Pods(updatedPod.Namespace).Create(updatedPod)
w.Pods().Modify(updatedPod)
},
expectedMaxItems: 2,
expectedRes: map[string]*targetgroup.Group{
"pod/default/testpod": {
Source: "pod/default/testpod",
},
},


@@ -24,6 +24,7 @@ import (
"github.com/prometheus/common/model"
apiv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/util/strutil"
@@ -34,6 +35,7 @@ type Service struct {
logger log.Logger
informer cache.SharedInformer
store cache.Store
queue *workqueue.Type
}
// NewService returns a new service discovery.
@@ -41,21 +43,40 @@ func NewService(l log.Logger, inf cache.SharedInformer) *Service {
if l == nil {
l = log.NewNopLogger()
}
return &Service{logger: l, informer: inf, store: inf.GetStore()}
s := &Service{logger: l, informer: inf, store: inf.GetStore(), queue: workqueue.NewNamed("service")}
s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
eventCount.WithLabelValues("service", "add").Inc()
s.enqueue(o)
},
DeleteFunc: func(o interface{}) {
eventCount.WithLabelValues("service", "delete").Inc()
s.enqueue(o)
},
UpdateFunc: func(_, o interface{}) {
eventCount.WithLabelValues("service", "update").Inc()
s.enqueue(o)
},
})
return s
}
func (e *Service) enqueue(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
return
}
e.queue.Add(key)
}
// Run implements the Discoverer interface.
func (s *Service) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
// Send full initial set of service targets.
var initial []*targetgroup.Group
for _, o := range s.store.List() {
tg := s.buildService(o.(*apiv1.Service))
initial = append(initial, tg)
}
select {
case <-ctx.Done():
defer s.queue.ShutDown()
if !cache.WaitForCacheSync(ctx.Done(), s.informer.HasSynced) {
level.Error(s.logger).Log("msg", "service informer unable to sync cache")
return
case ch <- initial:
}
// Send target groups for service updates.
@@ -65,63 +86,62 @@ func (s *Service) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
case ch <- []*targetgroup.Group{tg}:
}
}
s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
eventCount.WithLabelValues("service", "add").Inc()
svc, err := convertToService(o)
if err != nil {
level.Error(s.logger).Log("msg", "converting to Service object failed", "err", err)
return
}
send(s.buildService(svc))
},
DeleteFunc: func(o interface{}) {
eventCount.WithLabelValues("service", "delete").Inc()
svc, err := convertToService(o)
if err != nil {
level.Error(s.logger).Log("msg", "converting to Service object failed", "err", err)
return
}
send(&targetgroup.Group{Source: serviceSource(svc)})
},
UpdateFunc: func(_, o interface{}) {
eventCount.WithLabelValues("service", "update").Inc()
svc, err := convertToService(o)
if err != nil {
level.Error(s.logger).Log("msg", "converting to Service object failed", "err", err)
return
}
send(s.buildService(svc))
},
})
go func() {
for s.process(send) {
}
}()
// Block until the target provider is explicitly canceled.
<-ctx.Done()
}
func (s *Service) process(send func(tg *targetgroup.Group)) bool {
keyObj, quit := s.queue.Get()
if quit {
return false
}
defer s.queue.Done(keyObj)
key := keyObj.(string)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return true
}
o, exists, err := s.store.GetByKey(key)
if err != nil {
return true
}
if !exists {
send(&targetgroup.Group{Source: serviceSourceFromNamespaceAndName(namespace, name)})
return true
}
svc, err := convertToService(o)
if err != nil {
level.Error(s.logger).Log("msg", "converting to Service object failed", "err", err)
return true
}
send(s.buildService(svc))
return true
}
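The `process` loop above is the consumer half of the standard client-go workqueue pattern: event handlers only record a key, a single worker drains the queue, and a key re-added while still pending is coalesced so each object is processed once per round. A standalone sketch of that queue lifecycle (all names hypothetical):

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.NewNamed("demo")
	done := make(chan struct{})

	go func() {
		defer close(done)
		for {
			key, quit := q.Get() // blocks until an item arrives or ShutDown is called
			if quit {
				return
			}
			fmt.Println("processing", key)
			q.Done(key) // release the key; a re-add during processing is redelivered
		}
	}()

	q.Add("default/testservice")
	q.Add("default/testservice") // coalesced with the pending add above
	q.ShutDown()                 // remaining items are drained, then Get reports quit
	<-done
}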
func convertToService(o interface{}) (*apiv1.Service, error) {
service, ok := o.(*apiv1.Service)
if ok {
return service, nil
}
deletedState, ok := o.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
service, ok = deletedState.Obj.(*apiv1.Service)
if !ok {
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Service object: %v", deletedState.Obj)
}
return service, nil
return nil, fmt.Errorf("Received unexpected object: %v", o)
}
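`convertToService` no longer needs a `cache.DeletedFinalStateUnknown` branch because tombstones never reach it: `cache.DeletionHandlingMetaNamespaceKeyFunc` unwraps them to a plain namespace/name key in `enqueue`, and `process` treats the resulting store miss as a deletion. A small illustration of the unwrapping (object values hypothetical):

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

func main() {
	// A tombstone carries the last known key of an object whose final
	// state was missed; only the key is needed downstream.
	tombstone := cache.DeletedFinalStateUnknown{Key: "default/testservice"}

	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(tombstone)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // prints "default/testservice"
}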
func serviceSource(s *apiv1.Service) string {
return "svc/" + s.Namespace + "/" + s.Name
}
func serviceSourceFromNamespaceAndName(namespace, name string) string {
return "svc/" + namespace + "/" + name
}
const (
serviceNameLabel = metaLabelPrefix + "service_name"
serviceLabelPrefix = metaLabelPrefix + "service_label_"


@@ -21,22 +21,8 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
)
func serviceStoreKeyFunc(obj interface{}) (string, error) {
return obj.(*v1.Service).ObjectMeta.Name, nil
}
func newFakeServiceInformer() *fakeInformer {
return newFakeInformer(serviceStoreKeyFunc)
}
func makeTestServiceDiscovery() (*Service, *fakeInformer) {
i := newFakeServiceInformer()
return NewService(nil, i), i
}
func makeMultiPortService() *v1.Service {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{
@@ -84,46 +70,19 @@ func makeService() *v1.Service {
return makeSuffixedService("")
}
func TestServiceDiscoveryInitial(t *testing.T) {
n, i := makeTestServiceDiscovery()
i.GetStore().Add(makeMultiPortService())
func TestServiceDiscoveryAdd(t *testing.T) {
n, c, w := makeDiscovery(RoleService, NamespaceDiscovery{})
k8sDiscoveryTest{
discovery: n,
expectedInitial: []*targetgroup.Group{
{
Targets: []model.LabelSet{
{
"__meta_kubernetes_service_port_protocol": "TCP",
"__address__": "testservice.default.svc:30900",
"__meta_kubernetes_service_port_name": "testport0",
},
{
"__meta_kubernetes_service_port_protocol": "UDP",
"__address__": "testservice.default.svc:30901",
"__meta_kubernetes_service_port_name": "testport1",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_service_name": "testservice",
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_service_label_testlabel": "testvalue",
"__meta_kubernetes_service_annotation_testannotation": "testannotationvalue",
},
Source: "svc/default/testservice",
},
afterStart: func() {
obj := makeService()
c.CoreV1().Services(obj.Namespace).Create(obj)
w.Services().Add(obj)
},
}.Run(t)
}
func TestServiceDiscoveryAdd(t *testing.T) {
n, i := makeTestServiceDiscovery()
k8sDiscoveryTest{
discovery: n,
afterStart: func() { go func() { i.Add(makeService()) }() },
expectedRes: []*targetgroup.Group{
{
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
"svc/default/testservice": {
Targets: []model.LabelSet{
{
"__meta_kubernetes_service_port_protocol": "TCP",
@@ -142,61 +101,18 @@ func TestServiceDiscoveryAdd(t *testing.T) {
}
func TestServiceDiscoveryDelete(t *testing.T) {
n, i := makeTestServiceDiscovery()
i.GetStore().Add(makeService())
n, c, w := makeDiscovery(RoleService, NamespaceDiscovery{}, makeService())
k8sDiscoveryTest{
discovery: n,
afterStart: func() { go func() { i.Delete(makeService()) }() },
expectedInitial: []*targetgroup.Group{
{
Targets: []model.LabelSet{
{
"__meta_kubernetes_service_port_protocol": "TCP",
"__address__": "testservice.default.svc:30900",
"__meta_kubernetes_service_port_name": "testport",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_service_name": "testservice",
"__meta_kubernetes_namespace": "default",
},
Source: "svc/default/testservice",
},
discovery: n,
afterStart: func() {
obj := makeService()
c.CoreV1().Services(obj.Namespace).Delete(obj.Name, &metav1.DeleteOptions{})
w.Services().Delete(obj)
},
expectedRes: []*targetgroup.Group{
{
Source: "svc/default/testservice",
},
},
}.Run(t)
}
func TestServiceDiscoveryDeleteUnknownCacheState(t *testing.T) {
n, i := makeTestServiceDiscovery()
i.GetStore().Add(makeService())
k8sDiscoveryTest{
discovery: n,
afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makeService()}) }() },
expectedInitial: []*targetgroup.Group{
{
Targets: []model.LabelSet{
{
"__meta_kubernetes_service_port_protocol": "TCP",
"__address__": "testservice.default.svc:30900",
"__meta_kubernetes_service_port_name": "testport",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_service_name": "testservice",
"__meta_kubernetes_namespace": "default",
},
Source: "svc/default/testservice",
},
},
expectedRes: []*targetgroup.Group{
{
expectedMaxItems: 2,
expectedRes: map[string]*targetgroup.Group{
"svc/default/testservice": {
Source: "svc/default/testservice",
},
},
@@ -204,30 +120,18 @@ func TestServiceDiscoveryDeleteUnknownCacheState(t *testing.T) {
}
func TestServiceDiscoveryUpdate(t *testing.T) {
n, i := makeTestServiceDiscovery()
i.GetStore().Add(makeService())
n, c, w := makeDiscovery(RoleService, NamespaceDiscovery{}, makeService())
k8sDiscoveryTest{
discovery: n,
afterStart: func() { go func() { i.Update(makeMultiPortService()) }() },
expectedInitial: []*targetgroup.Group{
{
Targets: []model.LabelSet{
{
"__meta_kubernetes_service_port_protocol": "TCP",
"__address__": "testservice.default.svc:30900",
"__meta_kubernetes_service_port_name": "testport",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_service_name": "testservice",
"__meta_kubernetes_namespace": "default",
},
Source: "svc/default/testservice",
},
discovery: n,
afterStart: func() {
obj := makeMultiPortService()
c.CoreV1().Services(obj.Namespace).Update(obj)
w.Services().Modify(obj)
},
expectedRes: []*targetgroup.Group{
{
expectedMaxItems: 2,
expectedRes: map[string]*targetgroup.Group{
"svc/default/testservice": {
Targets: []model.LabelSet{
{
"__meta_kubernetes_service_port_protocol": "TCP",