discovery/kubernetes: select between discovery role

This adds `role` field to the Kubernetes SD config, which indicates
which type of Kubernetes SD should be run.
This no longer allows discovering pods and nodes with the same SD
configuration for example.
This commit is contained in:
Fabian Reinartz 2016-07-01 19:28:29 +02:00
parent e0f8caacd7
commit 7221228843
6 changed files with 58 additions and 45 deletions

View file

@ -792,6 +792,7 @@ func (c *MarathonSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) erro
// KubernetesSDConfig is the configuration for Kubernetes service discovery. // KubernetesSDConfig is the configuration for Kubernetes service discovery.
type KubernetesSDConfig struct { type KubernetesSDConfig struct {
APIServers []URL `yaml:"api_servers"` APIServers []URL `yaml:"api_servers"`
Role string `yaml:"role"`
InCluster bool `yaml:"in_cluster,omitempty"` InCluster bool `yaml:"in_cluster,omitempty"`
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"` BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
BearerToken string `yaml:"bearer_token,omitempty"` BearerToken string `yaml:"bearer_token,omitempty"`
@ -804,6 +805,29 @@ type KubernetesSDConfig struct {
XXX map[string]interface{} `yaml:",inline"` XXX map[string]interface{} `yaml:",inline"`
} }
type KubernetesRole string
const (
KubernetesRoleNode = "node"
KubernetesRolePod = "pod"
KubernetesRoleContainer = "container"
KubernetesRoleService = "service"
KubernetesRoleEndpoint = "endpoint"
KubernetesRoleAPIServer = "apiserver"
)
func (c *KubernetesRole) UnmarshalYAML(unmarshal func(interface{}) error) error {
if err := unmarshal((*string)(c)); err != nil {
return err
}
switch *c {
case KubernetesRoleNode, KubernetesRolePod, KubernetesRoleContainer, KubernetesRoleService, KubernetesRoleEndpoint, KubernetesRoleAPIServer:
return nil
default:
return fmt.Errorf("Unknown Kubernetes SD role %q", c)
}
}
// UnmarshalYAML implements the yaml.Unmarshaler interface. // UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultKubernetesSDConfig *c = DefaultKubernetesSDConfig
@ -815,6 +839,9 @@ func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) er
if err := checkOverflow(c.XXX, "kubernetes_sd_config"); err != nil { if err := checkOverflow(c.XXX, "kubernetes_sd_config"); err != nil {
return err return err
} }
if c.Role == "" {
return fmt.Errorf("role missing (one of: container, pod, service, endpoint, node, apiserver)")
}
if len(c.APIServers) == 0 { if len(c.APIServers) == 0 {
return fmt.Errorf("Kubernetes SD configuration requires at least one Kubernetes API server") return fmt.Errorf("Kubernetes SD configuration requires at least one Kubernetes API server")
} }
@ -824,7 +851,6 @@ func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) er
if c.BasicAuth != nil && (len(c.BearerToken) > 0 || len(c.BearerTokenFile) > 0) { if c.BasicAuth != nil && (len(c.BearerToken) > 0 || len(c.BearerTokenFile) > 0) {
return fmt.Errorf("at most one of basic_auth, bearer_token & bearer_token_file must be configured") return fmt.Errorf("at most one of basic_auth, bearer_token & bearer_token_file must be configured")
} }
return nil return nil
} }

View file

@ -223,6 +223,7 @@ var expectedConf = &Config{
KubernetesSDConfigs: []*KubernetesSDConfig{ KubernetesSDConfigs: []*KubernetesSDConfig{
{ {
APIServers: []URL{kubernetesSDHostURL()}, APIServers: []URL{kubernetesSDHostURL()},
Role: KubernetesRoleEndpoint,
BasicAuth: &BasicAuth{ BasicAuth: &BasicAuth{
Username: "myusername", Username: "myusername",
Password: "mypassword", Password: "mypassword",

View file

@ -109,7 +109,8 @@ scrape_configs:
- job_name: service-kubernetes - job_name: service-kubernetes
kubernetes_sd_configs: kubernetes_sd_configs:
- api_servers: - role: endpoint
api_servers:
- 'https://localhost:1234' - 'https://localhost:1234'
basic_auth: basic_auth:

View file

@ -2,7 +2,8 @@ scrape_configs:
- job_name: prometheus - job_name: prometheus
kubernetes_sd_configs: kubernetes_sd_configs:
- api_servers: - role: node
api_servers:
- 'https://localhost:1234' - 'https://localhost:1234'
bearer_token: 1234 bearer_token: 1234

View file

@ -2,7 +2,8 @@ scrape_configs:
- job_name: prometheus - job_name: prometheus
kubernetes_sd_configs: kubernetes_sd_configs:
- api_servers: - role: pod
api_servers:
- 'https://localhost:1234' - 'https://localhost:1234'
bearer_token: 1234 bearer_token: 1234

View file

@ -127,52 +127,35 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
log.Debugf("Kubernetes Discovery.Run beginning") log.Debugf("Kubernetes Discovery.Run beginning")
defer close(ch) defer close(ch)
var wg sync.WaitGroup switch kd.Conf.Role {
case config.KubernetesRolePod, config.KubernetesRoleContainer:
pd := &podDiscovery{ pd := &podDiscovery{
retryInterval: time.Duration(kd.Conf.RetryInterval), retryInterval: time.Duration(kd.Conf.RetryInterval),
kd: kd, kd: kd,
} }
wg.Add(1)
go func() {
pd.run(ctx, ch) pd.run(ctx, ch)
wg.Done() case config.KubernetesRoleNode:
}()
nd := &nodeDiscovery{ nd := &nodeDiscovery{
retryInterval: time.Duration(kd.Conf.RetryInterval), retryInterval: time.Duration(kd.Conf.RetryInterval),
kd: kd, kd: kd,
} }
wg.Add(1)
go func() {
nd.run(ctx, ch) nd.run(ctx, ch)
wg.Done() case config.KubernetesRoleService, config.KubernetesRoleEndpoint:
}()
sd := &serviceDiscovery{ sd := &serviceDiscovery{
retryInterval: time.Duration(kd.Conf.RetryInterval), retryInterval: time.Duration(kd.Conf.RetryInterval),
kd: kd, kd: kd,
} }
wg.Add(1)
go func() {
sd.run(ctx, ch) sd.run(ctx, ch)
wg.Done() case config.KubernetesRoleAPIServer:
}()
// Send an initial full view.
// TODO(fabxc): this does not include all available services and service
// endpoints yet. Service endpoints were also missing in the previous Sources() method.
var all []*config.TargetGroup
all = append(all, kd.updateAPIServersTargetGroup())
select { select {
case ch <- all: case ch <- []*config.TargetGroup{kd.updateAPIServersTargetGroup()}:
case <-ctx.Done(): case <-ctx.Done():
return return
} }
default:
wg.Wait() log.Errorf("unknown Kubernetes discovery kind %q", kd.Conf.Role)
return
}
} }
func (kd *Discovery) queryAPIServerPath(path string) (*http.Response, error) { func (kd *Discovery) queryAPIServerPath(path string) (*http.Response, error) {