Allow limiting Kubernetes service discover to certain namespaces

Allow namespace discovery to be more easily extended in the future by using a struct rather than just a list.

Rename fields for kubernetes namespace discovery
This commit is contained in:
Brian Akins 2017-04-19 08:36:34 -04:00
parent e499ef8cac
commit 27d66628a1
5 changed files with 154 additions and 54 deletions

View file

@ -987,12 +987,13 @@ func (c *KubernetesRole) UnmarshalYAML(unmarshal func(interface{}) error) error
// KubernetesSDConfig is the configuration for Kubernetes service discovery.
type KubernetesSDConfig struct {
APIServer URL `yaml:"api_server"`
Role KubernetesRole `yaml:"role"`
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
BearerToken string `yaml:"bearer_token,omitempty"`
BearerTokenFile string `yaml:"bearer_token_file,omitempty"`
TLSConfig TLSConfig `yaml:"tls_config,omitempty"`
APIServer URL `yaml:"api_server"`
Role KubernetesRole `yaml:"role"`
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
BearerToken string `yaml:"bearer_token,omitempty"`
BearerTokenFile string `yaml:"bearer_token_file,omitempty"`
TLSConfig TLSConfig `yaml:"tls_config,omitempty"`
NamespaceDiscovery KubernetesNamespaceDiscovery `yaml:"namespaces"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
@ -1026,6 +1027,28 @@ func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) er
return nil
}
// KubernetesNamespaceDiscovery is the configuration for discovering
// Kubernetes namespaces.
type KubernetesNamespaceDiscovery struct {
Names []string `yaml:"names"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *KubernetesNamespaceDiscovery) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = KubernetesNamespaceDiscovery{}
type plain KubernetesNamespaceDiscovery
err := unmarshal((*plain)(c))
if err != nil {
return err
}
if err := checkOverflow(c.XXX, "namespaces"); err != nil {
return err
}
return nil
}
// GCESDConfig is the configuration for GCE based service discovery.
type GCESDConfig struct {
// Project: The Google Cloud Project ID

View file

@ -305,6 +305,30 @@ var expectedConf = &Config{
Username: "myusername",
Password: "mypassword",
},
NamespaceDiscovery: KubernetesNamespaceDiscovery{},
},
},
},
},
{
JobName: "service-kubernetes-namespaces",
ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: DefaultScrapeConfig.MetricsPath,
Scheme: DefaultScrapeConfig.Scheme,
ServiceDiscoveryConfig: ServiceDiscoveryConfig{
KubernetesSDConfigs: []*KubernetesSDConfig{
{
APIServer: kubernetesSDHostURL(),
Role: KubernetesRoleEndpoint,
NamespaceDiscovery: KubernetesNamespaceDiscovery{
Names: []string{
"default",
},
},
},
},
},
@ -592,6 +616,9 @@ var expectedErrors = []struct {
}, {
filename: "kubernetes_role.bad.yml",
errMsg: "role",
}, {
filename: "kubernetes_namespace_discovery.bad.yml",
errMsg: "unknown fields in namespaces",
}, {
filename: "kubernetes_bearertoken_basicauth.bad.yml",
errMsg: "at most one of basic_auth, bearer_token & bearer_token_file must be configured",

View file

@ -146,6 +146,15 @@ scrape_configs:
username: 'myusername'
password: 'mypassword'
- job_name: service-kubernetes-namespaces
kubernetes_sd_configs:
- role: endpoints
api_server: 'https://localhost:1234'
namespaces:
names:
- default
- job_name: service-marathon
marathon_sd_configs:
- servers:

View file

@ -0,0 +1,6 @@
scrape_configs:
- kubernetes_sd_configs:
- api_server: kubernetes:443
role: endpoints
namespaces:
foo: bar

View file

@ -15,6 +15,7 @@ package kubernetes
import (
"io/ioutil"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
@ -62,9 +63,10 @@ func init() {
// Discovery implements the TargetProvider interface for discovering
// targets from Kubernetes.
type Discovery struct {
client kubernetes.Interface
role config.KubernetesRole
logger log.Logger
client kubernetes.Interface
role config.KubernetesRole
logger log.Logger
namespaceDiscovery *config.KubernetesNamespaceDiscovery
}
func init() {
@ -75,6 +77,14 @@ func init() {
}
}
func (d *Discovery) getNamespaces() []string {
namespaces := d.namespaceDiscovery.Names
if len(namespaces) == 0 {
namespaces = []string{api.NamespaceAll}
}
return namespaces
}
// New creates a new Kubernetes discovery for the given role.
func New(l log.Logger, conf *config.KubernetesSDConfig) (*Discovery, error) {
var (
@ -137,9 +147,10 @@ func New(l log.Logger, conf *config.KubernetesSDConfig) (*Discovery, error) {
return nil, err
}
return &Discovery{
client: c,
logger: l,
role: conf.Role,
client: c,
logger: l,
role: conf.Role,
namespaceDiscovery: &conf.NamespaceDiscovery,
}, nil
}
@ -149,58 +160,82 @@ const resyncPeriod = 10 * time.Minute
func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
rclient := d.client.Core().GetRESTClient()
namespaces := d.getNamespaces()
switch d.role {
case "endpoints":
elw := cache.NewListWatchFromClient(rclient, "endpoints", api.NamespaceAll, nil)
slw := cache.NewListWatchFromClient(rclient, "services", api.NamespaceAll, nil)
plw := cache.NewListWatchFromClient(rclient, "pods", api.NamespaceAll, nil)
eps := NewEndpoints(
d.logger.With("kubernetes_sd", "endpoint"),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
cache.NewSharedInformer(elw, &apiv1.Endpoints{}, resyncPeriod),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
)
go eps.endpointsInf.Run(ctx.Done())
go eps.serviceInf.Run(ctx.Done())
go eps.podInf.Run(ctx.Done())
var wg sync.WaitGroup
for !eps.serviceInf.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
for !eps.endpointsInf.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
for !eps.podInf.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
eps.Run(ctx, ch)
for _, namespace := range namespaces {
elw := cache.NewListWatchFromClient(rclient, "endpoints", namespace, nil)
slw := cache.NewListWatchFromClient(rclient, "services", namespace, nil)
plw := cache.NewListWatchFromClient(rclient, "pods", namespace, nil)
eps := NewEndpoints(
d.logger.With("kubernetes_sd", "endpoint"),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
cache.NewSharedInformer(elw, &apiv1.Endpoints{}, resyncPeriod),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
)
go eps.endpointsInf.Run(ctx.Done())
go eps.serviceInf.Run(ctx.Done())
go eps.podInf.Run(ctx.Done())
for !eps.serviceInf.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
for !eps.endpointsInf.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
for !eps.podInf.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
wg.Add(1)
go func() {
defer wg.Done()
eps.Run(ctx, ch)
}()
}
wg.Wait()
case "pod":
plw := cache.NewListWatchFromClient(rclient, "pods", api.NamespaceAll, nil)
pod := NewPod(
d.logger.With("kubernetes_sd", "pod"),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
)
go pod.informer.Run(ctx.Done())
var wg sync.WaitGroup
for _, namespace := range namespaces {
plw := cache.NewListWatchFromClient(rclient, "pods", namespace, nil)
pod := NewPod(
d.logger.With("kubernetes_sd", "pod"),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
)
go pod.informer.Run(ctx.Done())
for !pod.informer.HasSynced() {
time.Sleep(100 * time.Millisecond)
for !pod.informer.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
wg.Add(1)
go func() {
defer wg.Done()
pod.Run(ctx, ch)
}()
}
pod.Run(ctx, ch)
wg.Wait()
case "service":
slw := cache.NewListWatchFromClient(rclient, "services", api.NamespaceAll, nil)
svc := NewService(
d.logger.With("kubernetes_sd", "service"),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
)
go svc.informer.Run(ctx.Done())
var wg sync.WaitGroup
for _, namespace := range namespaces {
slw := cache.NewListWatchFromClient(rclient, "services", namespace, nil)
svc := NewService(
d.logger.With("kubernetes_sd", "service"),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
)
go svc.informer.Run(ctx.Done())
for !svc.informer.HasSynced() {
time.Sleep(100 * time.Millisecond)
for !svc.informer.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
wg.Add(1)
go func() {
defer wg.Done()
svc.Run(ctx, ch)
}()
}
svc.Run(ctx, ch)
wg.Wait()
case "node":
nlw := cache.NewListWatchFromClient(rclient, "nodes", api.NamespaceAll, nil)
node := NewNode(