From 782d00059acaa7784857479ceaee95b8a2cdd9f3 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Mon, 25 Mar 2019 11:54:22 +0100 Subject: [PATCH] discovery: factorize for SD based on refresh (#5381) * discovery: factorize for SD based on refresh Signed-off-by: Simon Pasquier * discovery: use common metrics for refresh Signed-off-by: Simon Pasquier --- discovery/azure/azure.go | 104 +--- discovery/consul/consul.go | 1 - discovery/dns/dns.go | 72 +-- discovery/ec2/ec2.go | 93 +-- discovery/gce/gce.go | 109 +--- discovery/marathon/marathon.go | 180 ++---- discovery/marathon/marathon_test.go | 791 ++++++++++++------------- discovery/openstack/hypervisor.go | 20 +- discovery/openstack/hypervisor_test.go | 10 +- discovery/openstack/instance.go | 19 +- discovery/openstack/instance_test.go | 11 +- discovery/openstack/openstack.go | 102 +--- discovery/refresh/refresh.go | 117 ++++ discovery/refresh/refresh_test.go | 83 +++ discovery/triton/triton.go | 100 +--- discovery/triton/triton_test.go | 31 +- 16 files changed, 816 insertions(+), 1027 deletions(-) create mode 100644 discovery/refresh/refresh.go create mode 100644 discovery/refresh/refresh_test.go diff --git a/discovery/azure/azure.go b/discovery/azure/azure.go index 35090731a3..d761b1d471 100644 --- a/discovery/azure/azure.go +++ b/discovery/azure/azure.go @@ -29,9 +29,10 @@ import ( "github.com/Azure/go-autorest/autorest/azure" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/prometheus/client_golang/prometheus" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/discovery/refresh" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/util/strutil" ) @@ -53,26 +54,13 @@ const ( authMethodManagedIdentity = "ManagedIdentity" ) -var ( - azureSDRefreshFailuresCount = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "prometheus_sd_azure_refresh_failures_total", - Help: "Number 
of Azure-SD refresh failures.", - }) - azureSDRefreshDuration = prometheus.NewSummary( - prometheus.SummaryOpts{ - Name: "prometheus_sd_azure_refresh_duration_seconds", - Help: "The duration of a Azure-SD refresh in seconds.", - }) - - // DefaultSDConfig is the default Azure SD configuration. - DefaultSDConfig = SDConfig{ - Port: 80, - RefreshInterval: model.Duration(5 * time.Minute), - Environment: azure.PublicCloud.Name, - AuthenticationMethod: authMethodOAuth, - } -) +// DefaultSDConfig is the default Azure SD configuration. +var DefaultSDConfig = SDConfig{ + Port: 80, + RefreshInterval: model.Duration(5 * time.Minute), + Environment: azure.PublicCloud.Name, + AuthenticationMethod: authMethodOAuth, +} // SDConfig is the configuration for Azure based service discovery. type SDConfig struct { @@ -125,18 +113,11 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { return nil } -func init() { - prometheus.MustRegister(azureSDRefreshDuration) - prometheus.MustRegister(azureSDRefreshFailuresCount) -} - -// Discovery periodically performs Azure-SD requests. It implements -// the Discoverer interface. type Discovery struct { - cfg *SDConfig - interval time.Duration - port int - logger log.Logger + *refresh.Discovery + logger log.Logger + cfg *SDConfig + port int } // NewDiscovery returns a new AzureDiscovery which periodically refreshes its targets. @@ -144,42 +125,18 @@ func NewDiscovery(cfg *SDConfig, logger log.Logger) *Discovery { if logger == nil { logger = log.NewNopLogger() } - return &Discovery{ - cfg: cfg, - interval: time.Duration(cfg.RefreshInterval), - port: cfg.Port, - logger: logger, - } -} - -// Run implements the Discoverer interface. 
-func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { - ticker := time.NewTicker(d.interval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - default: - } - - tg, err := d.refresh(ctx) - if err != nil { - level.Error(d.logger).Log("msg", "Unable to refresh during Azure discovery", "err", err) - } else { - select { - case <-ctx.Done(): - case ch <- []*targetgroup.Group{tg}: - } - } - - select { - case <-ticker.C: - case <-ctx.Done(): - return - } + d := &Discovery{ + cfg: cfg, + port: cfg.Port, + logger: logger, } + d.Discovery = refresh.NewDiscovery( + logger, + "azure", + time.Duration(cfg.RefreshInterval), + d.refresh, + ) + return d } // azureClient represents multiple Azure Resource Manager providers. @@ -281,17 +238,9 @@ func newAzureResourceFromID(id string, logger log.Logger) (azureResource, error) }, nil } -func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err error) { +func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { defer level.Debug(d.logger).Log("msg", "Azure discovery completed") - t0 := time.Now() - defer func() { - azureSDRefreshDuration.Observe(time.Since(t0).Seconds()) - if err != nil { - azureSDRefreshFailuresCount.Inc() - } - }() - tg = &targetgroup.Group{} client, err := createAzureClient(*d.cfg) if err != nil { return nil, fmt.Errorf("could not create Azure client: %s", err) @@ -405,6 +354,7 @@ func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err err wg.Wait() close(ch) + var tg targetgroup.Group for tgt := range ch { if tgt.err != nil { return nil, fmt.Errorf("unable to complete Azure service discovery: %s", err) @@ -414,7 +364,7 @@ func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err err } } - return tg, nil + return []*targetgroup.Group{&tg}, nil } func (client *azureClient) getVMs(ctx context.Context) ([]virtualMachine, error) { diff --git a/discovery/consul/consul.go 
b/discovery/consul/consul.go index 438a612314..8a22659c11 100644 --- a/discovery/consul/consul.go +++ b/discovery/consul/consul.go @@ -327,7 +327,6 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { <-ticker.C } } - } else { // We only have fully defined services. for _, name := range d.watchedServices { diff --git a/discovery/dns/dns.go b/discovery/dns/dns.go index 672d4eb5c8..fbeefe8278 100644 --- a/discovery/dns/dns.go +++ b/discovery/dns/dns.go @@ -26,6 +26,7 @@ import ( "github.com/miekg/dns" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery/refresh" "github.com/prometheus/prometheus/discovery/targetgroup" ) @@ -98,12 +99,11 @@ func init() { // Discovery periodically performs DNS-SD requests. It implements // the Discoverer interface. type Discovery struct { - names []string - - interval time.Duration - port int - qtype uint16 - logger log.Logger + *refresh.Discovery + names []string + port int + qtype uint16 + logger log.Logger } // NewDiscovery returns a new Discovery which periodically refreshes its targets. @@ -121,50 +121,50 @@ func NewDiscovery(conf SDConfig, logger log.Logger) *Discovery { case "SRV": qtype = dns.TypeSRV } - return &Discovery{ - names: conf.Names, - interval: time.Duration(conf.RefreshInterval), - qtype: qtype, - port: conf.Port, - logger: logger, + d := &Discovery{ + names: conf.Names, + qtype: qtype, + port: conf.Port, + logger: logger, } + d.Discovery = refresh.NewDiscovery( + logger, + "dns", + time.Duration(conf.RefreshInterval), + d.refresh, + ) + return d } -// Run implements the Discoverer interface. -func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { - ticker := time.NewTicker(d.interval) - defer ticker.Stop() - - // Get an initial set right away. 
- d.refreshAll(ctx, ch) - - for { - select { - case <-ticker.C: - d.refreshAll(ctx, ch) - case <-ctx.Done(): - return - } - } -} - -func (d *Discovery) refreshAll(ctx context.Context, ch chan<- []*targetgroup.Group) { - var wg sync.WaitGroup +func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { + var ( + wg sync.WaitGroup + ch = make(chan *targetgroup.Group) + tgs = make([]*targetgroup.Group, len(d.names)) + ) wg.Add(len(d.names)) for _, name := range d.names { go func(n string) { - if err := d.refresh(ctx, n, ch); err != nil { + if err := d.refreshOne(ctx, n, ch); err != nil { level.Error(d.logger).Log("msg", "Error refreshing DNS targets", "err", err) } wg.Done() }(name) } - wg.Wait() + go func() { + wg.Wait() + close(ch) + }() + + for tg := range ch { + tgs = append(tgs, tg) + } + return tgs, nil } -func (d *Discovery) refresh(ctx context.Context, name string, ch chan<- []*targetgroup.Group) error { +func (d *Discovery) refreshOne(ctx context.Context, name string, ch chan<- *targetgroup.Group) error { response, err := lookupWithSearchPath(name, d.qtype, d.logger) dnsSDLookupsCount.Inc() if err != nil { @@ -203,7 +203,7 @@ func (d *Discovery) refresh(ctx context.Context, name string, ch chan<- []*targe select { case <-ctx.Done(): return ctx.Err() - case ch <- []*targetgroup.Group{tg}: + case ch <- tg: } return nil diff --git a/discovery/ec2/ec2.go b/discovery/ec2/ec2.go index 6dc1fbce74..05606a6276 100644 --- a/discovery/ec2/ec2.go +++ b/discovery/ec2/ec2.go @@ -26,12 +26,11 @@ import ( "github.com/aws/aws-sdk-go/aws/ec2metadata" "github.com/aws/aws-sdk-go/aws/session" "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/aws/aws-sdk-go/service/ec2" config_util "github.com/prometheus/common/config" + "github.com/prometheus/prometheus/discovery/refresh" "github.com/prometheus/prometheus/discovery/targetgroup" 
"github.com/prometheus/prometheus/util/strutil" ) @@ -55,23 +54,11 @@ const ( subnetSeparator = "," ) -var ( - ec2SDRefreshFailuresCount = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "prometheus_sd_ec2_refresh_failures_total", - Help: "The number of EC2-SD scrape failures.", - }) - ec2SDRefreshDuration = prometheus.NewSummary( - prometheus.SummaryOpts{ - Name: "prometheus_sd_ec2_refresh_duration_seconds", - Help: "The duration of a EC2-SD refresh in seconds.", - }) - // DefaultSDConfig is the default EC2 SD configuration. - DefaultSDConfig = SDConfig{ - Port: 80, - RefreshInterval: model.Duration(60 * time.Second), - } -) +// DefaultSDConfig is the default EC2 SD configuration. +var DefaultSDConfig = SDConfig{ + Port: 80, + RefreshInterval: model.Duration(60 * time.Second), +} // Filter is the configuration for filtering EC2 instances. type Filter struct { @@ -120,21 +107,16 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { return nil } -func init() { - prometheus.MustRegister(ec2SDRefreshFailuresCount) - prometheus.MustRegister(ec2SDRefreshDuration) -} - // Discovery periodically performs EC2-SD requests. It implements // the Discoverer interface. type Discovery struct { + *refresh.Discovery aws *aws.Config interval time.Duration profile string roleARN string port int filters []*Filter - logger log.Logger } // NewDiscovery returns a new EC2Discovery which periodically refreshes its targets. 
@@ -146,7 +128,7 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) *Discovery { if logger == nil { logger = log.NewNopLogger() } - return &Discovery{ + d := &Discovery{ aws: &aws.Config{ Endpoint: &conf.Endpoint, Region: &conf.Region, @@ -157,56 +139,17 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) *Discovery { filters: conf.Filters, interval: time.Duration(conf.RefreshInterval), port: conf.Port, - logger: logger, } + d.Discovery = refresh.NewDiscovery( + logger, + "ec2", + time.Duration(conf.RefreshInterval), + d.refresh, + ) + return d } -// Run implements the Discoverer interface. -func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { - ticker := time.NewTicker(d.interval) - defer ticker.Stop() - - // Get an initial set right away. - tg, err := d.refresh(ctx) - if err != nil { - level.Error(d.logger).Log("msg", "Refresh failed", "err", err) - } else { - select { - case ch <- []*targetgroup.Group{tg}: - case <-ctx.Done(): - return - } - } - - for { - select { - case <-ticker.C: - tg, err := d.refresh(ctx) - if err != nil { - level.Error(d.logger).Log("msg", "Refresh failed", "err", err) - continue - } - - select { - case ch <- []*targetgroup.Group{tg}: - case <-ctx.Done(): - return - } - case <-ctx.Done(): - return - } - } -} - -func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err error) { - t0 := time.Now() - defer func() { - ec2SDRefreshDuration.Observe(time.Since(t0).Seconds()) - if err != nil { - ec2SDRefreshFailuresCount.Inc() - } - }() - +func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { sess, err := session.NewSessionWithOptions(session.Options{ Config: *d.aws, Profile: d.profile, @@ -222,7 +165,7 @@ func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err err } else { ec2s = ec2.New(sess) } - tg = &targetgroup.Group{ + tg := &targetgroup.Group{ Source: *d.aws.Region, } @@ -306,5 +249,5 @@ func (d *Discovery) refresh(ctx context.Context) (tg 
*targetgroup.Group, err err }); err != nil { return nil, fmt.Errorf("could not describe instances: %s", err) } - return tg, nil + return []*targetgroup.Group{tg}, nil } diff --git a/discovery/gce/gce.go b/discovery/gce/gce.go index da9e2f4c94..19dbb8dbf5 100644 --- a/discovery/gce/gce.go +++ b/discovery/gce/gce.go @@ -22,12 +22,11 @@ import ( "time" "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "golang.org/x/oauth2/google" compute "google.golang.org/api/compute/v1" + "github.com/prometheus/prometheus/discovery/refresh" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/util/strutil" ) @@ -49,24 +48,12 @@ const ( gceLabelMachineType = gceLabel + "machine_type" ) -var ( - gceSDRefreshFailuresCount = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "prometheus_sd_gce_refresh_failures_total", - Help: "The number of GCE-SD refresh failures.", - }) - gceSDRefreshDuration = prometheus.NewSummary( - prometheus.SummaryOpts{ - Name: "prometheus_sd_gce_refresh_duration", - Help: "The duration of a GCE-SD refresh in seconds.", - }) - // DefaultSDConfig is the default GCE SD configuration. - DefaultSDConfig = SDConfig{ - Port: 80, - TagSeparator: ",", - RefreshInterval: model.Duration(60 * time.Second), - } -) +// DefaultSDConfig is the default GCE SD configuration. +var DefaultSDConfig = SDConfig{ + Port: 80, + TagSeparator: ",", + RefreshInterval: model.Duration(60 * time.Second), +} // SDConfig is the configuration for GCE based service discovery. type SDConfig struct { @@ -104,97 +91,51 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { return nil } -func init() { - prometheus.MustRegister(gceSDRefreshFailuresCount) - prometheus.MustRegister(gceSDRefreshDuration) -} - // Discovery periodically performs GCE-SD requests. It implements // the Discoverer interface. 
type Discovery struct { + *refresh.Discovery project string zone string filter string client *http.Client svc *compute.Service isvc *compute.InstancesService - interval time.Duration port int tagSeparator string - logger log.Logger } // NewDiscovery returns a new Discovery which periodically refreshes its targets. func NewDiscovery(conf SDConfig, logger log.Logger) (*Discovery, error) { - if logger == nil { - logger = log.NewNopLogger() - } - gd := &Discovery{ + d := &Discovery{ project: conf.Project, zone: conf.Zone, filter: conf.Filter, - interval: time.Duration(conf.RefreshInterval), port: conf.Port, tagSeparator: conf.TagSeparator, - logger: logger, } var err error - gd.client, err = google.DefaultClient(context.Background(), compute.ComputeReadonlyScope) + d.client, err = google.DefaultClient(context.Background(), compute.ComputeReadonlyScope) if err != nil { return nil, fmt.Errorf("error setting up communication with GCE service: %s", err) } - gd.svc, err = compute.New(gd.client) + d.svc, err = compute.New(d.client) if err != nil { return nil, fmt.Errorf("error setting up communication with GCE service: %s", err) } - gd.isvc = compute.NewInstancesService(gd.svc) - return gd, nil + d.isvc = compute.NewInstancesService(d.svc) + + d.Discovery = refresh.NewDiscovery( + logger, + "gce", + time.Duration(conf.RefreshInterval), + d.refresh, + ) + return d, nil } -// Run implements the Discoverer interface. -func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { - // Get an initial set right away. 
- tg, err := d.refresh(ctx) - if err != nil { - level.Error(d.logger).Log("msg", "Refresh failed", "err", err) - } else { - select { - case ch <- []*targetgroup.Group{tg}: - case <-ctx.Done(): - } - } - - ticker := time.NewTicker(d.interval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - tg, err := d.refresh(ctx) - if err != nil { - level.Error(d.logger).Log("msg", "Refresh failed", "err", err) - continue - } - select { - case ch <- []*targetgroup.Group{tg}: - case <-ctx.Done(): - } - case <-ctx.Done(): - return - } - } -} - -func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err error) { - t0 := time.Now() - defer func() { - gceSDRefreshDuration.Observe(time.Since(t0).Seconds()) - if err != nil { - gceSDRefreshFailuresCount.Inc() - } - }() - - tg = &targetgroup.Group{ +func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { + tg := &targetgroup.Group{ Source: fmt.Sprintf("GCE_%s_%s", d.project, d.zone), } @@ -202,7 +143,7 @@ func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err err if len(d.filter) > 0 { ilc = ilc.Filter(d.filter) } - err = ilc.Pages(ctx, func(l *compute.InstanceList) error { + err := ilc.Pages(ctx, func(l *compute.InstanceList) error { for _, inst := range l.Items { if len(inst.NetworkInterfaces) == 0 { continue @@ -259,7 +200,7 @@ func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err err return nil }) if err != nil { - return tg, fmt.Errorf("error retrieving refresh targets from gce: %s", err) + return nil, fmt.Errorf("error retrieving refresh targets from gce: %s", err) } - return tg, nil + return []*targetgroup.Group{tg}, nil } diff --git a/discovery/marathon/marathon.go b/discovery/marathon/marathon.go index eb6c83f35f..d94f62ef7d 100644 --- a/discovery/marathon/marathon.go +++ b/discovery/marathon/marathon.go @@ -26,10 +26,10 @@ import ( "time" "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - 
"github.com/prometheus/client_golang/prometheus" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/discovery/refresh" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/util/strutil" ) @@ -54,29 +54,12 @@ const ( portMappingLabelPrefix = metaLabelPrefix + "port_mapping_label_" // portDefinitionLabelPrefix is the prefix for the application portDefinitions labels. portDefinitionLabelPrefix = metaLabelPrefix + "port_definition_label_" - - // Constants for instrumentation. - namespace = "prometheus" ) -var ( - refreshFailuresCount = prometheus.NewCounter( - prometheus.CounterOpts{ - Namespace: namespace, - Name: "sd_marathon_refresh_failures_total", - Help: "The number of Marathon-SD refresh failures.", - }) - refreshDuration = prometheus.NewSummary( - prometheus.SummaryOpts{ - Namespace: namespace, - Name: "sd_marathon_refresh_duration_seconds", - Help: "The duration of a Marathon-SD refresh in seconds.", - }) - // DefaultSDConfig is the default Marathon SD configuration. - DefaultSDConfig = SDConfig{ - RefreshInterval: model.Duration(30 * time.Second), - } -) +// DefaultSDConfig is the default Marathon SD configuration. +var DefaultSDConfig = SDConfig{ + RefreshInterval: model.Duration(30 * time.Second), +} // SDConfig is the configuration for services running on Marathon. type SDConfig struct { @@ -110,29 +93,19 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { return c.HTTPClientConfig.Validate() } -func init() { - prometheus.MustRegister(refreshFailuresCount) - prometheus.MustRegister(refreshDuration) -} - const appListPath string = "/v2/apps/?embed=apps.tasks" // Discovery provides service discovery based on a Marathon instance. 
type Discovery struct { - client *http.Client - servers []string - refreshInterval time.Duration - lastRefresh map[string]*targetgroup.Group - appsClient AppListClient - logger log.Logger + *refresh.Discovery + client *http.Client + servers []string + lastRefresh map[string]*targetgroup.Group + appsClient appListClient } // NewDiscovery returns a new Marathon Discovery. func NewDiscovery(conf SDConfig, logger log.Logger) (*Discovery, error) { - if logger == nil { - logger = log.NewNopLogger() - } - rt, err := config_util.NewRoundTripperFromConfig(conf.HTTPClientConfig, "marathon_sd") if err != nil { return nil, err @@ -147,13 +120,18 @@ func NewDiscovery(conf SDConfig, logger log.Logger) (*Discovery, error) { return nil, err } - return &Discovery{ - client: &http.Client{Transport: rt}, - servers: conf.Servers, - refreshInterval: time.Duration(conf.RefreshInterval), - appsClient: fetchApps, - logger: logger, - }, nil + d := &Discovery{ + client: &http.Client{Transport: rt}, + servers: conf.Servers, + appsClient: fetchApps, + } + d.Discovery = refresh.NewDiscovery( + logger, + "marathon", + time.Duration(conf.RefreshInterval), + d.refresh, + ) + return d, nil } type authTokenRoundTripper struct { @@ -204,33 +182,10 @@ func (rt *authTokenFileRoundTripper) RoundTrip(request *http.Request) (*http.Res return rt.rt.RoundTrip(request) } -// Run implements the Discoverer interface. 
-func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { - for { - select { - case <-ctx.Done(): - return - case <-time.After(d.refreshInterval): - err := d.updateServices(ctx, ch) - if err != nil { - level.Error(d.logger).Log("msg", "Error while updating services", "err", err) - } - } - } -} - -func (d *Discovery) updateServices(ctx context.Context, ch chan<- []*targetgroup.Group) (err error) { - t0 := time.Now() - defer func() { - refreshDuration.Observe(time.Since(t0).Seconds()) - if err != nil { - refreshFailuresCount.Inc() - } - }() - +func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { targetMap, err := d.fetchTargetGroups(ctx) if err != nil { - return err + return nil, err } all := make([]*targetgroup.Group, 0, len(targetMap)) @@ -240,54 +195,49 @@ func (d *Discovery) updateServices(ctx context.Context, ch chan<- []*targetgroup select { case <-ctx.Done(): - return ctx.Err() - case ch <- all: + return nil, ctx.Err() + default: } // Remove services which did disappear. for source := range d.lastRefresh { _, ok := targetMap[source] if !ok { - select { - case <-ctx.Done(): - return ctx.Err() - case ch <- []*targetgroup.Group{{Source: source}}: - level.Debug(d.logger).Log("msg", "Removing group", "source", source) - } + all = append(all, &targetgroup.Group{Source: source}) } } d.lastRefresh = targetMap - return nil + return all, nil } func (d *Discovery) fetchTargetGroups(ctx context.Context) (map[string]*targetgroup.Group, error) { - url := RandomAppsURL(d.servers) + url := randomAppsURL(d.servers) apps, err := d.appsClient(ctx, d.client, url) if err != nil { return nil, err } - groups := AppsToTargetGroups(apps) + groups := appsToTargetGroups(apps) return groups, nil } -// Task describes one instance of a service running on Marathon. -type Task struct { +// task describes one instance of a service running on Marathon. 
+type task struct { ID string `json:"id"` Host string `json:"host"` Ports []uint32 `json:"ports"` - IPAddresses []IPAddress `json:"ipAddresses"` + IPAddresses []ipAddress `json:"ipAddresses"` } -// IPAddress describes the address and protocol the container's network interface is bound to. -type IPAddress struct { +// ipAddress describes the address and protocol the container's network interface is bound to. +type ipAddress struct { Address string `json:"ipAddress"` Proto string `json:"protocol"` } // PortMapping describes in which port the process are binding inside the docker container. -type PortMapping struct { +type portMapping struct { Labels map[string]string `json:"labels"` ContainerPort uint32 `json:"containerPort"` HostPort uint32 `json:"hostPort"` @@ -295,56 +245,56 @@ type PortMapping struct { } // DockerContainer describes a container which uses the docker runtime. -type DockerContainer struct { +type dockerContainer struct { Image string `json:"image"` - PortMappings []PortMapping `json:"portMappings"` + PortMappings []portMapping `json:"portMappings"` } // Container describes the runtime an app in running in. -type Container struct { - Docker DockerContainer `json:"docker"` - PortMappings []PortMapping `json:"portMappings"` +type container struct { + Docker dockerContainer `json:"docker"` + PortMappings []portMapping `json:"portMappings"` } // PortDefinition describes which load balancer port you should access to access the service. -type PortDefinition struct { +type portDefinition struct { Labels map[string]string `json:"labels"` Port uint32 `json:"port"` } // Network describes the name and type of network the container is attached to. -type Network struct { +type network struct { Name string `json:"name"` Mode string `json:"mode"` } // App describes a service running on Marathon. 
-type App struct { +type app struct { ID string `json:"id"` - Tasks []Task `json:"tasks"` + Tasks []task `json:"tasks"` RunningTasks int `json:"tasksRunning"` Labels map[string]string `json:"labels"` - Container Container `json:"container"` - PortDefinitions []PortDefinition `json:"portDefinitions"` - Networks []Network `json:"networks"` + Container container `json:"container"` + PortDefinitions []portDefinition `json:"portDefinitions"` + Networks []network `json:"networks"` RequirePorts bool `json:"requirePorts"` } // isContainerNet checks if the app's first network is set to mode 'container'. -func (app App) isContainerNet() bool { +func (app app) isContainerNet() bool { return len(app.Networks) > 0 && app.Networks[0].Mode == "container" } -// AppList is a list of Marathon apps. -type AppList struct { - Apps []App `json:"apps"` +// appList is a list of Marathon apps. +type appList struct { + Apps []app `json:"apps"` } -// AppListClient defines a function that can be used to get an application list from marathon. -type AppListClient func(ctx context.Context, client *http.Client, url string) (*AppList, error) +// appListClient defines a function that can be used to get an application list from marathon. +type appListClient func(ctx context.Context, client *http.Client, url string) (*appList, error) // fetchApps requests a list of applications from a marathon server. 
-func fetchApps(ctx context.Context, client *http.Client, url string) (*AppList, error) { +func fetchApps(ctx context.Context, client *http.Client, url string) (*appList, error) { request, err := http.NewRequest("GET", url, nil) if err != nil { return nil, err @@ -361,7 +311,7 @@ func fetchApps(ctx context.Context, client *http.Client, url string) (*AppList, return nil, fmt.Errorf("non 2xx status '%v' response during marathon service discovery", resp.StatusCode) } - var apps AppList + var apps appList err = json.NewDecoder(resp.Body).Decode(&apps) if err != nil { return nil, fmt.Errorf("%q: %v", url, err) @@ -369,16 +319,16 @@ func fetchApps(ctx context.Context, client *http.Client, url string) (*AppList, return &apps, nil } -// RandomAppsURL randomly selects a server from an array and creates +// randomAppsURL randomly selects a server from an array and creates // an URL pointing to the app list. -func RandomAppsURL(servers []string) string { +func randomAppsURL(servers []string) string { // TODO: If possible update server list from Marathon at some point. server := servers[rand.Intn(len(servers))] return fmt.Sprintf("%s%s", server, appListPath) } -// AppsToTargetGroups takes an array of Marathon apps and converts them into target groups. -func AppsToTargetGroups(apps *AppList) map[string]*targetgroup.Group { +// appsToTargetGroups takes an array of Marathon apps and converts them into target groups. 
+func appsToTargetGroups(apps *appList) map[string]*targetgroup.Group { tgroups := map[string]*targetgroup.Group{} for _, a := range apps.Apps { group := createTargetGroup(&a) @@ -387,7 +337,7 @@ func AppsToTargetGroups(apps *AppList) map[string]*targetgroup.Group { return tgroups } -func createTargetGroup(app *App) *targetgroup.Group { +func createTargetGroup(app *app) *targetgroup.Group { var ( targets = targetsForApp(app) appName = model.LabelValue(app.ID) @@ -410,7 +360,7 @@ func createTargetGroup(app *App) *targetgroup.Group { return tg } -func targetsForApp(app *App) []model.LabelSet { +func targetsForApp(app *app) []model.LabelSet { targets := make([]model.LabelSet, 0, len(app.Tasks)) var ports []uint32 @@ -494,7 +444,7 @@ func targetsForApp(app *App) []model.LabelSet { } // Generate a target endpoint string in host:port format. -func targetEndpoint(task *Task, port uint32, containerNet bool) string { +func targetEndpoint(task *task, port uint32, containerNet bool) string { var host string @@ -509,7 +459,7 @@ func targetEndpoint(task *Task, port uint32, containerNet bool) string { } // Get a list of ports and a list of labels from a PortMapping. 
-func extractPortMapping(portMappings []PortMapping, containerNet bool) ([]uint32, []map[string]string) { +func extractPortMapping(portMappings []portMapping, containerNet bool) ([]uint32, []map[string]string) { ports := make([]uint32, len(portMappings)) labels := make([]map[string]string, len(portMappings)) diff --git a/discovery/marathon/marathon_test.go b/discovery/marathon/marathon_test.go index 2d50dcf4cb..b80636a1b3 100644 --- a/discovery/marathon/marathon_test.go +++ b/discovery/marathon/marathon_test.go @@ -20,7 +20,6 @@ import ( "net/http" "net/http/httptest" "testing" - "time" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/discovery/targetgroup" @@ -32,290 +31,251 @@ var ( conf = SDConfig{Servers: testServers} ) -func testUpdateServices(client AppListClient, ch chan []*targetgroup.Group) error { +func testUpdateServices(client appListClient) ([]*targetgroup.Group, error) { md, err := NewDiscovery(conf, nil) if err != nil { - return err + return nil, err } - md.appsClient = client - return md.updateServices(context.Background(), ch) + if client != nil { + md.appsClient = client + } + return md.refresh(context.Background()) } func TestMarathonSDHandleError(t *testing.T) { var ( errTesting = errors.New("testing failure") - ch = make(chan []*targetgroup.Group, 1) - client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) { return nil, errTesting } + client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) { + return nil, errTesting + } ) - if err := testUpdateServices(client, ch); err != errTesting { + tgs, err := testUpdateServices(client) + if err != errTesting { t.Fatalf("Expected error: %s", err) } - select { - case tg := <-ch: - t.Fatalf("Got group: %s", tg) - default: + if len(tgs) != 0 { + t.Fatalf("Got group: %s", tgs) } } func TestMarathonSDEmptyList(t *testing.T) { var ( - ch = make(chan []*targetgroup.Group, 1) - client = func(_ context.Context, _ *http.Client, _ string) (*AppList, 
error) { return &AppList{}, nil } + client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) { return &appList{}, nil } ) - if err := testUpdateServices(client, ch); err != nil { + tgs, err := testUpdateServices(client) + if err != nil { t.Fatalf("Got error: %s", err) } - select { - case tg := <-ch: - if len(tg) > 0 { - t.Fatalf("Got group: %v", tg) - } - default: + if len(tgs) > 0 { + t.Fatalf("Got group: %v", tgs) } } -func marathonTestAppList(labels map[string]string, runningTasks int) *AppList { +func marathonTestAppList(labels map[string]string, runningTasks int) *appList { var ( - task = Task{ + t = task{ ID: "test-task-1", Host: "mesos-slave1", } - docker = DockerContainer{ + docker = dockerContainer{ Image: "repo/image:tag", } - portMappings = []PortMapping{ + portMappings = []portMapping{ {Labels: labels, HostPort: 31000}, } - container = Container{Docker: docker, PortMappings: portMappings} - app = App{ + container = container{Docker: docker, PortMappings: portMappings} + a = app{ ID: "test-service", - Tasks: []Task{task}, + Tasks: []task{t}, RunningTasks: runningTasks, Labels: labels, Container: container, } ) - return &AppList{ - Apps: []App{app}, + return &appList{ + Apps: []app{a}, } } func TestMarathonSDSendGroup(t *testing.T) { var ( - ch = make(chan []*targetgroup.Group, 1) - client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) { + client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) { return marathonTestAppList(marathonValidLabel, 1), nil } ) - if err := testUpdateServices(client, ch); err != nil { + tgs, err := testUpdateServices(client) + if err != nil { t.Fatalf("Got error: %s", err) } - select { - case tgs := <-ch: - tg := tgs[0] + if len(tgs) != 1 { + t.Fatal("Expected 1 target group, got", len(tgs)) + } - if tg.Source != "test-service" { - t.Fatalf("Wrong target group name: %s", tg.Source) - } - if len(tg.Targets) != 1 { - t.Fatalf("Wrong number of targets: %v", tg.Targets) 
- } - tgt := tg.Targets[0] - if tgt[model.AddressLabel] != "mesos-slave1:31000" { - t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "yes" { - t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel]) - } - default: - t.Fatal("Did not get a target group.") + tg := tgs[0] + + if tg.Source != "test-service" { + t.Fatalf("Wrong target group name: %s", tg.Source) + } + if len(tg.Targets) != 1 { + t.Fatalf("Wrong number of targets: %v", tg.Targets) + } + tgt := tg.Targets[0] + if tgt[model.AddressLabel] != "mesos-slave1:31000" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "yes" { + t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel]) } } func TestMarathonSDRemoveApp(t *testing.T) { - var ch = make(chan []*targetgroup.Group, 1) md, err := NewDiscovery(conf, nil) if err != nil { t.Fatalf("%s", err) } - md.appsClient = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) { + md.appsClient = func(_ context.Context, _ *http.Client, _ string) (*appList, error) { return marathonTestAppList(marathonValidLabel, 1), nil } - if err := md.updateServices(context.Background(), ch); err != nil { + tgs, err := md.refresh(context.Background()) + if err != nil { t.Fatalf("Got error on first update: %s", err) } - up1 := (<-ch)[0] + if len(tgs) != 1 { + t.Fatal("Expected 1 targetgroup, got", len(tgs)) + } + tg1 := tgs[0] - md.appsClient = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) { + md.appsClient = func(_ context.Context, _ *http.Client, _ string) (*appList, error) { return marathonTestAppList(marathonValidLabel, 0), nil } - if err := md.updateServices(context.Background(), ch); err != nil { + tgs, err = md.refresh(context.Background()) + if err != nil { t.Fatalf("Got error on second update: %s", err) } - 
up2 := (<-ch)[0] + if len(tgs) != 1 { + t.Fatal("Expected 1 targetgroup, got", len(tgs)) + } + tg2 := tgs[0] - if up2.Source != up1.Source { - t.Fatalf("Source is different: %s", up2) - if len(up2.Targets) > 0 { - t.Fatalf("Got a non-empty target set: %s", up2.Targets) + if tg2.Source != tg1.Source { + t.Fatalf("Source is different: %s != %s", tg1.Source, tg2.Source) + if len(tg2.Targets) > 0 { + t.Fatalf("Got a non-empty target set: %s", tg2.Targets) } } } -func TestMarathonSDRunAndStop(t *testing.T) { +func marathonTestAppListWithMultiplePorts(labels map[string]string, runningTasks int) *appList { var ( - refreshInterval = model.Duration(time.Millisecond * 10) - conf = SDConfig{Servers: testServers, RefreshInterval: refreshInterval} - ch = make(chan []*targetgroup.Group) - doneCh = make(chan error) - ) - md, err := NewDiscovery(conf, nil) - if err != nil { - t.Fatalf("%s", err) - } - md.appsClient = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) { - return marathonTestAppList(marathonValidLabel, 1), nil - } - ctx, cancel := context.WithCancel(context.Background()) - - go func() { - md.Run(ctx, ch) - close(doneCh) - }() - - timeout := time.After(md.refreshInterval * 3) - for { - select { - case <-ch: - cancel() - case <-doneCh: - cancel() - return - case <-timeout: - t.Fatalf("Update took too long.") - } - } -} - -func marathonTestAppListWithMultiplePorts(labels map[string]string, runningTasks int) *AppList { - var ( - task = Task{ + t = task{ ID: "test-task-1", Host: "mesos-slave1", } - docker = DockerContainer{ + docker = dockerContainer{ Image: "repo/image:tag", } - portMappings = []PortMapping{ + portMappings = []portMapping{ {Labels: labels, HostPort: 31000}, {Labels: make(map[string]string), HostPort: 32000}, } - container = Container{Docker: docker, PortMappings: portMappings} - app = App{ + container = container{Docker: docker, PortMappings: portMappings} + a = app{ ID: "test-service", - Tasks: []Task{task}, + Tasks: []task{t}, 
RunningTasks: runningTasks, Labels: labels, Container: container, } ) - return &AppList{ - Apps: []App{app}, + return &appList{ + Apps: []app{a}, } } func TestMarathonSDSendGroupWithMultiplePort(t *testing.T) { var ( - ch = make(chan []*targetgroup.Group, 1) - client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) { + client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) { return marathonTestAppListWithMultiplePorts(marathonValidLabel, 1), nil } ) - if err := testUpdateServices(client, ch); err != nil { + tgs, err := testUpdateServices(client) + if err != nil { t.Fatalf("Got error: %s", err) } - select { - case tgs := <-ch: - tg := tgs[0] + if len(tgs) != 1 { + t.Fatal("Expected 1 target group, got", len(tgs)) + } + tg := tgs[0] - if tg.Source != "test-service" { - t.Fatalf("Wrong target group name: %s", tg.Source) - } - if len(tg.Targets) != 2 { - t.Fatalf("Wrong number of targets: %v", tg.Targets) - } - tgt := tg.Targets[0] - if tgt[model.AddressLabel] != "mesos-slave1:31000" { - t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "yes" { - t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel]) - } - tgt = tg.Targets[1] - if tgt[model.AddressLabel] != "mesos-slave1:32000" { - t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong portMappings label from the second port: %s", tgt[model.AddressLabel]) - } - default: - t.Fatal("Did not get a target group.") + if tg.Source != "test-service" { + t.Fatalf("Wrong target group name: %s", tg.Source) + } + if len(tg.Targets) != 2 { + t.Fatalf("Wrong number of targets: %v", tg.Targets) + } + tgt := tg.Targets[0] + if tgt[model.AddressLabel] != "mesos-slave1:31000" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if 
tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "yes" { + t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel]) + } + tgt = tg.Targets[1] + if tgt[model.AddressLabel] != "mesos-slave1:32000" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong portMappings label from the second port: %s", tgt[model.AddressLabel]) } } -func marathonTestZeroTaskPortAppList(labels map[string]string, runningTasks int) *AppList { +func marathonTestZeroTaskPortAppList(labels map[string]string, runningTasks int) *appList { var ( - task = Task{ + t = task{ ID: "test-task-2", Host: "mesos-slave-2", Ports: []uint32{}, } - docker = DockerContainer{Image: "repo/image:tag"} - container = Container{Docker: docker} - app = App{ + docker = dockerContainer{Image: "repo/image:tag"} + container = container{Docker: docker} + a = app{ ID: "test-service-zero-ports", - Tasks: []Task{task}, + Tasks: []task{t}, RunningTasks: runningTasks, Labels: labels, Container: container, } ) - return &AppList{ - Apps: []App{app}, + return &appList{ + Apps: []app{a}, } } func TestMarathonZeroTaskPorts(t *testing.T) { var ( - ch = make(chan []*targetgroup.Group, 1) - client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) { + client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) { return marathonTestZeroTaskPortAppList(marathonValidLabel, 1), nil } ) - if err := testUpdateServices(client, ch); err != nil { + tgs, err := testUpdateServices(client) + if err != nil { t.Fatalf("Got error: %s", err) } - select { - case tgs := <-ch: - tg := tgs[0] + if len(tgs) != 1 { + t.Fatal("Expected 1 target group, got", len(tgs)) + } + tg := tgs[0] - if tg.Source != "test-service-zero-ports" { - t.Fatalf("Wrong target group name: %s", tg.Source) - } - if len(tg.Targets) != 0 { - t.Fatalf("Wrong number of targets: %v", tg.Targets) - } - 
default: - t.Fatal("Did not get a target group.") + if tg.Source != "test-service-zero-ports" { + t.Fatalf("Wrong target group name: %s", tg.Source) + } + if len(tg.Targets) != 0 { + t.Fatalf("Wrong number of targets: %v", tg.Targets) } } func Test500ErrorHttpResponseWithValidJSONBody(t *testing.T) { - var ( - ch = make(chan []*targetgroup.Group, 1) - client = fetchApps - ) // Simulate 500 error with a valid JSON response. respHandler := func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) @@ -333,232 +293,227 @@ func Test500ErrorHttpResponseWithValidJSONBody(t *testing.T) { // Setup conf for the test case. conf = SDConfig{Servers: []string{ts.URL}} // Execute test case and validate behavior. - if err := testUpdateServices(client, ch); err == nil { - t.Fatalf("Expected error for 5xx HTTP response from marathon server") + _, err := testUpdateServices(nil) + if err == nil { + t.Fatalf("Expected error for 5xx HTTP response from marathon server, got nil") } } -func marathonTestAppListWithPortDefinitions(labels map[string]string, runningTasks int) *AppList { +func marathonTestAppListWithPortDefinitions(labels map[string]string, runningTasks int) *appList { var ( - task = Task{ + t = task{ ID: "test-task-1", Host: "mesos-slave1", // Auto-generated ports when requirePorts is false Ports: []uint32{1234, 5678}, } - docker = DockerContainer{ + docker = dockerContainer{ Image: "repo/image:tag", } - container = Container{Docker: docker} - app = App{ + container = container{Docker: docker} + a = app{ ID: "test-service", - Tasks: []Task{task}, + Tasks: []task{t}, RunningTasks: runningTasks, Labels: labels, Container: container, - PortDefinitions: []PortDefinition{ + PortDefinitions: []portDefinition{ {Labels: make(map[string]string), Port: 31000}, {Labels: labels, Port: 32000}, }, RequirePorts: false, // default } ) - return &AppList{ - Apps: []App{app}, + return &appList{ + Apps: []app{a}, } } func 
TestMarathonSDSendGroupWithPortDefinitions(t *testing.T) { var ( - ch = make(chan []*targetgroup.Group, 1) - client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) { + client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) { return marathonTestAppListWithPortDefinitions(marathonValidLabel, 1), nil } ) - if err := testUpdateServices(client, ch); err != nil { + tgs, err := testUpdateServices(client) + if err != nil { t.Fatalf("Got error: %s", err) } - select { - case tgs := <-ch: - tg := tgs[0] + if len(tgs) != 1 { + t.Fatal("Expected 1 target group, got", len(tgs)) + } + tg := tgs[0] - if tg.Source != "test-service" { - t.Fatalf("Wrong target group name: %s", tg.Source) - } - if len(tg.Targets) != 2 { - t.Fatalf("Wrong number of targets: %v", tg.Targets) - } - tgt := tg.Targets[0] - if tgt[model.AddressLabel] != "mesos-slave1:1234" { - t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong first portDefinitions label from the first port: %s", tgt[model.AddressLabel]) - } - tgt = tg.Targets[1] - if tgt[model.AddressLabel] != "mesos-slave1:5678" { - t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong portMappings label from the second port: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "yes" { - t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) - } - default: - t.Fatal("Did not get a target group.") + if tg.Source != "test-service" { + t.Fatalf("Wrong target group name: %s", tg.Source) + } + if len(tg.Targets) != 2 { + t.Fatalf("Wrong number of targets: %v", 
tg.Targets) + } + tgt := tg.Targets[0] + if tgt[model.AddressLabel] != "mesos-slave1:1234" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong first portDefinitions label from the first port: %s", tgt[model.AddressLabel]) + } + tgt = tg.Targets[1] + if tgt[model.AddressLabel] != "mesos-slave1:5678" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong portMappings label from the second port: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "yes" { + t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) } } -func marathonTestAppListWithPortDefinitionsRequirePorts(labels map[string]string, runningTasks int) *AppList { +func marathonTestAppListWithPortDefinitionsRequirePorts(labels map[string]string, runningTasks int) *appList { var ( - task = Task{ + t = task{ ID: "test-task-1", Host: "mesos-slave1", Ports: []uint32{31000, 32000}, } - docker = DockerContainer{ + docker = dockerContainer{ Image: "repo/image:tag", } - container = Container{Docker: docker} - app = App{ + container = container{Docker: docker} + a = app{ ID: "test-service", - Tasks: []Task{task}, + Tasks: []task{t}, RunningTasks: runningTasks, Labels: labels, Container: container, - PortDefinitions: []PortDefinition{ + PortDefinitions: []portDefinition{ {Labels: make(map[string]string), Port: 31000}, {Labels: labels, Port: 32000}, }, RequirePorts: true, } ) - return &AppList{ - Apps: []App{app}, + return &appList{ + Apps: []app{a}, } } func TestMarathonSDSendGroupWithPortDefinitionsRequirePorts(t *testing.T) { var ( - ch = make(chan 
[]*targetgroup.Group, 1) - client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) { + client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) { return marathonTestAppListWithPortDefinitionsRequirePorts(marathonValidLabel, 1), nil } ) - if err := testUpdateServices(client, ch); err != nil { + tgs, err := testUpdateServices(client) + if err != nil { t.Fatalf("Got error: %s", err) } - select { - case tgs := <-ch: - tg := tgs[0] + if len(tgs) != 1 { + t.Fatal("Expected 1 target group, got", len(tgs)) + } + tg := tgs[0] - if tg.Source != "test-service" { - t.Fatalf("Wrong target group name: %s", tg.Source) - } - if len(tg.Targets) != 2 { - t.Fatalf("Wrong number of targets: %v", tg.Targets) - } - tgt := tg.Targets[0] - if tgt[model.AddressLabel] != "mesos-slave1:31000" { - t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong first portDefinitions label from the first port: %s", tgt[model.AddressLabel]) - } - tgt = tg.Targets[1] - if tgt[model.AddressLabel] != "mesos-slave1:32000" { - t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong portMappings label from the second port: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "yes" { - t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) - } - default: - t.Fatal("Did not get a target group.") + if tg.Source != "test-service" { + t.Fatalf("Wrong target group name: %s", tg.Source) + } + if len(tg.Targets) != 2 { + t.Fatalf("Wrong number of targets: %v", tg.Targets) + } + tgt := tg.Targets[0] + if tgt[model.AddressLabel] != 
"mesos-slave1:31000" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong first portDefinitions label from the first port: %s", tgt[model.AddressLabel]) + } + tgt = tg.Targets[1] + if tgt[model.AddressLabel] != "mesos-slave1:32000" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong portMappings label from the second port: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "yes" { + t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) } } -func marathonTestAppListWithPorts(labels map[string]string, runningTasks int) *AppList { +func marathonTestAppListWithPorts(labels map[string]string, runningTasks int) *appList { var ( - task = Task{ + t = task{ ID: "test-task-1", Host: "mesos-slave1", Ports: []uint32{31000, 32000}, } - docker = DockerContainer{ + docker = dockerContainer{ Image: "repo/image:tag", } - container = Container{Docker: docker} - app = App{ + container = container{Docker: docker} + a = app{ ID: "test-service", - Tasks: []Task{task}, + Tasks: []task{t}, RunningTasks: runningTasks, Labels: labels, Container: container, } ) - return &AppList{ - Apps: []App{app}, + return &appList{ + Apps: []app{a}, } } func TestMarathonSDSendGroupWithPorts(t *testing.T) { var ( - ch = make(chan []*targetgroup.Group, 1) - client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) { + client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) { return marathonTestAppListWithPorts(marathonValidLabel, 1), nil } ) - if err := testUpdateServices(client, ch); err != nil { + tgs, 
err := testUpdateServices(client) + if err != nil { t.Fatalf("Got error: %s", err) } - select { - case tgs := <-ch: - tg := tgs[0] + if len(tgs) != 1 { + t.Fatal("Expected 1 target group, got", len(tgs)) + } + tg := tgs[0] - if tg.Source != "test-service" { - t.Fatalf("Wrong target group name: %s", tg.Source) - } - if len(tg.Targets) != 2 { - t.Fatalf("Wrong number of targets: %v", tg.Targets) - } - tgt := tg.Targets[0] - if tgt[model.AddressLabel] != "mesos-slave1:31000" { - t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong first portDefinitions label from the first port: %s", tgt[model.AddressLabel]) - } - tgt = tg.Targets[1] - if tgt[model.AddressLabel] != "mesos-slave1:32000" { - t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong portMappings label from the second port: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) - } - default: - t.Fatal("Did not get a target group.") + if tg.Source != "test-service" { + t.Fatalf("Wrong target group name: %s", tg.Source) + } + if len(tg.Targets) != 2 { + t.Fatalf("Wrong number of targets: %v", tg.Targets) + } + tgt := tg.Targets[0] + if tgt[model.AddressLabel] != "mesos-slave1:31000" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { + 
t.Fatalf("Wrong first portDefinitions label from the first port: %s", tgt[model.AddressLabel]) + } + tgt = tg.Targets[1] + if tgt[model.AddressLabel] != "mesos-slave1:32000" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong portMappings label from the second port: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) } } -func marathonTestAppListWithContainerPortMappings(labels map[string]string, runningTasks int) *AppList { +func marathonTestAppListWithContainerPortMappings(labels map[string]string, runningTasks int) *appList { var ( - task = Task{ + t = task{ ID: "test-task-1", Host: "mesos-slave1", Ports: []uint32{ @@ -566,77 +521,75 @@ func marathonTestAppListWithContainerPortMappings(labels map[string]string, runn 32000, }, } - docker = DockerContainer{ + docker = dockerContainer{ Image: "repo/image:tag", } - container = Container{ + container = container{ Docker: docker, - PortMappings: []PortMapping{ + PortMappings: []portMapping{ {Labels: labels, HostPort: 0}, {Labels: make(map[string]string), HostPort: 32000}, }, } - app = App{ + a = app{ ID: "test-service", - Tasks: []Task{task}, + Tasks: []task{t}, RunningTasks: runningTasks, Labels: labels, Container: container, } ) - return &AppList{ - Apps: []App{app}, + return &appList{ + Apps: []app{a}, } } func TestMarathonSDSendGroupWithContainerPortMappings(t *testing.T) { var ( - ch = make(chan []*targetgroup.Group, 1) - client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) { + client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) { return marathonTestAppListWithContainerPortMappings(marathonValidLabel, 1), nil } ) - if err := testUpdateServices(client, ch); err != nil { + tgs, err := testUpdateServices(client) + if 
err != nil { t.Fatalf("Got error: %s", err) } - select { - case tgs := <-ch: - tg := tgs[0] + if len(tgs) != 1 { + t.Fatal("Expected 1 target group, got", len(tgs)) + } + tg := tgs[0] - if tg.Source != "test-service" { - t.Fatalf("Wrong target group name: %s", tg.Source) - } - if len(tg.Targets) != 2 { - t.Fatalf("Wrong number of targets: %v", tg.Targets) - } - tgt := tg.Targets[0] - if tgt[model.AddressLabel] != "mesos-slave1:12345" { - t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "yes" { - t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong first portDefinitions label from the first port: %s", tgt[model.AddressLabel]) - } - tgt = tg.Targets[1] - if tgt[model.AddressLabel] != "mesos-slave1:32000" { - t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong portMappings label from the second port: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) - } - default: - t.Fatal("Did not get a target group.") + if tg.Source != "test-service" { + t.Fatalf("Wrong target group name: %s", tg.Source) + } + if len(tg.Targets) != 2 { + t.Fatalf("Wrong number of targets: %v", tg.Targets) + } + tgt := tg.Targets[0] + if tgt[model.AddressLabel] != "mesos-slave1:12345" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "yes" { + t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong first portDefinitions 
label from the first port: %s", tgt[model.AddressLabel]) + } + tgt = tg.Targets[1] + if tgt[model.AddressLabel] != "mesos-slave1:32000" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong portMappings label from the second port: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) } } -func marathonTestAppListWithDockerContainerPortMappings(labels map[string]string, runningTasks int) *AppList { +func marathonTestAppListWithDockerContainerPortMappings(labels map[string]string, runningTasks int) *appList { var ( - task = Task{ + t = task{ ID: "test-task-1", Host: "mesos-slave1", Ports: []uint32{ @@ -644,152 +597,148 @@ func marathonTestAppListWithDockerContainerPortMappings(labels map[string]string 12345, // 'Automatically-generated' port }, } - docker = DockerContainer{ + docker = dockerContainer{ Image: "repo/image:tag", - PortMappings: []PortMapping{ + PortMappings: []portMapping{ {Labels: labels, HostPort: 31000}, {Labels: make(map[string]string), HostPort: 0}, }, } - container = Container{ + container = container{ Docker: docker, } - app = App{ + a = app{ ID: "test-service", - Tasks: []Task{task}, + Tasks: []task{t}, RunningTasks: runningTasks, Labels: labels, Container: container, } ) - return &AppList{ - Apps: []App{app}, + return &appList{ + Apps: []app{a}, } } func TestMarathonSDSendGroupWithDockerContainerPortMappings(t *testing.T) { var ( - ch = make(chan []*targetgroup.Group, 1) - client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) { + client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) { return marathonTestAppListWithDockerContainerPortMappings(marathonValidLabel, 1), nil } ) - if err := testUpdateServices(client, ch); err != nil { + tgs, err := 
testUpdateServices(client) + if err != nil { t.Fatalf("Got error: %s", err) } - select { - case tgs := <-ch: - tg := tgs[0] + if len(tgs) != 1 { + t.Fatal("Expected 1 target group, got", len(tgs)) + } + tg := tgs[0] - if tg.Source != "test-service" { - t.Fatalf("Wrong target group name: %s", tg.Source) - } - if len(tg.Targets) != 2 { - t.Fatalf("Wrong number of targets: %v", tg.Targets) - } - tgt := tg.Targets[0] - if tgt[model.AddressLabel] != "mesos-slave1:31000" { - t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "yes" { - t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong first portDefinitions label from the first port: %s", tgt[model.AddressLabel]) - } - tgt = tg.Targets[1] - if tgt[model.AddressLabel] != "mesos-slave1:12345" { - t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong portMappings label from the second port: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) - } - default: - t.Fatal("Did not get a target group.") + if tg.Source != "test-service" { + t.Fatalf("Wrong target group name: %s", tg.Source) + } + if len(tg.Targets) != 2 { + t.Fatalf("Wrong number of targets: %v", tg.Targets) + } + tgt := tg.Targets[0] + if tgt[model.AddressLabel] != "mesos-slave1:31000" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "yes" { + t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { + 
t.Fatalf("Wrong first portDefinitions label from the first port: %s", tgt[model.AddressLabel]) + } + tgt = tg.Targets[1] + if tgt[model.AddressLabel] != "mesos-slave1:12345" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong portMappings label from the second port: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) } } -func marathonTestAppListWithContainerNetworkAndPortMappings(labels map[string]string, runningTasks int) *AppList { +func marathonTestAppListWithContainerNetworkAndPortMappings(labels map[string]string, runningTasks int) *appList { var ( - task = Task{ + t = task{ ID: "test-task-1", Host: "mesos-slave1", - IPAddresses: []IPAddress{ + IPAddresses: []ipAddress{ {Address: "1.2.3.4"}, }, } - docker = DockerContainer{ + docker = dockerContainer{ Image: "repo/image:tag", } - portMappings = []PortMapping{ + portMappings = []portMapping{ {Labels: labels, ContainerPort: 8080, HostPort: 31000}, {Labels: make(map[string]string), ContainerPort: 1234, HostPort: 32000}, } - container = Container{ + container = container{ Docker: docker, PortMappings: portMappings, } - networks = []Network{ + networks = []network{ {Mode: "container", Name: "test-network"}, } - app = App{ + a = app{ ID: "test-service", - Tasks: []Task{task}, + Tasks: []task{t}, RunningTasks: runningTasks, Labels: labels, Container: container, Networks: networks, } ) - return &AppList{ - Apps: []App{app}, + return &appList{ + Apps: []app{a}, } } func TestMarathonSDSendGroupWithContainerNetworkAndPortMapping(t *testing.T) { var ( - ch = make(chan []*targetgroup.Group, 1) - client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) { + client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) { return 
marathonTestAppListWithContainerNetworkAndPortMappings(marathonValidLabel, 1), nil } ) - if err := testUpdateServices(client, ch); err != nil { + tgs, err := testUpdateServices(client) + if err != nil { t.Fatalf("Got error: %s", err) } - select { - case tgs := <-ch: - tg := tgs[0] + if len(tgs) != 1 { + t.Fatal("Expected 1 target group, got", len(tgs)) + } + tg := tgs[0] - if tg.Source != "test-service" { - t.Fatalf("Wrong target group name: %s", tg.Source) - } - if len(tg.Targets) != 2 { - t.Fatalf("Wrong number of targets: %v", tg.Targets) - } - tgt := tg.Targets[0] - if tgt[model.AddressLabel] != "1.2.3.4:8080" { - t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "yes" { - t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong first portDefinitions label from the first port: %s", tgt[model.AddressLabel]) - } - tgt = tg.Targets[1] - if tgt[model.AddressLabel] != "1.2.3.4:1234" { - t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong portMappings label from the second port: %s", tgt[model.AddressLabel]) - } - if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { - t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) - } - default: - t.Fatal("Did not get a target group.") + if tg.Source != "test-service" { + t.Fatalf("Wrong target group name: %s", tg.Source) + } + if len(tg.Targets) != 2 { + t.Fatalf("Wrong number of targets: %v", tg.Targets) + } + tgt := tg.Targets[0] + if tgt[model.AddressLabel] != "1.2.3.4:8080" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "yes" { + t.Fatalf("Wrong first portMappings label 
from the first port: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong first portDefinitions label from the first port: %s", tgt[model.AddressLabel]) + } + tgt = tg.Targets[1] + if tgt[model.AddressLabel] != "1.2.3.4:1234" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong portMappings label from the second port: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) } } diff --git a/discovery/openstack/hypervisor.go b/discovery/openstack/hypervisor.go index f996011299..af541f0c12 100644 --- a/discovery/openstack/hypervisor.go +++ b/discovery/openstack/hypervisor.go @@ -17,7 +17,6 @@ import ( "context" "fmt" "net" - "time" "github.com/go-kit/kit/log" "github.com/gophercloud/gophercloud" @@ -45,25 +44,16 @@ type HypervisorDiscovery struct { port int } -// NewHypervisorDiscovery returns a new hypervisor discovery. -func NewHypervisorDiscovery(provider *gophercloud.ProviderClient, opts *gophercloud.AuthOptions, +// newHypervisorDiscovery returns a new hypervisor discovery. 
+func newHypervisorDiscovery(provider *gophercloud.ProviderClient, opts *gophercloud.AuthOptions, port int, region string, l log.Logger) *HypervisorDiscovery { return &HypervisorDiscovery{provider: provider, authOpts: opts, region: region, port: port, logger: l} } -func (h *HypervisorDiscovery) refresh(ctx context.Context) (*targetgroup.Group, error) { - var err error - t0 := time.Now() - defer func() { - refreshDuration.Observe(time.Since(t0).Seconds()) - if err != nil { - refreshFailuresCount.Inc() - } - }() - +func (h *HypervisorDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { h.provider.Context = ctx - err = openstack.Authenticate(h.provider, *h.authOpts) + err := openstack.Authenticate(h.provider, *h.authOpts) if err != nil { return nil, fmt.Errorf("could not authenticate to OpenStack: %s", err) } @@ -102,5 +92,5 @@ func (h *HypervisorDiscovery) refresh(ctx context.Context) (*targetgroup.Group, return nil, err } - return tg, nil + return []*targetgroup.Group{tg}, nil } diff --git a/discovery/openstack/hypervisor_test.go b/discovery/openstack/hypervisor_test.go index c50425228b..1e4463892a 100644 --- a/discovery/openstack/hypervisor_test.go +++ b/discovery/openstack/hypervisor_test.go @@ -40,7 +40,7 @@ func (s *OpenstackSDHypervisorTestSuite) SetupTest(t *testing.T) { s.Mock.HandleAuthSuccessfully() } -func (s *OpenstackSDHypervisorTestSuite) openstackAuthSuccess() (*Discovery, error) { +func (s *OpenstackSDHypervisorTestSuite) openstackAuthSuccess() (refresher, error) { conf := SDConfig{ IdentityEndpoint: s.Mock.Endpoint(), Password: "test", @@ -49,7 +49,7 @@ func (s *OpenstackSDHypervisorTestSuite) openstackAuthSuccess() (*Discovery, err Region: "RegionOne", Role: "hypervisor", } - return NewDiscovery(&conf, nil) + return newRefresher(&conf, nil) } func TestOpenstackSDHypervisorRefresh(t *testing.T) { @@ -59,7 +59,9 @@ func TestOpenstackSDHypervisorRefresh(t *testing.T) { hypervisor, _ := mock.openstackAuthSuccess() ctx := 
context.Background()
-	tg, err := hypervisor.r.refresh(ctx)
+	tgs, err := hypervisor.refresh(ctx)
 	testutil.Ok(t, err)
+	testutil.Equals(t, 1, len(tgs))
+	tg := tgs[0]
 	testutil.Assert(t, tg != nil, "")
 	testutil.Assert(t, tg.Targets != nil, "")
@@ -89,7 +91,7 @@ func TestOpenstackSDHypervisorRefreshWithDoneContext(t *testing.T) {
 	hypervisor, _ := mock.openstackAuthSuccess()
 	ctx, cancel := context.WithCancel(context.Background())
 	cancel()
-	_, err := hypervisor.r.refresh(ctx)
+	_, err := hypervisor.refresh(ctx)
 	testutil.NotOk(t, err, "")
 	testutil.Assert(t, strings.Contains(err.Error(), context.Canceled.Error()), "%q doesn't contain %q", err, context.Canceled)
 
diff --git a/discovery/openstack/instance.go b/discovery/openstack/instance.go
index 25017f893a..1025d6c2b9 100644
--- a/discovery/openstack/instance.go
+++ b/discovery/openstack/instance.go
@@ -17,7 +17,6 @@ import (
 	"context"
 	"fmt"
 	"net"
-	"time"
 
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
@@ -27,6 +26,7 @@ import (
 	"github.com/gophercloud/gophercloud/openstack/compute/v2/servers"
 	"github.com/gophercloud/gophercloud/pagination"
 	"github.com/prometheus/common/model"
+
 	"github.com/prometheus/prometheus/discovery/targetgroup"
 	"github.com/prometheus/prometheus/util/strutil"
 )
@@ -54,7 +54,7 @@ type InstanceDiscovery struct {
 }
 
-// NewInstanceDiscovery returns a new instance discovery.
+// newInstanceDiscovery returns a new instance discovery.
-func NewInstanceDiscovery(provider *gophercloud.ProviderClient, opts *gophercloud.AuthOptions, +func newInstanceDiscovery(provider *gophercloud.ProviderClient, opts *gophercloud.AuthOptions, port int, region string, allTenants bool, l log.Logger) *InstanceDiscovery { if l == nil { l = log.NewNopLogger() @@ -68,18 +68,9 @@ type floatingIPKey struct { fixed string } -func (i *InstanceDiscovery) refresh(ctx context.Context) (*targetgroup.Group, error) { - var err error - t0 := time.Now() - defer func() { - refreshDuration.Observe(time.Since(t0).Seconds()) - if err != nil { - refreshFailuresCount.Inc() - } - }() - +func (i *InstanceDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { i.provider.Context = ctx - err = openstack.Authenticate(i.provider, *i.authOpts) + err := openstack.Authenticate(i.provider, *i.authOpts) if err != nil { return nil, fmt.Errorf("could not authenticate to OpenStack: %s", err) } @@ -200,5 +191,5 @@ func (i *InstanceDiscovery) refresh(ctx context.Context) (*targetgroup.Group, er return nil, err } - return tg, nil + return []*targetgroup.Group{tg}, nil } diff --git a/discovery/openstack/instance_test.go b/discovery/openstack/instance_test.go index 6575acc8d1..d03cab76b5 100644 --- a/discovery/openstack/instance_test.go +++ b/discovery/openstack/instance_test.go @@ -42,7 +42,7 @@ func (s *OpenstackSDInstanceTestSuite) SetupTest(t *testing.T) { s.Mock.HandleAuthSuccessfully() } -func (s *OpenstackSDInstanceTestSuite) openstackAuthSuccess() (*Discovery, error) { +func (s *OpenstackSDInstanceTestSuite) openstackAuthSuccess() (refresher, error) { conf := SDConfig{ IdentityEndpoint: s.Mock.Endpoint(), Password: "test", @@ -52,7 +52,7 @@ func (s *OpenstackSDInstanceTestSuite) openstackAuthSuccess() (*Discovery, error Role: "instance", AllTenants: true, } - return NewDiscovery(&conf, nil) + return newRefresher(&conf, nil) } func TestOpenstackSDInstanceRefresh(t *testing.T) { @@ -64,9 +64,12 @@ func TestOpenstackSDInstanceRefresh(t 
*testing.T) { testutil.Ok(t, err) ctx := context.Background() - tg, err := instance.r.refresh(ctx) + tgs, err := instance.refresh(ctx) testutil.Ok(t, err) + testutil.Equals(t, 1, len(tgs)) + + tg := tgs[0] testutil.Assert(t, tg != nil, "") testutil.Assert(t, tg.Targets != nil, "") testutil.Equals(t, 4, len(tg.Targets)) @@ -128,7 +131,7 @@ func TestOpenstackSDInstanceRefreshWithDoneContext(t *testing.T) { hypervisor, _ := mock.openstackAuthSuccess() ctx, cancel := context.WithCancel(context.Background()) cancel() - _, err := hypervisor.r.refresh(ctx) + _, err := hypervisor.refresh(ctx) testutil.NotOk(t, err, "") testutil.Assert(t, strings.Contains(err.Error(), context.Canceled.Error()), "%q doesn't contain %q", err, context.Canceled) diff --git a/discovery/openstack/openstack.go b/discovery/openstack/openstack.go index a91763ba5a..c1175d3f83 100644 --- a/discovery/openstack/openstack.go +++ b/discovery/openstack/openstack.go @@ -21,34 +21,21 @@ import ( "time" "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" "github.com/gophercloud/gophercloud" "github.com/gophercloud/gophercloud/openstack" "github.com/mwitkow/go-conntrack" - "github.com/prometheus/client_golang/prometheus" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery/refresh" "github.com/prometheus/prometheus/discovery/targetgroup" ) -var ( - refreshFailuresCount = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "prometheus_sd_openstack_refresh_failures_total", - Help: "The number of OpenStack-SD scrape failures.", - }) - refreshDuration = prometheus.NewSummary( - prometheus.SummaryOpts{ - Name: "prometheus_sd_openstack_refresh_duration_seconds", - Help: "The duration of an OpenStack-SD refresh in seconds.", - }) - // DefaultSDConfig is the default OpenStack SD configuration. 
- DefaultSDConfig = SDConfig{ - Port: 80, - RefreshInterval: model.Duration(60 * time.Second), - } -) +// DefaultSDConfig is the default OpenStack SD configuration. +var DefaultSDConfig = SDConfig{ + Port: 80, + RefreshInterval: model.Duration(60 * time.Second), +} // SDConfig is the configuration for OpenStack based service discovery. type SDConfig struct { @@ -71,7 +58,7 @@ type SDConfig struct { TLSConfig config_util.TLSConfig `yaml:"tls_config,omitempty"` } -// OpenStackRole is role of the target in OpenStack. +// Role is the role of the target in OpenStack. type Role string // The valid options for OpenStackRole. @@ -114,25 +101,26 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { return nil } -func init() { - prometheus.MustRegister(refreshFailuresCount) - prometheus.MustRegister(refreshDuration) -} - type refresher interface { - refresh(ctx context.Context) (tg *targetgroup.Group, err error) + refresh(context.Context) ([]*targetgroup.Group, error) } -// Discovery periodically performs OpenStack-SD requests. It implements -// the Discoverer interface. -type Discovery struct { - interval time.Duration - logger log.Logger - r refresher +// NewDiscovery returns a new OpenStack Discoverer which periodically refreshes its targets. +func NewDiscovery(conf *SDConfig, l log.Logger) (*refresh.Discovery, error) { + r, err := newRefresher(conf, l) + if err != nil { + return nil, err + } + return refresh.NewDiscovery( + l, + "openstack", + time.Duration(conf.RefreshInterval), + r.refresh, + ), nil + } -// NewDiscovery returns a new OpenStackDiscovery which periodically refreshes its targets. 
-func NewDiscovery(conf *SDConfig, l log.Logger) (*Discovery, error) { +func newRefresher(conf *SDConfig, l log.Logger) (refresher, error) { var opts gophercloud.AuthOptions if conf.IdentityEndpoint == "" { var err error @@ -174,51 +162,11 @@ func NewDiscovery(conf *SDConfig, l log.Logger) (*Discovery, error) { }, Timeout: 5 * time.Duration(conf.RefreshInterval), } - var r refresher switch conf.Role { case OpenStackRoleHypervisor: - r = NewHypervisorDiscovery(client, &opts, conf.Port, conf.Region, l) + return newHypervisorDiscovery(client, &opts, conf.Port, conf.Region, l), nil case OpenStackRoleInstance: - r = NewInstanceDiscovery(client, &opts, conf.Port, conf.Region, conf.AllTenants, l) - default: - return nil, errors.New("unknown OpenStack discovery role") - } - return &Discovery{r: r, logger: l, interval: time.Duration(conf.RefreshInterval)}, nil -} - -// Run implements the Discoverer interface. -func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { - // Get an initial set right away. 
- tg, err := d.r.refresh(ctx) - if err != nil { - level.Error(d.logger).Log("msg", "Unable to refresh target groups", "err", err.Error()) - } else { - select { - case ch <- []*targetgroup.Group{tg}: - case <-ctx.Done(): - return - } - } - - ticker := time.NewTicker(d.interval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - tg, err := d.r.refresh(ctx) - if err != nil { - level.Error(d.logger).Log("msg", "Unable to refresh target groups", "err", err.Error()) - continue - } - - select { - case ch <- []*targetgroup.Group{tg}: - case <-ctx.Done(): - return - } - case <-ctx.Done(): - return - } + return newInstanceDiscovery(client, &opts, conf.Port, conf.Region, conf.AllTenants, l), nil } + return nil, errors.New("unknown OpenStack discovery role") } diff --git a/discovery/refresh/refresh.go b/discovery/refresh/refresh.go new file mode 100644 index 0000000000..4e53e258c2 --- /dev/null +++ b/discovery/refresh/refresh.go @@ -0,0 +1,117 @@ +// Copyright 2019 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package refresh
+
+import (
+	"context"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/prometheus/prometheus/discovery/targetgroup"
+)
+
+var (
+	failuresCount = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "prometheus_sd_refresh_failures_total",
+			Help: "Number of refresh failures for the given SD mechanism.",
+		},
+		[]string{"mechanism"},
+	)
+	duration = prometheus.NewSummaryVec(
+		prometheus.SummaryOpts{
+			Name: "prometheus_sd_refresh_duration_seconds",
+			Help: "The duration of a refresh in seconds for the given SD mechanism.",
+		},
+		[]string{"mechanism"},
+	)
+)
+
+func init() {
+	prometheus.MustRegister(duration, failuresCount)
+}
+
+// Discovery implements the Discoverer interface.
+type Discovery struct {
+	logger   log.Logger
+	interval time.Duration
+	refreshf func(ctx context.Context) ([]*targetgroup.Group, error)
+
+	failures prometheus.Counter
+	duration prometheus.Observer
+}
+
+// NewDiscovery returns a Discoverer function that calls a refresh() function at every interval.
+func NewDiscovery(l log.Logger, mech string, interval time.Duration, refreshf func(ctx context.Context) ([]*targetgroup.Group, error)) *Discovery {
+	if l == nil {
+		l = log.NewNopLogger()
+	}
+	return &Discovery{
+		logger:   l,
+		interval: interval,
+		refreshf: refreshf,
+		failures: failuresCount.WithLabelValues(mech),
+		duration: duration.WithLabelValues(mech),
+	}
+}
+
+// Run implements the Discoverer interface.
+func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
+	// Get an initial set right away.
+	tgs, err := d.refresh(ctx)
+	if err != nil {
+		level.Error(d.logger).Log("msg", "Unable to refresh target groups", "err", err.Error())
+	} else {
+		select {
+		case ch <- tgs:
+		case <-ctx.Done():
+			return
+		}
+	}
+
+	ticker := time.NewTicker(d.interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			tgs, err := d.refresh(ctx)
+			if err != nil {
+				level.Error(d.logger).Log("msg", "Unable to refresh target groups", "err", err.Error())
+				continue
+			}
+
+			select {
+			case ch <- tgs:
+			case <-ctx.Done():
+				return
+			}
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
+	// Deferred call arguments are evaluated at defer time, so measure inside a closure.
+	defer func(start time.Time) { d.duration.Observe(time.Since(start).Seconds()) }(time.Now())
+	tgs, err := d.refreshf(ctx)
+	if err != nil {
+		d.failures.Inc()
+	}
+	return tgs, err
+}
diff --git a/discovery/refresh/refresh_test.go b/discovery/refresh/refresh_test.go
new file mode 100644
index 0000000000..066f901cea
--- /dev/null
+++ b/discovery/refresh/refresh_test.go
@@ -0,0 +1,83 @@
+// Copyright 2019 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package refresh + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/prometheus/util/testutil" +) + +func TestRefresh(t *testing.T) { + tg1 := []*targetgroup.Group{ + { + Source: "tg", + Targets: []model.LabelSet{ + { + model.LabelName("t1"): model.LabelValue("v1"), + }, + { + model.LabelName("t2"): model.LabelValue("v2"), + }, + }, + Labels: model.LabelSet{ + model.LabelName("l1"): model.LabelValue("lv1"), + }, + }, + } + tg2 := []*targetgroup.Group{ + { + Source: "tg", + }, + } + + var i int + refresh := func(ctx context.Context) ([]*targetgroup.Group, error) { + i++ + switch i { + case 1: + return tg1, nil + case 2: + return tg2, nil + } + return nil, fmt.Errorf("some error") + } + interval := time.Millisecond + d := NewDiscovery(nil, "test", interval, refresh) + + ch := make(chan []*targetgroup.Group) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go d.Run(ctx, ch) + + tg := <-ch + testutil.Equals(t, tg1, tg) + + tg = <-ch + testutil.Equals(t, tg2, tg) + + tick := time.NewTicker(2 * interval) + defer tick.Stop() + select { + case <-ch: + t.Fatal("Unexpected target group") + case <-tick.C: + } +} diff --git a/discovery/triton/triton.go b/discovery/triton/triton.go index b530a8e419..57b1cfc4bf 100644 --- a/discovery/triton/triton.go +++ b/discovery/triton/triton.go @@ -24,12 +24,11 @@ import ( "time" "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" "github.com/mwitkow/go-conntrack" - "github.com/prometheus/client_golang/prometheus" + config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" - config_util "github.com/prometheus/common/config" + "github.com/prometheus/prometheus/discovery/refresh" "github.com/prometheus/prometheus/discovery/targetgroup" ) @@ -43,24 +42,12 @@ const ( tritonLabelServerID = tritonLabel + "server_id" ) -var ( - refreshFailuresCount = 
prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "prometheus_sd_triton_refresh_failures_total", - Help: "The number of Triton-SD scrape failures.", - }) - refreshDuration = prometheus.NewSummary( - prometheus.SummaryOpts{ - Name: "prometheus_sd_triton_refresh_duration_seconds", - Help: "The duration of a Triton-SD refresh in seconds.", - }) - // DefaultSDConfig is the default Triton SD configuration. - DefaultSDConfig = SDConfig{ - Port: 9163, - RefreshInterval: model.Duration(60 * time.Second), - Version: 1, - } -) +// DefaultSDConfig is the default Triton SD configuration. +var DefaultSDConfig = SDConfig{ + Port: 9163, + RefreshInterval: model.Duration(60 * time.Second), + Version: 1, +} // SDConfig is the configuration for Triton based service discovery. type SDConfig struct { @@ -97,13 +84,8 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { return nil } -func init() { - prometheus.MustRegister(refreshFailuresCount) - prometheus.MustRegister(refreshDuration) -} - -// DiscoveryResponse models a JSON response from the Triton discovery. +// discoveryResponse models a JSON response from the Triton discovery. -type DiscoveryResponse struct { +type discoveryResponse struct { Containers []struct { Groups []string `json:"groups"` ServerUUID string `json:"server_uuid"` @@ -117,18 +99,14 @@ type DiscoveryResponse struct { // Discovery periodically performs Triton-SD requests. It implements // the Discoverer interface. type Discovery struct { + *refresh.Discovery client *http.Client interval time.Duration - logger log.Logger sdConfig *SDConfig } // New returns a new Discovery which periodically refreshes its targets.
func New(logger log.Logger, conf *SDConfig) (*Discovery, error) { - if logger == nil { - logger = log.NewNopLogger() - } - tls, err := config_util.NewTLSConfig(&conf.TLSConfig) if err != nil { return nil, err @@ -143,60 +121,28 @@ func New(logger log.Logger, conf *SDConfig) (*Discovery, error) { } client := &http.Client{Transport: transport} - return &Discovery{ + d := &Discovery{ client: client, interval: time.Duration(conf.RefreshInterval), - logger: logger, sdConfig: conf, - }, nil + } + d.Discovery = refresh.NewDiscovery( + logger, + "triton", + time.Duration(conf.RefreshInterval), + d.refresh, + ) + return d, nil } -// Run implements the Discoverer interface. -func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { - defer close(ch) - - ticker := time.NewTicker(d.interval) - defer ticker.Stop() - - // Get an initial set right away. - tg, err := d.refresh(ctx) - if err != nil { - level.Error(d.logger).Log("msg", "Refreshing targets failed", "err", err) - } else { - ch <- []*targetgroup.Group{tg} - } - - for { - select { - case <-ticker.C: - tg, err := d.refresh(ctx) - if err != nil { - level.Error(d.logger).Log("msg", "Refreshing targets failed", "err", err) - } else { - ch <- []*targetgroup.Group{tg} - } - case <-ctx.Done(): - return - } - } -} - -func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err error) { - t0 := time.Now() - defer func() { - refreshDuration.Observe(time.Since(t0).Seconds()) - if err != nil { - refreshFailuresCount.Inc() - } - }() - +func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { var endpoint = fmt.Sprintf("https://%s:%d/v%d/discover", d.sdConfig.Endpoint, d.sdConfig.Port, d.sdConfig.Version) if len(d.sdConfig.Groups) > 0 { groups := url.QueryEscape(strings.Join(d.sdConfig.Groups, ",")) endpoint = fmt.Sprintf("%s?groups=%s", endpoint, groups) } - tg = &targetgroup.Group{ + tg := &targetgroup.Group{ Source: endpoint, } @@ -217,7 +163,7 @@ func (d *Discovery) 
refresh(ctx context.Context) (tg *targetgroup.Group, err err return nil, fmt.Errorf("an error occurred when reading the response body: %s", err) } - dr := DiscoveryResponse{} + dr := discoveryResponse{} err = json.Unmarshal(data, &dr) if err != nil { return nil, fmt.Errorf("an error occurred unmarshaling the discovery response json: %s", err) @@ -242,5 +188,5 @@ func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err err tg.Targets = append(tg.Targets, labels) } - return tg, nil + return []*targetgroup.Group{tg}, nil } diff --git a/discovery/triton/triton_test.go b/discovery/triton/triton_test.go index 7b78c451c3..3050d36ff2 100644 --- a/discovery/triton/triton_test.go +++ b/discovery/triton/triton_test.go @@ -23,11 +23,10 @@ import ( "strconv" "strings" "testing" - "time" "github.com/prometheus/common/config" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/prometheus/util/testutil" ) @@ -104,30 +103,6 @@ func TestTritonSDNewGroupsConfig(t *testing.T) { testutil.Equals(t, groupsconf.Port, td.sdConfig.Port) } -func TestTritonSDRun(t *testing.T) { - var ( - td, _ = newTritonDiscovery(conf) - ch = make(chan []*targetgroup.Group) - ctx, cancel = context.WithCancel(context.Background()) - ) - - wait := make(chan struct{}) - go func() { - td.Run(ctx, ch) - close(wait) - }() - - select { - case <-time.After(60 * time.Millisecond): - // Expected. 
- case tgs := <-ch: - t.Fatalf("Unexpected target groups in triton discovery: %s", tgs) - } - - cancel() - <-wait -} - func TestTritonSDRefreshNoTargets(t *testing.T) { tgts := testTritonSDRefresh(t, "{\"containers\":[]}") testutil.Assert(t, tgts == nil, "") @@ -206,8 +181,10 @@ func testTritonSDRefresh(t *testing.T, dstr string) []model.LabelSet { td.sdConfig.Port = port - tg, err := td.refresh(context.Background()) + tgs, err := td.refresh(context.Background()) testutil.Ok(t, err) + testutil.Equals(t, 1, len(tgs)) + tg := tgs[0] testutil.Assert(t, tg != nil, "") return tg.Targets