mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Merge pull request #2642 from bakins/kubernetes-namespaces
Allow limiting Kubernetes service discover to certain namespaces
This commit is contained in:
commit
aaaec6431e
|
@ -987,12 +987,13 @@ func (c *KubernetesRole) UnmarshalYAML(unmarshal func(interface{}) error) error
|
||||||
|
|
||||||
// KubernetesSDConfig is the configuration for Kubernetes service discovery.
|
// KubernetesSDConfig is the configuration for Kubernetes service discovery.
|
||||||
type KubernetesSDConfig struct {
|
type KubernetesSDConfig struct {
|
||||||
APIServer URL `yaml:"api_server"`
|
APIServer URL `yaml:"api_server"`
|
||||||
Role KubernetesRole `yaml:"role"`
|
Role KubernetesRole `yaml:"role"`
|
||||||
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
|
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
|
||||||
BearerToken string `yaml:"bearer_token,omitempty"`
|
BearerToken string `yaml:"bearer_token,omitempty"`
|
||||||
BearerTokenFile string `yaml:"bearer_token_file,omitempty"`
|
BearerTokenFile string `yaml:"bearer_token_file,omitempty"`
|
||||||
TLSConfig TLSConfig `yaml:"tls_config,omitempty"`
|
TLSConfig TLSConfig `yaml:"tls_config,omitempty"`
|
||||||
|
NamespaceDiscovery KubernetesNamespaceDiscovery `yaml:"namespaces"`
|
||||||
|
|
||||||
// Catches all undefined fields and must be empty after parsing.
|
// Catches all undefined fields and must be empty after parsing.
|
||||||
XXX map[string]interface{} `yaml:",inline"`
|
XXX map[string]interface{} `yaml:",inline"`
|
||||||
|
@ -1026,6 +1027,28 @@ func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) er
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// KubernetesNamespaceDiscovery is the configuration for discovering
|
||||||
|
// Kubernetes namespaces.
|
||||||
|
type KubernetesNamespaceDiscovery struct {
|
||||||
|
Names []string `yaml:"names"`
|
||||||
|
// Catches all undefined fields and must be empty after parsing.
|
||||||
|
XXX map[string]interface{} `yaml:",inline"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalYAML implements the yaml.Unmarshaler interface.
|
||||||
|
func (c *KubernetesNamespaceDiscovery) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
||||||
|
*c = KubernetesNamespaceDiscovery{}
|
||||||
|
type plain KubernetesNamespaceDiscovery
|
||||||
|
err := unmarshal((*plain)(c))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := checkOverflow(c.XXX, "namespaces"); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// GCESDConfig is the configuration for GCE based service discovery.
|
// GCESDConfig is the configuration for GCE based service discovery.
|
||||||
type GCESDConfig struct {
|
type GCESDConfig struct {
|
||||||
// Project: The Google Cloud Project ID
|
// Project: The Google Cloud Project ID
|
||||||
|
|
|
@ -305,6 +305,30 @@ var expectedConf = &Config{
|
||||||
Username: "myusername",
|
Username: "myusername",
|
||||||
Password: "mypassword",
|
Password: "mypassword",
|
||||||
},
|
},
|
||||||
|
NamespaceDiscovery: KubernetesNamespaceDiscovery{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
JobName: "service-kubernetes-namespaces",
|
||||||
|
|
||||||
|
ScrapeInterval: model.Duration(15 * time.Second),
|
||||||
|
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
|
||||||
|
|
||||||
|
MetricsPath: DefaultScrapeConfig.MetricsPath,
|
||||||
|
Scheme: DefaultScrapeConfig.Scheme,
|
||||||
|
|
||||||
|
ServiceDiscoveryConfig: ServiceDiscoveryConfig{
|
||||||
|
KubernetesSDConfigs: []*KubernetesSDConfig{
|
||||||
|
{
|
||||||
|
APIServer: kubernetesSDHostURL(),
|
||||||
|
Role: KubernetesRoleEndpoint,
|
||||||
|
NamespaceDiscovery: KubernetesNamespaceDiscovery{
|
||||||
|
Names: []string{
|
||||||
|
"default",
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -592,6 +616,9 @@ var expectedErrors = []struct {
|
||||||
}, {
|
}, {
|
||||||
filename: "kubernetes_role.bad.yml",
|
filename: "kubernetes_role.bad.yml",
|
||||||
errMsg: "role",
|
errMsg: "role",
|
||||||
|
}, {
|
||||||
|
filename: "kubernetes_namespace_discovery.bad.yml",
|
||||||
|
errMsg: "unknown fields in namespaces",
|
||||||
}, {
|
}, {
|
||||||
filename: "kubernetes_bearertoken_basicauth.bad.yml",
|
filename: "kubernetes_bearertoken_basicauth.bad.yml",
|
||||||
errMsg: "at most one of basic_auth, bearer_token & bearer_token_file must be configured",
|
errMsg: "at most one of basic_auth, bearer_token & bearer_token_file must be configured",
|
||||||
|
|
9
config/testdata/conf.good.yml
vendored
9
config/testdata/conf.good.yml
vendored
|
@ -146,6 +146,15 @@ scrape_configs:
|
||||||
username: 'myusername'
|
username: 'myusername'
|
||||||
password: 'mypassword'
|
password: 'mypassword'
|
||||||
|
|
||||||
|
- job_name: service-kubernetes-namespaces
|
||||||
|
|
||||||
|
kubernetes_sd_configs:
|
||||||
|
- role: endpoints
|
||||||
|
api_server: 'https://localhost:1234'
|
||||||
|
namespaces:
|
||||||
|
names:
|
||||||
|
- default
|
||||||
|
|
||||||
- job_name: service-marathon
|
- job_name: service-marathon
|
||||||
marathon_sd_configs:
|
marathon_sd_configs:
|
||||||
- servers:
|
- servers:
|
||||||
|
|
6
config/testdata/kubernetes_namespace_discovery.bad.yml
vendored
Normal file
6
config/testdata/kubernetes_namespace_discovery.bad.yml
vendored
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
scrape_configs:
|
||||||
|
- kubernetes_sd_configs:
|
||||||
|
- api_server: kubernetes:443
|
||||||
|
role: endpoints
|
||||||
|
namespaces:
|
||||||
|
foo: bar
|
|
@ -15,6 +15,7 @@ package kubernetes
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
@ -62,9 +63,10 @@ func init() {
|
||||||
// Discovery implements the TargetProvider interface for discovering
|
// Discovery implements the TargetProvider interface for discovering
|
||||||
// targets from Kubernetes.
|
// targets from Kubernetes.
|
||||||
type Discovery struct {
|
type Discovery struct {
|
||||||
client kubernetes.Interface
|
client kubernetes.Interface
|
||||||
role config.KubernetesRole
|
role config.KubernetesRole
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
|
namespaceDiscovery *config.KubernetesNamespaceDiscovery
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -75,6 +77,14 @@ func init() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Discovery) getNamespaces() []string {
|
||||||
|
namespaces := d.namespaceDiscovery.Names
|
||||||
|
if len(namespaces) == 0 {
|
||||||
|
namespaces = []string{api.NamespaceAll}
|
||||||
|
}
|
||||||
|
return namespaces
|
||||||
|
}
|
||||||
|
|
||||||
// New creates a new Kubernetes discovery for the given role.
|
// New creates a new Kubernetes discovery for the given role.
|
||||||
func New(l log.Logger, conf *config.KubernetesSDConfig) (*Discovery, error) {
|
func New(l log.Logger, conf *config.KubernetesSDConfig) (*Discovery, error) {
|
||||||
var (
|
var (
|
||||||
|
@ -137,9 +147,10 @@ func New(l log.Logger, conf *config.KubernetesSDConfig) (*Discovery, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Discovery{
|
return &Discovery{
|
||||||
client: c,
|
client: c,
|
||||||
logger: l,
|
logger: l,
|
||||||
role: conf.Role,
|
role: conf.Role,
|
||||||
|
namespaceDiscovery: &conf.NamespaceDiscovery,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -149,58 +160,82 @@ const resyncPeriod = 10 * time.Minute
|
||||||
func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
|
func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
|
||||||
rclient := d.client.Core().GetRESTClient()
|
rclient := d.client.Core().GetRESTClient()
|
||||||
|
|
||||||
|
namespaces := d.getNamespaces()
|
||||||
|
|
||||||
switch d.role {
|
switch d.role {
|
||||||
case "endpoints":
|
case "endpoints":
|
||||||
elw := cache.NewListWatchFromClient(rclient, "endpoints", api.NamespaceAll, nil)
|
var wg sync.WaitGroup
|
||||||
slw := cache.NewListWatchFromClient(rclient, "services", api.NamespaceAll, nil)
|
|
||||||
plw := cache.NewListWatchFromClient(rclient, "pods", api.NamespaceAll, nil)
|
|
||||||
eps := NewEndpoints(
|
|
||||||
d.logger.With("kubernetes_sd", "endpoint"),
|
|
||||||
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
|
|
||||||
cache.NewSharedInformer(elw, &apiv1.Endpoints{}, resyncPeriod),
|
|
||||||
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
|
|
||||||
)
|
|
||||||
go eps.endpointsInf.Run(ctx.Done())
|
|
||||||
go eps.serviceInf.Run(ctx.Done())
|
|
||||||
go eps.podInf.Run(ctx.Done())
|
|
||||||
|
|
||||||
for !eps.serviceInf.HasSynced() {
|
for _, namespace := range namespaces {
|
||||||
time.Sleep(100 * time.Millisecond)
|
elw := cache.NewListWatchFromClient(rclient, "endpoints", namespace, nil)
|
||||||
}
|
slw := cache.NewListWatchFromClient(rclient, "services", namespace, nil)
|
||||||
for !eps.endpointsInf.HasSynced() {
|
plw := cache.NewListWatchFromClient(rclient, "pods", namespace, nil)
|
||||||
time.Sleep(100 * time.Millisecond)
|
eps := NewEndpoints(
|
||||||
}
|
d.logger.With("kubernetes_sd", "endpoint"),
|
||||||
for !eps.podInf.HasSynced() {
|
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
|
||||||
time.Sleep(100 * time.Millisecond)
|
cache.NewSharedInformer(elw, &apiv1.Endpoints{}, resyncPeriod),
|
||||||
}
|
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
|
||||||
eps.Run(ctx, ch)
|
)
|
||||||
|
go eps.endpointsInf.Run(ctx.Done())
|
||||||
|
go eps.serviceInf.Run(ctx.Done())
|
||||||
|
go eps.podInf.Run(ctx.Done())
|
||||||
|
|
||||||
|
for !eps.serviceInf.HasSynced() {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
for !eps.endpointsInf.HasSynced() {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
for !eps.podInf.HasSynced() {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
eps.Run(ctx, ch)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
case "pod":
|
case "pod":
|
||||||
plw := cache.NewListWatchFromClient(rclient, "pods", api.NamespaceAll, nil)
|
var wg sync.WaitGroup
|
||||||
pod := NewPod(
|
for _, namespace := range namespaces {
|
||||||
d.logger.With("kubernetes_sd", "pod"),
|
plw := cache.NewListWatchFromClient(rclient, "pods", namespace, nil)
|
||||||
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
|
pod := NewPod(
|
||||||
)
|
d.logger.With("kubernetes_sd", "pod"),
|
||||||
go pod.informer.Run(ctx.Done())
|
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
|
||||||
|
)
|
||||||
|
go pod.informer.Run(ctx.Done())
|
||||||
|
|
||||||
for !pod.informer.HasSynced() {
|
for !pod.informer.HasSynced() {
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
pod.Run(ctx, ch)
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
pod.Run(ctx, ch)
|
wg.Wait()
|
||||||
|
|
||||||
case "service":
|
case "service":
|
||||||
slw := cache.NewListWatchFromClient(rclient, "services", api.NamespaceAll, nil)
|
var wg sync.WaitGroup
|
||||||
svc := NewService(
|
for _, namespace := range namespaces {
|
||||||
d.logger.With("kubernetes_sd", "service"),
|
slw := cache.NewListWatchFromClient(rclient, "services", namespace, nil)
|
||||||
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
|
svc := NewService(
|
||||||
)
|
d.logger.With("kubernetes_sd", "service"),
|
||||||
go svc.informer.Run(ctx.Done())
|
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
|
||||||
|
)
|
||||||
|
go svc.informer.Run(ctx.Done())
|
||||||
|
|
||||||
for !svc.informer.HasSynced() {
|
for !svc.informer.HasSynced() {
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
svc.Run(ctx, ch)
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
svc.Run(ctx, ch)
|
wg.Wait()
|
||||||
|
|
||||||
case "node":
|
case "node":
|
||||||
nlw := cache.NewListWatchFromClient(rclient, "nodes", api.NamespaceAll, nil)
|
nlw := cache.NewListWatchFromClient(rclient, "nodes", api.NamespaceAll, nil)
|
||||||
node := NewNode(
|
node := NewNode(
|
||||||
|
|
Loading…
Reference in a new issue