discovery: factorize for SD based on refresh (#5381)

* discovery: factorize for SD based on refresh

Signed-off-by: Simon Pasquier <spasquie@redhat.com>

* discovery: use common metrics for refresh

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
Authored by Simon Pasquier on 2019-03-25 11:54:22 +01:00, committed by GitHub
parent b95f4337a8
commit 782d00059a
16 changed files with 816 additions and 1027 deletions
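
The per-mechanism refresh loops and metrics below (azure, dns, ec2, gce, marathon) are removed and delegated to the new discovery/refresh package. That package itself is not part of this excerpt; the following is a minimal sketch of what such a helper looks like, reconstructed only from how the SDs call it in the diff (refresh.NewDiscovery(logger, name, interval, refreshFn) plus a refresh callback returning []*targetgroup.Group). The loop body and the shared metric names are assumptions for illustration, not taken from the commit.

// Sketch of discovery/refresh (assumed): a Discoverer built around a
// periodic refresh callback, with shared duration/failure metrics.
package refresh

import (
	"context"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/prometheus/discovery/targetgroup"
)

// Discovery implements the Discoverer interface on top of a refresh function.
type Discovery struct {
	logger   log.Logger
	interval time.Duration
	refreshf func(ctx context.Context) ([]*targetgroup.Group, error)

	failures prometheus.Counter // assumed name: prometheus_sd_refresh_failures_total
	duration prometheus.Summary // assumed name: prometheus_sd_refresh_duration_seconds
}

// NewDiscovery returns a Discovery that calls refreshf every interval and
// labels the shared metrics with the SD mechanism name ("azure", "dns", ...).
func NewDiscovery(l log.Logger, mech string, interval time.Duration, refreshf func(ctx context.Context) ([]*targetgroup.Group, error)) *Discovery {
	if l == nil {
		l = log.NewNopLogger()
	}
	d := &Discovery{
		logger:   l,
		interval: interval,
		refreshf: refreshf,
		failures: prometheus.NewCounter(prometheus.CounterOpts{
			Name:        "prometheus_sd_refresh_failures_total",
			Help:        "Number of refresh failures for the given SD mechanism.",
			ConstLabels: prometheus.Labels{"mechanism": mech},
		}),
		duration: prometheus.NewSummary(prometheus.SummaryOpts{
			Name:        "prometheus_sd_refresh_duration_seconds",
			Help:        "Duration of a refresh for the given SD mechanism.",
			ConstLabels: prometheus.Labels{"mechanism": mech},
		}),
	}
	prometheus.MustRegister(d.failures, d.duration)
	return d
}

// Run implements the Discoverer interface: refresh immediately, then on every
// tick, sending the returned target groups to ch until the context is done.
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
	ticker := time.NewTicker(d.interval)
	defer ticker.Stop()

	for {
		start := time.Now()
		tgs, err := d.refreshf(ctx)
		d.duration.Observe(time.Since(start).Seconds())
		if err != nil {
			d.failures.Inc()
			level.Error(d.logger).Log("msg", "Unable to refresh target groups", "err", err)
		} else {
			select {
			case ch <- tgs:
			case <-ctx.Done():
				return
			}
		}

		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
	}
}

Each SD then only implements its refresh function and embeds *refresh.Discovery to obtain Run, which is the pattern applied file by file in the diff below.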

@@ -29,9 +29,10 @@ import (
"github.com/Azure/go-autorest/autorest/azure" "github.com/Azure/go-autorest/autorest/azure"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level" "github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
config_util "github.com/prometheus/common/config" config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery/refresh"
"github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/util/strutil"
) )
@@ -53,26 +54,13 @@ const (
authMethodManagedIdentity = "ManagedIdentity" authMethodManagedIdentity = "ManagedIdentity"
) )
var ( // DefaultSDConfig is the default Azure SD configuration.
azureSDRefreshFailuresCount = prometheus.NewCounter( var DefaultSDConfig = SDConfig{
prometheus.CounterOpts{
Name: "prometheus_sd_azure_refresh_failures_total",
Help: "Number of Azure-SD refresh failures.",
})
azureSDRefreshDuration = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "prometheus_sd_azure_refresh_duration_seconds",
Help: "The duration of a Azure-SD refresh in seconds.",
})
// DefaultSDConfig is the default Azure SD configuration.
DefaultSDConfig = SDConfig{
Port: 80, Port: 80,
RefreshInterval: model.Duration(5 * time.Minute), RefreshInterval: model.Duration(5 * time.Minute),
Environment: azure.PublicCloud.Name, Environment: azure.PublicCloud.Name,
AuthenticationMethod: authMethodOAuth, AuthenticationMethod: authMethodOAuth,
} }
)
// SDConfig is the configuration for Azure based service discovery. // SDConfig is the configuration for Azure based service discovery.
type SDConfig struct { type SDConfig struct {
@@ -125,18 +113,11 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil return nil
} }
func init() {
prometheus.MustRegister(azureSDRefreshDuration)
prometheus.MustRegister(azureSDRefreshFailuresCount)
}
// Discovery periodically performs Azure-SD requests. It implements
// the Discoverer interface.
type Discovery struct { type Discovery struct {
cfg *SDConfig *refresh.Discovery
interval time.Duration
port int
logger log.Logger logger log.Logger
cfg *SDConfig
port int
} }
// NewDiscovery returns a new AzureDiscovery which periodically refreshes its targets. // NewDiscovery returns a new AzureDiscovery which periodically refreshes its targets.
@@ -144,42 +125,18 @@ func NewDiscovery(cfg *SDConfig, logger log.Logger) *Discovery {
if logger == nil { if logger == nil {
logger = log.NewNopLogger() logger = log.NewNopLogger()
} }
return &Discovery{ d := &Discovery{
cfg: cfg, cfg: cfg,
interval: time.Duration(cfg.RefreshInterval),
port: cfg.Port, port: cfg.Port,
logger: logger, logger: logger,
} }
} d.Discovery = refresh.NewDiscovery(
logger,
// Run implements the Discoverer interface. "azure",
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { time.Duration(cfg.RefreshInterval),
ticker := time.NewTicker(d.interval) d.refresh,
defer ticker.Stop() )
return d
for {
select {
case <-ctx.Done():
return
default:
}
tg, err := d.refresh(ctx)
if err != nil {
level.Error(d.logger).Log("msg", "Unable to refresh during Azure discovery", "err", err)
} else {
select {
case <-ctx.Done():
case ch <- []*targetgroup.Group{tg}:
}
}
select {
case <-ticker.C:
case <-ctx.Done():
return
}
}
} }
// azureClient represents multiple Azure Resource Manager providers. // azureClient represents multiple Azure Resource Manager providers.
@@ -281,17 +238,9 @@ func newAzureResourceFromID(id string, logger log.Logger) (azureResource, error)
}, nil }, nil
} }
func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err error) { func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
defer level.Debug(d.logger).Log("msg", "Azure discovery completed") defer level.Debug(d.logger).Log("msg", "Azure discovery completed")
t0 := time.Now()
defer func() {
azureSDRefreshDuration.Observe(time.Since(t0).Seconds())
if err != nil {
azureSDRefreshFailuresCount.Inc()
}
}()
tg = &targetgroup.Group{}
client, err := createAzureClient(*d.cfg) client, err := createAzureClient(*d.cfg)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not create Azure client: %s", err) return nil, fmt.Errorf("could not create Azure client: %s", err)
@@ -405,6 +354,7 @@ func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err err
wg.Wait() wg.Wait()
close(ch) close(ch)
var tg targetgroup.Group
for tgt := range ch { for tgt := range ch {
if tgt.err != nil { if tgt.err != nil {
return nil, fmt.Errorf("unable to complete Azure service discovery: %s", err) return nil, fmt.Errorf("unable to complete Azure service discovery: %s", err)
@@ -414,7 +364,7 @@ func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err err
} }
} }
return tg, nil return []*targetgroup.Group{&tg}, nil
} }
func (client *azureClient) getVMs(ctx context.Context) ([]virtualMachine, error) { func (client *azureClient) getVMs(ctx context.Context) ([]virtualMachine, error) {

@@ -327,7 +327,6 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
<-ticker.C <-ticker.C
} }
} }
} else { } else {
// We only have fully defined services. // We only have fully defined services.
for _, name := range d.watchedServices { for _, name := range d.watchedServices {

@@ -26,6 +26,7 @@ import (
"github.com/miekg/dns" "github.com/miekg/dns"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery/refresh"
"github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/discovery/targetgroup"
) )
@@ -98,9 +99,8 @@ func init() {
// Discovery periodically performs DNS-SD requests. It implements // Discovery periodically performs DNS-SD requests. It implements
// the Discoverer interface. // the Discoverer interface.
type Discovery struct { type Discovery struct {
*refresh.Discovery
names []string names []string
interval time.Duration
port int port int
qtype uint16 qtype uint16
logger log.Logger logger log.Logger
@@ -121,50 +121,50 @@ func NewDiscovery(conf SDConfig, logger log.Logger) *Discovery {
case "SRV": case "SRV":
qtype = dns.TypeSRV qtype = dns.TypeSRV
} }
return &Discovery{ d := &Discovery{
names: conf.Names, names: conf.Names,
interval: time.Duration(conf.RefreshInterval),
qtype: qtype, qtype: qtype,
port: conf.Port, port: conf.Port,
logger: logger, logger: logger,
} }
d.Discovery = refresh.NewDiscovery(
logger,
"dns",
time.Duration(conf.RefreshInterval),
d.refresh,
)
return d
} }
// Run implements the Discoverer interface. func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { var (
ticker := time.NewTicker(d.interval) wg sync.WaitGroup
defer ticker.Stop() ch = make(chan *targetgroup.Group)
tgs = make([]*targetgroup.Group, len(d.names))
// Get an initial set right away. )
d.refreshAll(ctx, ch)
for {
select {
case <-ticker.C:
d.refreshAll(ctx, ch)
case <-ctx.Done():
return
}
}
}
func (d *Discovery) refreshAll(ctx context.Context, ch chan<- []*targetgroup.Group) {
var wg sync.WaitGroup
wg.Add(len(d.names)) wg.Add(len(d.names))
for _, name := range d.names { for _, name := range d.names {
go func(n string) { go func(n string) {
if err := d.refresh(ctx, n, ch); err != nil { if err := d.refreshOne(ctx, n, ch); err != nil {
level.Error(d.logger).Log("msg", "Error refreshing DNS targets", "err", err) level.Error(d.logger).Log("msg", "Error refreshing DNS targets", "err", err)
} }
wg.Done() wg.Done()
}(name) }(name)
} }
go func() {
wg.Wait() wg.Wait()
close(ch)
}()
for tg := range ch {
tgs = append(tgs, tg)
}
return tgs, nil
} }
func (d *Discovery) refresh(ctx context.Context, name string, ch chan<- []*targetgroup.Group) error { func (d *Discovery) refreshOne(ctx context.Context, name string, ch chan<- *targetgroup.Group) error {
response, err := lookupWithSearchPath(name, d.qtype, d.logger) response, err := lookupWithSearchPath(name, d.qtype, d.logger)
dnsSDLookupsCount.Inc() dnsSDLookupsCount.Inc()
if err != nil { if err != nil {
@@ -203,7 +203,7 @@ func (d *Discovery) refresh(ctx context.Context, name string, ch chan<- []*targe
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case ch <- []*targetgroup.Group{tg}: case ch <- tg:
} }
return nil return nil

@@ -26,12 +26,11 @@ import (
"github.com/aws/aws-sdk-go/aws/ec2metadata" "github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2"
config_util "github.com/prometheus/common/config" config_util "github.com/prometheus/common/config"
"github.com/prometheus/prometheus/discovery/refresh"
"github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/util/strutil"
) )
@@ -55,23 +54,11 @@ const (
subnetSeparator = "," subnetSeparator = ","
) )
var ( // DefaultSDConfig is the default EC2 SD configuration.
ec2SDRefreshFailuresCount = prometheus.NewCounter( var DefaultSDConfig = SDConfig{
prometheus.CounterOpts{
Name: "prometheus_sd_ec2_refresh_failures_total",
Help: "The number of EC2-SD scrape failures.",
})
ec2SDRefreshDuration = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "prometheus_sd_ec2_refresh_duration_seconds",
Help: "The duration of a EC2-SD refresh in seconds.",
})
// DefaultSDConfig is the default EC2 SD configuration.
DefaultSDConfig = SDConfig{
Port: 80, Port: 80,
RefreshInterval: model.Duration(60 * time.Second), RefreshInterval: model.Duration(60 * time.Second),
} }
)
// Filter is the configuration for filtering EC2 instances. // Filter is the configuration for filtering EC2 instances.
type Filter struct { type Filter struct {
@@ -120,21 +107,16 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil return nil
} }
func init() {
prometheus.MustRegister(ec2SDRefreshFailuresCount)
prometheus.MustRegister(ec2SDRefreshDuration)
}
// Discovery periodically performs EC2-SD requests. It implements // Discovery periodically performs EC2-SD requests. It implements
// the Discoverer interface. // the Discoverer interface.
type Discovery struct { type Discovery struct {
*refresh.Discovery
aws *aws.Config aws *aws.Config
interval time.Duration interval time.Duration
profile string profile string
roleARN string roleARN string
port int port int
filters []*Filter filters []*Filter
logger log.Logger
} }
// NewDiscovery returns a new EC2Discovery which periodically refreshes its targets. // NewDiscovery returns a new EC2Discovery which periodically refreshes its targets.
@@ -146,7 +128,7 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) *Discovery {
if logger == nil { if logger == nil {
logger = log.NewNopLogger() logger = log.NewNopLogger()
} }
return &Discovery{ d := &Discovery{
aws: &aws.Config{ aws: &aws.Config{
Endpoint: &conf.Endpoint, Endpoint: &conf.Endpoint,
Region: &conf.Region, Region: &conf.Region,
@@ -157,56 +139,17 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) *Discovery {
filters: conf.Filters, filters: conf.Filters,
interval: time.Duration(conf.RefreshInterval), interval: time.Duration(conf.RefreshInterval),
port: conf.Port, port: conf.Port,
logger: logger,
} }
d.Discovery = refresh.NewDiscovery(
logger,
"ec2",
time.Duration(conf.RefreshInterval),
d.refresh,
)
return d
} }
// Run implements the Discoverer interface. func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
ticker := time.NewTicker(d.interval)
defer ticker.Stop()
// Get an initial set right away.
tg, err := d.refresh(ctx)
if err != nil {
level.Error(d.logger).Log("msg", "Refresh failed", "err", err)
} else {
select {
case ch <- []*targetgroup.Group{tg}:
case <-ctx.Done():
return
}
}
for {
select {
case <-ticker.C:
tg, err := d.refresh(ctx)
if err != nil {
level.Error(d.logger).Log("msg", "Refresh failed", "err", err)
continue
}
select {
case ch <- []*targetgroup.Group{tg}:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}
func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err error) {
t0 := time.Now()
defer func() {
ec2SDRefreshDuration.Observe(time.Since(t0).Seconds())
if err != nil {
ec2SDRefreshFailuresCount.Inc()
}
}()
sess, err := session.NewSessionWithOptions(session.Options{ sess, err := session.NewSessionWithOptions(session.Options{
Config: *d.aws, Config: *d.aws,
Profile: d.profile, Profile: d.profile,
@@ -222,7 +165,7 @@ func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err err
} else { } else {
ec2s = ec2.New(sess) ec2s = ec2.New(sess)
} }
tg = &targetgroup.Group{ tg := &targetgroup.Group{
Source: *d.aws.Region, Source: *d.aws.Region,
} }
@@ -306,5 +249,5 @@ func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err err
}); err != nil { }); err != nil {
return nil, fmt.Errorf("could not describe instances: %s", err) return nil, fmt.Errorf("could not describe instances: %s", err)
} }
return tg, nil return []*targetgroup.Group{tg}, nil
} }

@@ -22,12 +22,11 @@ import (
"time" "time"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/oauth2/google" "golang.org/x/oauth2/google"
compute "google.golang.org/api/compute/v1" compute "google.golang.org/api/compute/v1"
"github.com/prometheus/prometheus/discovery/refresh"
"github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/util/strutil"
) )
@@ -49,24 +48,12 @@ const (
gceLabelMachineType = gceLabel + "machine_type" gceLabelMachineType = gceLabel + "machine_type"
) )
var ( // DefaultSDConfig is the default GCE SD configuration.
gceSDRefreshFailuresCount = prometheus.NewCounter( var DefaultSDConfig = SDConfig{
prometheus.CounterOpts{
Name: "prometheus_sd_gce_refresh_failures_total",
Help: "The number of GCE-SD refresh failures.",
})
gceSDRefreshDuration = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "prometheus_sd_gce_refresh_duration",
Help: "The duration of a GCE-SD refresh in seconds.",
})
// DefaultSDConfig is the default GCE SD configuration.
DefaultSDConfig = SDConfig{
Port: 80, Port: 80,
TagSeparator: ",", TagSeparator: ",",
RefreshInterval: model.Duration(60 * time.Second), RefreshInterval: model.Duration(60 * time.Second),
} }
)
// SDConfig is the configuration for GCE based service discovery. // SDConfig is the configuration for GCE based service discovery.
type SDConfig struct { type SDConfig struct {
@@ -104,97 +91,51 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil return nil
} }
func init() {
prometheus.MustRegister(gceSDRefreshFailuresCount)
prometheus.MustRegister(gceSDRefreshDuration)
}
// Discovery periodically performs GCE-SD requests. It implements // Discovery periodically performs GCE-SD requests. It implements
// the Discoverer interface. // the Discoverer interface.
type Discovery struct { type Discovery struct {
*refresh.Discovery
project string project string
zone string zone string
filter string filter string
client *http.Client client *http.Client
svc *compute.Service svc *compute.Service
isvc *compute.InstancesService isvc *compute.InstancesService
interval time.Duration
port int port int
tagSeparator string tagSeparator string
logger log.Logger
} }
// NewDiscovery returns a new Discovery which periodically refreshes its targets. // NewDiscovery returns a new Discovery which periodically refreshes its targets.
func NewDiscovery(conf SDConfig, logger log.Logger) (*Discovery, error) { func NewDiscovery(conf SDConfig, logger log.Logger) (*Discovery, error) {
if logger == nil { d := &Discovery{
logger = log.NewNopLogger()
}
gd := &Discovery{
project: conf.Project, project: conf.Project,
zone: conf.Zone, zone: conf.Zone,
filter: conf.Filter, filter: conf.Filter,
interval: time.Duration(conf.RefreshInterval),
port: conf.Port, port: conf.Port,
tagSeparator: conf.TagSeparator, tagSeparator: conf.TagSeparator,
logger: logger,
} }
var err error var err error
gd.client, err = google.DefaultClient(context.Background(), compute.ComputeReadonlyScope) d.client, err = google.DefaultClient(context.Background(), compute.ComputeReadonlyScope)
if err != nil { if err != nil {
return nil, fmt.Errorf("error setting up communication with GCE service: %s", err) return nil, fmt.Errorf("error setting up communication with GCE service: %s", err)
} }
gd.svc, err = compute.New(gd.client) d.svc, err = compute.New(d.client)
if err != nil { if err != nil {
return nil, fmt.Errorf("error setting up communication with GCE service: %s", err) return nil, fmt.Errorf("error setting up communication with GCE service: %s", err)
} }
gd.isvc = compute.NewInstancesService(gd.svc) d.isvc = compute.NewInstancesService(d.svc)
return gd, nil
d.Discovery = refresh.NewDiscovery(
logger,
"gce",
time.Duration(conf.RefreshInterval),
d.refresh,
)
return d, nil
} }
// Run implements the Discoverer interface. func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { tg := &targetgroup.Group{
// Get an initial set right away.
tg, err := d.refresh(ctx)
if err != nil {
level.Error(d.logger).Log("msg", "Refresh failed", "err", err)
} else {
select {
case ch <- []*targetgroup.Group{tg}:
case <-ctx.Done():
}
}
ticker := time.NewTicker(d.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
tg, err := d.refresh(ctx)
if err != nil {
level.Error(d.logger).Log("msg", "Refresh failed", "err", err)
continue
}
select {
case ch <- []*targetgroup.Group{tg}:
case <-ctx.Done():
}
case <-ctx.Done():
return
}
}
}
func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err error) {
t0 := time.Now()
defer func() {
gceSDRefreshDuration.Observe(time.Since(t0).Seconds())
if err != nil {
gceSDRefreshFailuresCount.Inc()
}
}()
tg = &targetgroup.Group{
Source: fmt.Sprintf("GCE_%s_%s", d.project, d.zone), Source: fmt.Sprintf("GCE_%s_%s", d.project, d.zone),
} }
@@ -202,7 +143,7 @@ func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err err
if len(d.filter) > 0 { if len(d.filter) > 0 {
ilc = ilc.Filter(d.filter) ilc = ilc.Filter(d.filter)
} }
err = ilc.Pages(ctx, func(l *compute.InstanceList) error { err := ilc.Pages(ctx, func(l *compute.InstanceList) error {
for _, inst := range l.Items { for _, inst := range l.Items {
if len(inst.NetworkInterfaces) == 0 { if len(inst.NetworkInterfaces) == 0 {
continue continue
@@ -259,7 +200,7 @@ func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err err
return nil return nil
}) })
if err != nil { if err != nil {
return tg, fmt.Errorf("error retrieving refresh targets from gce: %s", err) return nil, fmt.Errorf("error retrieving refresh targets from gce: %s", err)
} }
return tg, nil return []*targetgroup.Group{tg}, nil
} }

@@ -26,10 +26,10 @@ import (
"time" "time"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
config_util "github.com/prometheus/common/config" config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery/refresh"
"github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/util/strutil"
) )
@@ -54,29 +54,12 @@ const (
portMappingLabelPrefix = metaLabelPrefix + "port_mapping_label_" portMappingLabelPrefix = metaLabelPrefix + "port_mapping_label_"
// portDefinitionLabelPrefix is the prefix for the application portDefinitions labels. // portDefinitionLabelPrefix is the prefix for the application portDefinitions labels.
portDefinitionLabelPrefix = metaLabelPrefix + "port_definition_label_" portDefinitionLabelPrefix = metaLabelPrefix + "port_definition_label_"
// Constants for instrumentation.
namespace = "prometheus"
) )
var ( // DefaultSDConfig is the default Marathon SD configuration.
refreshFailuresCount = prometheus.NewCounter( var DefaultSDConfig = SDConfig{
prometheus.CounterOpts{
Namespace: namespace,
Name: "sd_marathon_refresh_failures_total",
Help: "The number of Marathon-SD refresh failures.",
})
refreshDuration = prometheus.NewSummary(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "sd_marathon_refresh_duration_seconds",
Help: "The duration of a Marathon-SD refresh in seconds.",
})
// DefaultSDConfig is the default Marathon SD configuration.
DefaultSDConfig = SDConfig{
RefreshInterval: model.Duration(30 * time.Second), RefreshInterval: model.Duration(30 * time.Second),
} }
)
// SDConfig is the configuration for services running on Marathon. // SDConfig is the configuration for services running on Marathon.
type SDConfig struct { type SDConfig struct {
@@ -110,29 +93,19 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
return c.HTTPClientConfig.Validate() return c.HTTPClientConfig.Validate()
} }
func init() {
prometheus.MustRegister(refreshFailuresCount)
prometheus.MustRegister(refreshDuration)
}
const appListPath string = "/v2/apps/?embed=apps.tasks" const appListPath string = "/v2/apps/?embed=apps.tasks"
// Discovery provides service discovery based on a Marathon instance. // Discovery provides service discovery based on a Marathon instance.
type Discovery struct { type Discovery struct {
*refresh.Discovery
client *http.Client client *http.Client
servers []string servers []string
refreshInterval time.Duration
lastRefresh map[string]*targetgroup.Group lastRefresh map[string]*targetgroup.Group
appsClient AppListClient appsClient appListClient
logger log.Logger
} }
// NewDiscovery returns a new Marathon Discovery. // NewDiscovery returns a new Marathon Discovery.
func NewDiscovery(conf SDConfig, logger log.Logger) (*Discovery, error) { func NewDiscovery(conf SDConfig, logger log.Logger) (*Discovery, error) {
if logger == nil {
logger = log.NewNopLogger()
}
rt, err := config_util.NewRoundTripperFromConfig(conf.HTTPClientConfig, "marathon_sd") rt, err := config_util.NewRoundTripperFromConfig(conf.HTTPClientConfig, "marathon_sd")
if err != nil { if err != nil {
return nil, err return nil, err
@@ -147,13 +120,18 @@ func NewDiscovery(conf SDConfig, logger log.Logger) (*Discovery, error) {
return nil, err return nil, err
} }
return &Discovery{ d := &Discovery{
client: &http.Client{Transport: rt}, client: &http.Client{Transport: rt},
servers: conf.Servers, servers: conf.Servers,
refreshInterval: time.Duration(conf.RefreshInterval),
appsClient: fetchApps, appsClient: fetchApps,
logger: logger, }
}, nil d.Discovery = refresh.NewDiscovery(
logger,
"marathon",
time.Duration(conf.RefreshInterval),
d.refresh,
)
return d, nil
} }
type authTokenRoundTripper struct { type authTokenRoundTripper struct {
@@ -204,33 +182,10 @@ func (rt *authTokenFileRoundTripper) RoundTrip(request *http.Request) (*http.Res
return rt.rt.RoundTrip(request) return rt.rt.RoundTrip(request)
} }
// Run implements the Discoverer interface. func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
for {
select {
case <-ctx.Done():
return
case <-time.After(d.refreshInterval):
err := d.updateServices(ctx, ch)
if err != nil {
level.Error(d.logger).Log("msg", "Error while updating services", "err", err)
}
}
}
}
func (d *Discovery) updateServices(ctx context.Context, ch chan<- []*targetgroup.Group) (err error) {
t0 := time.Now()
defer func() {
refreshDuration.Observe(time.Since(t0).Seconds())
if err != nil {
refreshFailuresCount.Inc()
}
}()
targetMap, err := d.fetchTargetGroups(ctx) targetMap, err := d.fetchTargetGroups(ctx)
if err != nil { if err != nil {
return err return nil, err
} }
all := make([]*targetgroup.Group, 0, len(targetMap)) all := make([]*targetgroup.Group, 0, len(targetMap))
@@ -240,54 +195,49 @@ func (d *Discovery) updateServices(ctx context.Context, ch chan<- []*targetgroup
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return nil, ctx.Err()
case ch <- all: default:
} }
// Remove services which did disappear. // Remove services which did disappear.
for source := range d.lastRefresh { for source := range d.lastRefresh {
_, ok := targetMap[source] _, ok := targetMap[source]
if !ok { if !ok {
select { all = append(all, &targetgroup.Group{Source: source})
case <-ctx.Done():
return ctx.Err()
case ch <- []*targetgroup.Group{{Source: source}}:
level.Debug(d.logger).Log("msg", "Removing group", "source", source)
}
} }
} }
d.lastRefresh = targetMap d.lastRefresh = targetMap
return nil return all, nil
} }
func (d *Discovery) fetchTargetGroups(ctx context.Context) (map[string]*targetgroup.Group, error) { func (d *Discovery) fetchTargetGroups(ctx context.Context) (map[string]*targetgroup.Group, error) {
url := RandomAppsURL(d.servers) url := randomAppsURL(d.servers)
apps, err := d.appsClient(ctx, d.client, url) apps, err := d.appsClient(ctx, d.client, url)
if err != nil { if err != nil {
return nil, err return nil, err
} }
groups := AppsToTargetGroups(apps) groups := appsToTargetGroups(apps)
return groups, nil return groups, nil
} }
// Task describes one instance of a service running on Marathon. // task describes one instance of a service running on Marathon.
type Task struct { type task struct {
ID string `json:"id"` ID string `json:"id"`
Host string `json:"host"` Host string `json:"host"`
Ports []uint32 `json:"ports"` Ports []uint32 `json:"ports"`
IPAddresses []IPAddress `json:"ipAddresses"` IPAddresses []ipAddress `json:"ipAddresses"`
} }
// IPAddress describes the address and protocol the container's network interface is bound to. // ipAddress describes the address and protocol the container's network interface is bound to.
type IPAddress struct { type ipAddress struct {
Address string `json:"ipAddress"` Address string `json:"ipAddress"`
Proto string `json:"protocol"` Proto string `json:"protocol"`
} }
// PortMapping describes in which port the process are binding inside the docker container. // PortMapping describes in which port the process are binding inside the docker container.
type PortMapping struct { type portMapping struct {
Labels map[string]string `json:"labels"` Labels map[string]string `json:"labels"`
ContainerPort uint32 `json:"containerPort"` ContainerPort uint32 `json:"containerPort"`
HostPort uint32 `json:"hostPort"` HostPort uint32 `json:"hostPort"`
@@ -295,56 +245,56 @@ type PortMapping struct {
} }
// DockerContainer describes a container which uses the docker runtime. // DockerContainer describes a container which uses the docker runtime.
type DockerContainer struct { type dockerContainer struct {
Image string `json:"image"` Image string `json:"image"`
PortMappings []PortMapping `json:"portMappings"` PortMappings []portMapping `json:"portMappings"`
} }
// Container describes the runtime an app in running in. // Container describes the runtime an app in running in.
type Container struct { type container struct {
Docker DockerContainer `json:"docker"` Docker dockerContainer `json:"docker"`
PortMappings []PortMapping `json:"portMappings"` PortMappings []portMapping `json:"portMappings"`
} }
// PortDefinition describes which load balancer port you should access to access the service. // PortDefinition describes which load balancer port you should access to access the service.
type PortDefinition struct { type portDefinition struct {
Labels map[string]string `json:"labels"` Labels map[string]string `json:"labels"`
Port uint32 `json:"port"` Port uint32 `json:"port"`
} }
// Network describes the name and type of network the container is attached to. // Network describes the name and type of network the container is attached to.
type Network struct { type network struct {
Name string `json:"name"` Name string `json:"name"`
Mode string `json:"mode"` Mode string `json:"mode"`
} }
// App describes a service running on Marathon. // App describes a service running on Marathon.
type App struct { type app struct {
ID string `json:"id"` ID string `json:"id"`
Tasks []Task `json:"tasks"` Tasks []task `json:"tasks"`
RunningTasks int `json:"tasksRunning"` RunningTasks int `json:"tasksRunning"`
Labels map[string]string `json:"labels"` Labels map[string]string `json:"labels"`
Container Container `json:"container"` Container container `json:"container"`
PortDefinitions []PortDefinition `json:"portDefinitions"` PortDefinitions []portDefinition `json:"portDefinitions"`
Networks []Network `json:"networks"` Networks []network `json:"networks"`
RequirePorts bool `json:"requirePorts"` RequirePorts bool `json:"requirePorts"`
} }
// isContainerNet checks if the app's first network is set to mode 'container'. // isContainerNet checks if the app's first network is set to mode 'container'.
func (app App) isContainerNet() bool { func (app app) isContainerNet() bool {
return len(app.Networks) > 0 && app.Networks[0].Mode == "container" return len(app.Networks) > 0 && app.Networks[0].Mode == "container"
} }
// AppList is a list of Marathon apps. // appList is a list of Marathon apps.
type AppList struct { type appList struct {
Apps []App `json:"apps"` Apps []app `json:"apps"`
} }
// AppListClient defines a function that can be used to get an application list from marathon. // appListClient defines a function that can be used to get an application list from marathon.
type AppListClient func(ctx context.Context, client *http.Client, url string) (*AppList, error) type appListClient func(ctx context.Context, client *http.Client, url string) (*appList, error)
// fetchApps requests a list of applications from a marathon server. // fetchApps requests a list of applications from a marathon server.
func fetchApps(ctx context.Context, client *http.Client, url string) (*AppList, error) { func fetchApps(ctx context.Context, client *http.Client, url string) (*appList, error) {
request, err := http.NewRequest("GET", url, nil) request, err := http.NewRequest("GET", url, nil)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -361,7 +311,7 @@ func fetchApps(ctx context.Context, client *http.Client, url string) (*AppList,
return nil, fmt.Errorf("non 2xx status '%v' response during marathon service discovery", resp.StatusCode) return nil, fmt.Errorf("non 2xx status '%v' response during marathon service discovery", resp.StatusCode)
} }
var apps AppList var apps appList
err = json.NewDecoder(resp.Body).Decode(&apps) err = json.NewDecoder(resp.Body).Decode(&apps)
if err != nil { if err != nil {
return nil, fmt.Errorf("%q: %v", url, err) return nil, fmt.Errorf("%q: %v", url, err)
@@ -369,16 +319,16 @@ func fetchApps(ctx context.Context, client *http.Client, url string) (*AppList,
return &apps, nil return &apps, nil
} }
// RandomAppsURL randomly selects a server from an array and creates // randomAppsURL randomly selects a server from an array and creates
// an URL pointing to the app list. // an URL pointing to the app list.
func RandomAppsURL(servers []string) string { func randomAppsURL(servers []string) string {
// TODO: If possible update server list from Marathon at some point. // TODO: If possible update server list from Marathon at some point.
server := servers[rand.Intn(len(servers))] server := servers[rand.Intn(len(servers))]
return fmt.Sprintf("%s%s", server, appListPath) return fmt.Sprintf("%s%s", server, appListPath)
} }
// AppsToTargetGroups takes an array of Marathon apps and converts them into target groups. // appsToTargetGroups takes an array of Marathon apps and converts them into target groups.
func AppsToTargetGroups(apps *AppList) map[string]*targetgroup.Group { func appsToTargetGroups(apps *appList) map[string]*targetgroup.Group {
tgroups := map[string]*targetgroup.Group{} tgroups := map[string]*targetgroup.Group{}
for _, a := range apps.Apps { for _, a := range apps.Apps {
group := createTargetGroup(&a) group := createTargetGroup(&a)
@@ -387,7 +337,7 @@ func AppsToTargetGroups(apps *AppList) map[string]*targetgroup.Group {
return tgroups return tgroups
} }
func createTargetGroup(app *App) *targetgroup.Group { func createTargetGroup(app *app) *targetgroup.Group {
var ( var (
targets = targetsForApp(app) targets = targetsForApp(app)
appName = model.LabelValue(app.ID) appName = model.LabelValue(app.ID)
@@ -410,7 +360,7 @@ func createTargetGroup(app *App) *targetgroup.Group {
return tg return tg
} }
func targetsForApp(app *App) []model.LabelSet { func targetsForApp(app *app) []model.LabelSet {
targets := make([]model.LabelSet, 0, len(app.Tasks)) targets := make([]model.LabelSet, 0, len(app.Tasks))
var ports []uint32 var ports []uint32
@@ -494,7 +444,7 @@ func targetsForApp(app *App) []model.LabelSet {
} }
// Generate a target endpoint string in host:port format. // Generate a target endpoint string in host:port format.
func targetEndpoint(task *Task, port uint32, containerNet bool) string { func targetEndpoint(task *task, port uint32, containerNet bool) string {
var host string var host string
@@ -509,7 +459,7 @@ func targetEndpoint(task *Task, port uint32, containerNet bool) string {
} }
// Get a list of ports and a list of labels from a PortMapping. // Get a list of ports and a list of labels from a PortMapping.
func extractPortMapping(portMappings []PortMapping, containerNet bool) ([]uint32, []map[string]string) { func extractPortMapping(portMappings []portMapping, containerNet bool) ([]uint32, []map[string]string) {
ports := make([]uint32, len(portMappings)) ports := make([]uint32, len(portMappings))
labels := make([]map[string]string, len(portMappings)) labels := make([]map[string]string, len(portMappings))

@@ -20,7 +20,6 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"time"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/discovery/targetgroup"
@@ -32,86 +31,86 @@ var (
conf = SDConfig{Servers: testServers} conf = SDConfig{Servers: testServers}
) )
func testUpdateServices(client AppListClient, ch chan []*targetgroup.Group) error { func testUpdateServices(client appListClient) ([]*targetgroup.Group, error) {
md, err := NewDiscovery(conf, nil) md, err := NewDiscovery(conf, nil)
if err != nil { if err != nil {
return err return nil, err
} }
if client != nil {
md.appsClient = client md.appsClient = client
return md.updateServices(context.Background(), ch) }
return md.refresh(context.Background())
} }
func TestMarathonSDHandleError(t *testing.T) { func TestMarathonSDHandleError(t *testing.T) {
var ( var (
errTesting = errors.New("testing failure") errTesting = errors.New("testing failure")
ch = make(chan []*targetgroup.Group, 1) client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) {
client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) { return nil, errTesting } return nil, errTesting
}
) )
if err := testUpdateServices(client, ch); err != errTesting { tgs, err := testUpdateServices(client)
if err != errTesting {
t.Fatalf("Expected error: %s", err) t.Fatalf("Expected error: %s", err)
} }
select { if len(tgs) != 0 {
case tg := <-ch: t.Fatalf("Got group: %s", tgs)
t.Fatalf("Got group: %s", tg)
default:
} }
} }
func TestMarathonSDEmptyList(t *testing.T) { func TestMarathonSDEmptyList(t *testing.T) {
var ( var (
ch = make(chan []*targetgroup.Group, 1) client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) { return &appList{}, nil }
client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) { return &AppList{}, nil }
) )
if err := testUpdateServices(client, ch); err != nil { tgs, err := testUpdateServices(client)
if err != nil {
t.Fatalf("Got error: %s", err) t.Fatalf("Got error: %s", err)
} }
select { if len(tgs) > 0 {
case tg := <-ch: t.Fatalf("Got group: %v", tgs)
if len(tg) > 0 {
t.Fatalf("Got group: %v", tg)
}
default:
} }
} }
func marathonTestAppList(labels map[string]string, runningTasks int) *AppList { func marathonTestAppList(labels map[string]string, runningTasks int) *appList {
var ( var (
task = Task{ t = task{
ID: "test-task-1", ID: "test-task-1",
Host: "mesos-slave1", Host: "mesos-slave1",
} }
docker = DockerContainer{ docker = dockerContainer{
Image: "repo/image:tag", Image: "repo/image:tag",
} }
portMappings = []PortMapping{ portMappings = []portMapping{
{Labels: labels, HostPort: 31000}, {Labels: labels, HostPort: 31000},
} }
container = Container{Docker: docker, PortMappings: portMappings} container = container{Docker: docker, PortMappings: portMappings}
app = App{ a = app{
ID: "test-service", ID: "test-service",
Tasks: []Task{task}, Tasks: []task{t},
RunningTasks: runningTasks, RunningTasks: runningTasks,
Labels: labels, Labels: labels,
Container: container, Container: container,
} }
) )
return &AppList{ return &appList{
Apps: []App{app}, Apps: []app{a},
} }
} }
func TestMarathonSDSendGroup(t *testing.T) { func TestMarathonSDSendGroup(t *testing.T) {
var ( var (
ch = make(chan []*targetgroup.Group, 1) client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) {
client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil return marathonTestAppList(marathonValidLabel, 1), nil
} }
) )
if err := testUpdateServices(client, ch); err != nil { tgs, err := testUpdateServices(client)
if err != nil {
t.Fatalf("Got error: %s", err) t.Fatalf("Got error: %s", err)
} }
select { if len(tgs) != 1 {
case tgs := <-ch: t.Fatal("Expected 1 target group, got", len(tgs))
}
tg := tgs[0] tg := tgs[0]
if tg.Source != "test-service" { if tg.Source != "test-service" {
@@ -127,116 +126,86 @@ func TestMarathonSDSendGroup(t *testing.T) {
if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "yes" { if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "yes" {
t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel]) t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel])
} }
default:
t.Fatal("Did not get a target group.")
}
} }
func TestMarathonSDRemoveApp(t *testing.T) { func TestMarathonSDRemoveApp(t *testing.T) {
var ch = make(chan []*targetgroup.Group, 1)
md, err := NewDiscovery(conf, nil) md, err := NewDiscovery(conf, nil)
if err != nil { if err != nil {
t.Fatalf("%s", err) t.Fatalf("%s", err)
} }
md.appsClient = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) { md.appsClient = func(_ context.Context, _ *http.Client, _ string) (*appList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil return marathonTestAppList(marathonValidLabel, 1), nil
} }
if err := md.updateServices(context.Background(), ch); err != nil { tgs, err := md.refresh(context.Background())
if err != nil {
t.Fatalf("Got error on first update: %s", err) t.Fatalf("Got error on first update: %s", err)
} }
up1 := (<-ch)[0] if len(tgs) != 1 {
t.Fatal("Expected 1 targetgroup, got", len(tgs))
}
tg1 := tgs[0]
md.appsClient = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) { md.appsClient = func(_ context.Context, _ *http.Client, _ string) (*appList, error) {
return marathonTestAppList(marathonValidLabel, 0), nil return marathonTestAppList(marathonValidLabel, 0), nil
} }
if err := md.updateServices(context.Background(), ch); err != nil { tgs, err = md.refresh(context.Background())
if err != nil {
t.Fatalf("Got error on second update: %s", err) t.Fatalf("Got error on second update: %s", err)
} }
up2 := (<-ch)[0] if len(tgs) != 1 {
t.Fatal("Expected 1 targetgroup, got", len(tgs))
}
tg2 := tgs[0]
if up2.Source != up1.Source { if tg2.Source != tg1.Source {
t.Fatalf("Source is different: %s", up2) t.Fatalf("Source is different: %s != %s", tg1.Source, tg2.Source)
if len(up2.Targets) > 0 { if len(tg2.Targets) > 0 {
t.Fatalf("Got a non-empty target set: %s", up2.Targets) t.Fatalf("Got a non-empty target set: %s", tg2.Targets)
} }
} }
} }
func TestMarathonSDRunAndStop(t *testing.T) { func marathonTestAppListWithMultiplePorts(labels map[string]string, runningTasks int) *appList {
var ( var (
refreshInterval = model.Duration(time.Millisecond * 10) t = task{
conf = SDConfig{Servers: testServers, RefreshInterval: refreshInterval}
ch = make(chan []*targetgroup.Group)
doneCh = make(chan error)
)
md, err := NewDiscovery(conf, nil)
if err != nil {
t.Fatalf("%s", err)
}
md.appsClient = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil
}
ctx, cancel := context.WithCancel(context.Background())
go func() {
md.Run(ctx, ch)
close(doneCh)
}()
timeout := time.After(md.refreshInterval * 3)
for {
select {
case <-ch:
cancel()
case <-doneCh:
cancel()
return
case <-timeout:
t.Fatalf("Update took too long.")
}
}
}
func marathonTestAppListWithMultiplePorts(labels map[string]string, runningTasks int) *AppList {
var (
task = Task{
ID: "test-task-1", ID: "test-task-1",
Host: "mesos-slave1", Host: "mesos-slave1",
} }
docker = DockerContainer{ docker = dockerContainer{
Image: "repo/image:tag", Image: "repo/image:tag",
} }
portMappings = []PortMapping{ portMappings = []portMapping{
{Labels: labels, HostPort: 31000}, {Labels: labels, HostPort: 31000},
{Labels: make(map[string]string), HostPort: 32000}, {Labels: make(map[string]string), HostPort: 32000},
} }
container = Container{Docker: docker, PortMappings: portMappings} container = container{Docker: docker, PortMappings: portMappings}
app = App{ a = app{
ID: "test-service", ID: "test-service",
Tasks: []Task{task}, Tasks: []task{t},
RunningTasks: runningTasks, RunningTasks: runningTasks,
Labels: labels, Labels: labels,
Container: container, Container: container,
} }
) )
return &AppList{ return &appList{
Apps: []App{app}, Apps: []app{a},
} }
} }
func TestMarathonSDSendGroupWithMultiplePort(t *testing.T) { func TestMarathonSDSendGroupWithMultiplePort(t *testing.T) {
var ( var (
ch = make(chan []*targetgroup.Group, 1) client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) {
client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) {
return marathonTestAppListWithMultiplePorts(marathonValidLabel, 1), nil return marathonTestAppListWithMultiplePorts(marathonValidLabel, 1), nil
} }
) )
if err := testUpdateServices(client, ch); err != nil { tgs, err := testUpdateServices(client)
if err != nil {
t.Fatalf("Got error: %s", err) t.Fatalf("Got error: %s", err)
} }
select { if len(tgs) != 1 {
case tgs := <-ch: t.Fatal("Expected 1 target group, got", len(tgs))
}
tg := tgs[0] tg := tgs[0]
if tg.Source != "test-service" { if tg.Source != "test-service" {
@@ -259,45 +228,43 @@ func TestMarathonSDSendGroupWithMultiplePort(t *testing.T) {
if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" {
t.Fatalf("Wrong portMappings label from the second port: %s", tgt[model.AddressLabel]) t.Fatalf("Wrong portMappings label from the second port: %s", tgt[model.AddressLabel])
} }
default:
t.Fatal("Did not get a target group.")
}
} }
func marathonTestZeroTaskPortAppList(labels map[string]string, runningTasks int) *AppList { func marathonTestZeroTaskPortAppList(labels map[string]string, runningTasks int) *appList {
var ( var (
task = Task{ t = task{
ID: "test-task-2", ID: "test-task-2",
Host: "mesos-slave-2", Host: "mesos-slave-2",
Ports: []uint32{}, Ports: []uint32{},
} }
docker = DockerContainer{Image: "repo/image:tag"} docker = dockerContainer{Image: "repo/image:tag"}
container = Container{Docker: docker} container = container{Docker: docker}
app = App{ a = app{
ID: "test-service-zero-ports", ID: "test-service-zero-ports",
Tasks: []Task{task}, Tasks: []task{t},
RunningTasks: runningTasks, RunningTasks: runningTasks,
Labels: labels, Labels: labels,
Container: container, Container: container,
} }
) )
return &AppList{ return &appList{
Apps: []App{app}, Apps: []app{a},
} }
} }
func TestMarathonZeroTaskPorts(t *testing.T) { func TestMarathonZeroTaskPorts(t *testing.T) {
var ( var (
ch = make(chan []*targetgroup.Group, 1) client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) {
client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) {
return marathonTestZeroTaskPortAppList(marathonValidLabel, 1), nil return marathonTestZeroTaskPortAppList(marathonValidLabel, 1), nil
} }
) )
if err := testUpdateServices(client, ch); err != nil { tgs, err := testUpdateServices(client)
if err != nil {
t.Fatalf("Got error: %s", err) t.Fatalf("Got error: %s", err)
} }
select { if len(tgs) != 1 {
case tgs := <-ch: t.Fatal("Expected 1 target group, got", len(tgs))
}
tg := tgs[0] tg := tgs[0]
if tg.Source != "test-service-zero-ports" { if tg.Source != "test-service-zero-ports" {
@@ -306,16 +273,9 @@ func TestMarathonZeroTaskPorts(t *testing.T) {
if len(tg.Targets) != 0 { if len(tg.Targets) != 0 {
t.Fatalf("Wrong number of targets: %v", tg.Targets) t.Fatalf("Wrong number of targets: %v", tg.Targets)
} }
default:
t.Fatal("Did not get a target group.")
}
} }
func Test500ErrorHttpResponseWithValidJSONBody(t *testing.T) { func Test500ErrorHttpResponseWithValidJSONBody(t *testing.T) {
var (
ch = make(chan []*targetgroup.Group, 1)
client = fetchApps
)
// Simulate 500 error with a valid JSON response. // Simulate 500 error with a valid JSON response.
respHandler := func(w http.ResponseWriter, r *http.Request) { respHandler := func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
@@ -333,53 +293,55 @@ func Test500ErrorHttpResponseWithValidJSONBody(t *testing.T) {
// Setup conf for the test case. // Setup conf for the test case.
conf = SDConfig{Servers: []string{ts.URL}} conf = SDConfig{Servers: []string{ts.URL}}
// Execute test case and validate behavior. // Execute test case and validate behavior.
if err := testUpdateServices(client, ch); err == nil { _, err := testUpdateServices(nil)
t.Fatalf("Expected error for 5xx HTTP response from marathon server") if err == nil {
t.Fatalf("Expected error for 5xx HTTP response from marathon server, got nil")
} }
} }
func marathonTestAppListWithPortDefinitions(labels map[string]string, runningTasks int) *AppList { func marathonTestAppListWithPortDefinitions(labels map[string]string, runningTasks int) *appList {
var ( var (
task = Task{ t = task{
ID: "test-task-1", ID: "test-task-1",
Host: "mesos-slave1", Host: "mesos-slave1",
// Auto-generated ports when requirePorts is false // Auto-generated ports when requirePorts is false
Ports: []uint32{1234, 5678}, Ports: []uint32{1234, 5678},
} }
docker = DockerContainer{ docker = dockerContainer{
Image: "repo/image:tag", Image: "repo/image:tag",
} }
container = Container{Docker: docker} container = container{Docker: docker}
app = App{ a = app{
ID: "test-service", ID: "test-service",
Tasks: []Task{task}, Tasks: []task{t},
RunningTasks: runningTasks, RunningTasks: runningTasks,
Labels: labels, Labels: labels,
Container: container, Container: container,
PortDefinitions: []PortDefinition{ PortDefinitions: []portDefinition{
{Labels: make(map[string]string), Port: 31000}, {Labels: make(map[string]string), Port: 31000},
{Labels: labels, Port: 32000}, {Labels: labels, Port: 32000},
}, },
RequirePorts: false, // default RequirePorts: false, // default
} }
) )
return &AppList{ return &appList{
Apps: []App{app}, Apps: []app{a},
} }
} }
func TestMarathonSDSendGroupWithPortDefinitions(t *testing.T) { func TestMarathonSDSendGroupWithPortDefinitions(t *testing.T) {
var ( var (
ch = make(chan []*targetgroup.Group, 1) client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) {
client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) {
return marathonTestAppListWithPortDefinitions(marathonValidLabel, 1), nil return marathonTestAppListWithPortDefinitions(marathonValidLabel, 1), nil
} }
) )
if err := testUpdateServices(client, ch); err != nil { tgs, err := testUpdateServices(client)
if err != nil {
t.Fatalf("Got error: %s", err) t.Fatalf("Got error: %s", err)
} }
select { if len(tgs) != 1 {
case tgs := <-ch: t.Fatal("Expected 1 target group, got", len(tgs))
}
tg := tgs[0] tg := tgs[0]
if tg.Source != "test-service" { if tg.Source != "test-service" {
@@ -408,52 +370,50 @@ func TestMarathonSDSendGroupWithPortDefinitions(t *testing.T) {
if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "yes" { if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "yes" {
t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel])
} }
default:
t.Fatal("Did not get a target group.")
}
} }
func marathonTestAppListWithPortDefinitionsRequirePorts(labels map[string]string, runningTasks int) *AppList { func marathonTestAppListWithPortDefinitionsRequirePorts(labels map[string]string, runningTasks int) *appList {
var ( var (
task = Task{ t = task{
ID: "test-task-1", ID: "test-task-1",
Host: "mesos-slave1", Host: "mesos-slave1",
Ports: []uint32{31000, 32000}, Ports: []uint32{31000, 32000},
} }
docker = DockerContainer{ docker = dockerContainer{
Image: "repo/image:tag", Image: "repo/image:tag",
} }
container = Container{Docker: docker} container = container{Docker: docker}
app = App{ a = app{
ID: "test-service", ID: "test-service",
Tasks: []Task{task}, Tasks: []task{t},
RunningTasks: runningTasks, RunningTasks: runningTasks,
Labels: labels, Labels: labels,
Container: container, Container: container,
PortDefinitions: []PortDefinition{ PortDefinitions: []portDefinition{
{Labels: make(map[string]string), Port: 31000}, {Labels: make(map[string]string), Port: 31000},
{Labels: labels, Port: 32000}, {Labels: labels, Port: 32000},
}, },
RequirePorts: true, RequirePorts: true,
} }
) )
return &AppList{ return &appList{
Apps: []App{app}, Apps: []app{a},
} }
} }
func TestMarathonSDSendGroupWithPortDefinitionsRequirePorts(t *testing.T) { func TestMarathonSDSendGroupWithPortDefinitionsRequirePorts(t *testing.T) {
var ( var (
ch = make(chan []*targetgroup.Group, 1) client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) {
client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) {
return marathonTestAppListWithPortDefinitionsRequirePorts(marathonValidLabel, 1), nil return marathonTestAppListWithPortDefinitionsRequirePorts(marathonValidLabel, 1), nil
} }
) )
if err := testUpdateServices(client, ch); err != nil { tgs, err := testUpdateServices(client)
if err != nil {
t.Fatalf("Got error: %s", err) t.Fatalf("Got error: %s", err)
} }
select { if len(tgs) != 1 {
case tgs := <-ch: t.Fatal("Expected 1 target group, got", len(tgs))
}
tg := tgs[0] tg := tgs[0]
if tg.Source != "test-service" { if tg.Source != "test-service" {
@@ -482,47 +442,45 @@ func TestMarathonSDSendGroupWithPortDefinitionsRequirePorts(t *testing.T) {
if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "yes" { if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "yes" {
t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel])
} }
default:
t.Fatal("Did not get a target group.")
}
} }
func marathonTestAppListWithPorts(labels map[string]string, runningTasks int) *AppList { func marathonTestAppListWithPorts(labels map[string]string, runningTasks int) *appList {
var ( var (
task = Task{ t = task{
ID: "test-task-1", ID: "test-task-1",
Host: "mesos-slave1", Host: "mesos-slave1",
Ports: []uint32{31000, 32000}, Ports: []uint32{31000, 32000},
} }
docker = DockerContainer{ docker = dockerContainer{
Image: "repo/image:tag", Image: "repo/image:tag",
} }
container = Container{Docker: docker} container = container{Docker: docker}
app = App{ a = app{
ID: "test-service", ID: "test-service",
Tasks: []Task{task}, Tasks: []task{t},
RunningTasks: runningTasks, RunningTasks: runningTasks,
Labels: labels, Labels: labels,
Container: container, Container: container,
} }
) )
return &AppList{ return &appList{
Apps: []App{app}, Apps: []app{a},
} }
} }
func TestMarathonSDSendGroupWithPorts(t *testing.T) { func TestMarathonSDSendGroupWithPorts(t *testing.T) {
var ( var (
ch = make(chan []*targetgroup.Group, 1) client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) {
client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) {
return marathonTestAppListWithPorts(marathonValidLabel, 1), nil return marathonTestAppListWithPorts(marathonValidLabel, 1), nil
} }
) )
if err := testUpdateServices(client, ch); err != nil { tgs, err := testUpdateServices(client)
if err != nil {
t.Fatalf("Got error: %s", err) t.Fatalf("Got error: %s", err)
} }
select { if len(tgs) != 1 {
case tgs := <-ch: t.Fatal("Expected 1 target group, got", len(tgs))
}
tg := tgs[0] tg := tgs[0]
if tg.Source != "test-service" { if tg.Source != "test-service" {
@@ -551,14 +509,11 @@ func TestMarathonSDSendGroupWithPorts(t *testing.T) {
if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" {
t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel])
} }
default:
t.Fatal("Did not get a target group.")
}
} }
func marathonTestAppListWithContainerPortMappings(labels map[string]string, runningTasks int) *AppList { func marathonTestAppListWithContainerPortMappings(labels map[string]string, runningTasks int) *appList {
var ( var (
task = Task{ t = task{
ID: "test-task-1", ID: "test-task-1",
Host: "mesos-slave1", Host: "mesos-slave1",
Ports: []uint32{ Ports: []uint32{
@@ -566,41 +521,42 @@ func marathonTestAppListWithContainerPortMappings(labels map[string]string, runn
32000, 32000,
}, },
} }
docker = DockerContainer{ docker = dockerContainer{
Image: "repo/image:tag", Image: "repo/image:tag",
} }
container = Container{ container = container{
Docker: docker, Docker: docker,
PortMappings: []PortMapping{ PortMappings: []portMapping{
{Labels: labels, HostPort: 0}, {Labels: labels, HostPort: 0},
{Labels: make(map[string]string), HostPort: 32000}, {Labels: make(map[string]string), HostPort: 32000},
}, },
} }
app = App{ a = app{
ID: "test-service", ID: "test-service",
Tasks: []Task{task}, Tasks: []task{t},
RunningTasks: runningTasks, RunningTasks: runningTasks,
Labels: labels, Labels: labels,
Container: container, Container: container,
} }
) )
return &AppList{ return &appList{
Apps: []App{app}, Apps: []app{a},
} }
} }
func TestMarathonSDSendGroupWithContainerPortMappings(t *testing.T) { func TestMarathonSDSendGroupWithContainerPortMappings(t *testing.T) {
var ( var (
ch = make(chan []*targetgroup.Group, 1) client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) {
client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) {
return marathonTestAppListWithContainerPortMappings(marathonValidLabel, 1), nil return marathonTestAppListWithContainerPortMappings(marathonValidLabel, 1), nil
} }
) )
if err := testUpdateServices(client, ch); err != nil { tgs, err := testUpdateServices(client)
if err != nil {
t.Fatalf("Got error: %s", err) t.Fatalf("Got error: %s", err)
} }
select { if len(tgs) != 1 {
case tgs := <-ch: t.Fatal("Expected 1 target group, got", len(tgs))
}
tg := tgs[0] tg := tgs[0]
if tg.Source != "test-service" { if tg.Source != "test-service" {
@ -629,14 +585,11 @@ func TestMarathonSDSendGroupWithContainerPortMappings(t *testing.T) {
if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" {
t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel])
} }
default:
t.Fatal("Did not get a target group.")
}
} }
func marathonTestAppListWithDockerContainerPortMappings(labels map[string]string, runningTasks int) *AppList { func marathonTestAppListWithDockerContainerPortMappings(labels map[string]string, runningTasks int) *appList {
var ( var (
task = Task{ t = task{
ID: "test-task-1", ID: "test-task-1",
Host: "mesos-slave1", Host: "mesos-slave1",
Ports: []uint32{ Ports: []uint32{
@ -644,41 +597,42 @@ func marathonTestAppListWithDockerContainerPortMappings(labels map[string]string
12345, // 'Automatically-generated' port 12345, // 'Automatically-generated' port
}, },
} }
docker = DockerContainer{ docker = dockerContainer{
Image: "repo/image:tag", Image: "repo/image:tag",
PortMappings: []PortMapping{ PortMappings: []portMapping{
{Labels: labels, HostPort: 31000}, {Labels: labels, HostPort: 31000},
{Labels: make(map[string]string), HostPort: 0}, {Labels: make(map[string]string), HostPort: 0},
}, },
} }
container = Container{ container = container{
Docker: docker, Docker: docker,
} }
app = App{ a = app{
ID: "test-service", ID: "test-service",
Tasks: []Task{task}, Tasks: []task{t},
RunningTasks: runningTasks, RunningTasks: runningTasks,
Labels: labels, Labels: labels,
Container: container, Container: container,
} }
) )
return &AppList{ return &appList{
Apps: []App{app}, Apps: []app{a},
} }
} }
func TestMarathonSDSendGroupWithDockerContainerPortMappings(t *testing.T) { func TestMarathonSDSendGroupWithDockerContainerPortMappings(t *testing.T) {
var ( var (
ch = make(chan []*targetgroup.Group, 1) client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) {
client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) {
return marathonTestAppListWithDockerContainerPortMappings(marathonValidLabel, 1), nil return marathonTestAppListWithDockerContainerPortMappings(marathonValidLabel, 1), nil
} }
) )
if err := testUpdateServices(client, ch); err != nil { tgs, err := testUpdateServices(client)
if err != nil {
t.Fatalf("Got error: %s", err) t.Fatalf("Got error: %s", err)
} }
select { if len(tgs) != 1 {
case tgs := <-ch: t.Fatal("Expected 1 target group, got", len(tgs))
}
tg := tgs[0] tg := tgs[0]
if tg.Source != "test-service" { if tg.Source != "test-service" {
@ -707,60 +661,58 @@ func TestMarathonSDSendGroupWithDockerContainerPortMappings(t *testing.T) {
if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" {
t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel])
} }
default:
t.Fatal("Did not get a target group.")
}
} }
func marathonTestAppListWithContainerNetworkAndPortMappings(labels map[string]string, runningTasks int) *AppList { func marathonTestAppListWithContainerNetworkAndPortMappings(labels map[string]string, runningTasks int) *appList {
var ( var (
task = Task{ t = task{
ID: "test-task-1", ID: "test-task-1",
Host: "mesos-slave1", Host: "mesos-slave1",
IPAddresses: []IPAddress{ IPAddresses: []ipAddress{
{Address: "1.2.3.4"}, {Address: "1.2.3.4"},
}, },
} }
docker = DockerContainer{ docker = dockerContainer{
Image: "repo/image:tag", Image: "repo/image:tag",
} }
portMappings = []PortMapping{ portMappings = []portMapping{
{Labels: labels, ContainerPort: 8080, HostPort: 31000}, {Labels: labels, ContainerPort: 8080, HostPort: 31000},
{Labels: make(map[string]string), ContainerPort: 1234, HostPort: 32000}, {Labels: make(map[string]string), ContainerPort: 1234, HostPort: 32000},
} }
container = Container{ container = container{
Docker: docker, Docker: docker,
PortMappings: portMappings, PortMappings: portMappings,
} }
networks = []Network{ networks = []network{
{Mode: "container", Name: "test-network"}, {Mode: "container", Name: "test-network"},
} }
app = App{ a = app{
ID: "test-service", ID: "test-service",
Tasks: []Task{task}, Tasks: []task{t},
RunningTasks: runningTasks, RunningTasks: runningTasks,
Labels: labels, Labels: labels,
Container: container, Container: container,
Networks: networks, Networks: networks,
} }
) )
return &AppList{ return &appList{
Apps: []App{app}, Apps: []app{a},
} }
} }
func TestMarathonSDSendGroupWithContainerNetworkAndPortMapping(t *testing.T) { func TestMarathonSDSendGroupWithContainerNetworkAndPortMapping(t *testing.T) {
var ( var (
ch = make(chan []*targetgroup.Group, 1) client = func(_ context.Context, _ *http.Client, _ string) (*appList, error) {
client = func(_ context.Context, _ *http.Client, _ string) (*AppList, error) {
return marathonTestAppListWithContainerNetworkAndPortMappings(marathonValidLabel, 1), nil return marathonTestAppListWithContainerNetworkAndPortMappings(marathonValidLabel, 1), nil
} }
) )
if err := testUpdateServices(client, ch); err != nil { tgs, err := testUpdateServices(client)
if err != nil {
t.Fatalf("Got error: %s", err) t.Fatalf("Got error: %s", err)
} }
select { if len(tgs) != 1 {
case tgs := <-ch: t.Fatal("Expected 1 target group, got", len(tgs))
}
tg := tgs[0] tg := tgs[0]
if tg.Source != "test-service" { if tg.Source != "test-service" {
@ -789,7 +741,4 @@ func TestMarathonSDSendGroupWithContainerNetworkAndPortMapping(t *testing.T) {
if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" {
t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel])
} }
default:
t.Fatal("Did not get a target group.")
}
} }

View file

@ -17,7 +17,6 @@ import (
"context" "context"
"fmt" "fmt"
"net" "net"
"time"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/gophercloud/gophercloud" "github.com/gophercloud/gophercloud"
@ -45,25 +44,16 @@ type HypervisorDiscovery struct {
port int port int
} }
// NewHypervisorDiscovery returns a new hypervisor discovery. // newHypervisorDiscovery returns a new hypervisor discovery.
func NewHypervisorDiscovery(provider *gophercloud.ProviderClient, opts *gophercloud.AuthOptions, func newHypervisorDiscovery(provider *gophercloud.ProviderClient, opts *gophercloud.AuthOptions,
port int, region string, l log.Logger) *HypervisorDiscovery { port int, region string, l log.Logger) *HypervisorDiscovery {
return &HypervisorDiscovery{provider: provider, authOpts: opts, return &HypervisorDiscovery{provider: provider, authOpts: opts,
region: region, port: port, logger: l} region: region, port: port, logger: l}
} }
func (h *HypervisorDiscovery) refresh(ctx context.Context) (*targetgroup.Group, error) { func (h *HypervisorDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
var err error
t0 := time.Now()
defer func() {
refreshDuration.Observe(time.Since(t0).Seconds())
if err != nil {
refreshFailuresCount.Inc()
}
}()
h.provider.Context = ctx h.provider.Context = ctx
err = openstack.Authenticate(h.provider, *h.authOpts) err := openstack.Authenticate(h.provider, *h.authOpts)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not authenticate to OpenStack: %s", err) return nil, fmt.Errorf("could not authenticate to OpenStack: %s", err)
} }
@ -102,5 +92,5 @@ func (h *HypervisorDiscovery) refresh(ctx context.Context) (*targetgroup.Group,
return nil, err return nil, err
} }
return tg, nil return []*targetgroup.Group{tg}, nil
} }
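Stripped of the metric bookkeeping, every per-mechanism refresher in this commit reduces to the same shape, which the shared discovery/refresh package (added later in this commit) consumes. Stated in isolation as a minimal, purely illustrative sketch (the refreshFn name is an assumption, not a type defined anywhere in the codebase):

package sketch

import (
    "context"

    "github.com/prometheus/prometheus/discovery/targetgroup"
)

// refreshFn is the whole contract a mechanism owes the shared refresh loop:
// one poll of the backend, returning the complete set of target groups.
type refreshFn func(ctx context.Context) ([]*targetgroup.Group, error)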

View file

@ -40,7 +40,7 @@ func (s *OpenstackSDHypervisorTestSuite) SetupTest(t *testing.T) {
s.Mock.HandleAuthSuccessfully() s.Mock.HandleAuthSuccessfully()
} }
func (s *OpenstackSDHypervisorTestSuite) openstackAuthSuccess() (*Discovery, error) { func (s *OpenstackSDHypervisorTestSuite) openstackAuthSuccess() (refresher, error) {
conf := SDConfig{ conf := SDConfig{
IdentityEndpoint: s.Mock.Endpoint(), IdentityEndpoint: s.Mock.Endpoint(),
Password: "test", Password: "test",
@ -49,7 +49,7 @@ func (s *OpenstackSDHypervisorTestSuite) openstackAuthSuccess() (*Discovery, err
Region: "RegionOne", Region: "RegionOne",
Role: "hypervisor", Role: "hypervisor",
} }
return NewDiscovery(&conf, nil) return newRefresher(&conf, nil)
} }
func TestOpenstackSDHypervisorRefresh(t *testing.T) { func TestOpenstackSDHypervisorRefresh(t *testing.T) {
@ -59,7 +59,9 @@ func TestOpenstackSDHypervisorRefresh(t *testing.T) {
hypervisor, _ := mock.openstackAuthSuccess() hypervisor, _ := mock.openstackAuthSuccess()
ctx := context.Background() ctx := context.Background()
tg, err := hypervisor.r.refresh(ctx) tgs, err := hypervisor.refresh(ctx)
testutil.Equals(t, 1, len(tgs))
tg := tgs[0]
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Assert(t, tg != nil, "") testutil.Assert(t, tg != nil, "")
testutil.Assert(t, tg.Targets != nil, "") testutil.Assert(t, tg.Targets != nil, "")
@ -89,7 +91,7 @@ func TestOpenstackSDHypervisorRefreshWithDoneContext(t *testing.T) {
hypervisor, _ := mock.openstackAuthSuccess() hypervisor, _ := mock.openstackAuthSuccess()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
cancel() cancel()
_, err := hypervisor.r.refresh(ctx) _, err := hypervisor.refresh(ctx)
testutil.NotOk(t, err, "") testutil.NotOk(t, err, "")
testutil.Assert(t, strings.Contains(err.Error(), context.Canceled.Error()), "%q doesn't contain %q", err, context.Canceled) testutil.Assert(t, strings.Contains(err.Error(), context.Canceled.Error()), "%q doesn't contain %q", err, context.Canceled)

View file

@ -17,7 +17,6 @@ import (
"context" "context"
"fmt" "fmt"
"net" "net"
"time"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level" "github.com/go-kit/kit/log/level"
@ -27,6 +26,7 @@ import (
"github.com/gophercloud/gophercloud/openstack/compute/v2/servers" "github.com/gophercloud/gophercloud/openstack/compute/v2/servers"
"github.com/gophercloud/gophercloud/pagination" "github.com/gophercloud/gophercloud/pagination"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/util/strutil"
) )
@ -54,7 +54,7 @@ type InstanceDiscovery struct {
} }
// NewInstanceDiscovery returns a new instance discovery. // newInstanceDiscovery returns a new instance discovery.
func NewInstanceDiscovery(provider *gophercloud.ProviderClient, opts *gophercloud.AuthOptions, func newInstanceDiscovery(provider *gophercloud.ProviderClient, opts *gophercloud.AuthOptions,
port int, region string, allTenants bool, l log.Logger) *InstanceDiscovery { port int, region string, allTenants bool, l log.Logger) *InstanceDiscovery {
if l == nil { if l == nil {
l = log.NewNopLogger() l = log.NewNopLogger()
@ -68,18 +68,9 @@ type floatingIPKey struct {
fixed string fixed string
} }
func (i *InstanceDiscovery) refresh(ctx context.Context) (*targetgroup.Group, error) { func (i *InstanceDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
var err error
t0 := time.Now()
defer func() {
refreshDuration.Observe(time.Since(t0).Seconds())
if err != nil {
refreshFailuresCount.Inc()
}
}()
i.provider.Context = ctx i.provider.Context = ctx
err = openstack.Authenticate(i.provider, *i.authOpts) err := openstack.Authenticate(i.provider, *i.authOpts)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not authenticate to OpenStack: %s", err) return nil, fmt.Errorf("could not authenticate to OpenStack: %s", err)
} }
@ -200,5 +191,5 @@ func (i *InstanceDiscovery) refresh(ctx context.Context) (*targetgroup.Group, er
return nil, err return nil, err
} }
return tg, nil return []*targetgroup.Group{tg}, nil
} }

View file

@ -42,7 +42,7 @@ func (s *OpenstackSDInstanceTestSuite) SetupTest(t *testing.T) {
s.Mock.HandleAuthSuccessfully() s.Mock.HandleAuthSuccessfully()
} }
func (s *OpenstackSDInstanceTestSuite) openstackAuthSuccess() (*Discovery, error) { func (s *OpenstackSDInstanceTestSuite) openstackAuthSuccess() (refresher, error) {
conf := SDConfig{ conf := SDConfig{
IdentityEndpoint: s.Mock.Endpoint(), IdentityEndpoint: s.Mock.Endpoint(),
Password: "test", Password: "test",
@ -52,7 +52,7 @@ func (s *OpenstackSDInstanceTestSuite) openstackAuthSuccess() (*Discovery, error
Role: "instance", Role: "instance",
AllTenants: true, AllTenants: true,
} }
return NewDiscovery(&conf, nil) return newRefresher(&conf, nil)
} }
func TestOpenstackSDInstanceRefresh(t *testing.T) { func TestOpenstackSDInstanceRefresh(t *testing.T) {
@ -64,9 +64,12 @@ func TestOpenstackSDInstanceRefresh(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
ctx := context.Background() ctx := context.Background()
tg, err := instance.r.refresh(ctx) tgs, err := instance.refresh(ctx)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 1, len(tgs))
tg := tgs[0]
testutil.Assert(t, tg != nil, "") testutil.Assert(t, tg != nil, "")
testutil.Assert(t, tg.Targets != nil, "") testutil.Assert(t, tg.Targets != nil, "")
testutil.Equals(t, 4, len(tg.Targets)) testutil.Equals(t, 4, len(tg.Targets))
@ -128,7 +131,7 @@ func TestOpenstackSDInstanceRefreshWithDoneContext(t *testing.T) {
hypervisor, _ := mock.openstackAuthSuccess() hypervisor, _ := mock.openstackAuthSuccess()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
cancel() cancel()
_, err := hypervisor.r.refresh(ctx) _, err := hypervisor.refresh(ctx)
testutil.NotOk(t, err, "") testutil.NotOk(t, err, "")
testutil.Assert(t, strings.Contains(err.Error(), context.Canceled.Error()), "%q doesn't contain %q", err, context.Canceled) testutil.Assert(t, strings.Contains(err.Error(), context.Canceled.Error()), "%q doesn't contain %q", err, context.Canceled)

View file

@ -21,34 +21,21 @@ import (
"time" "time"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gophercloud/gophercloud" "github.com/gophercloud/gophercloud"
"github.com/gophercloud/gophercloud/openstack" "github.com/gophercloud/gophercloud/openstack"
"github.com/mwitkow/go-conntrack" "github.com/mwitkow/go-conntrack"
"github.com/prometheus/client_golang/prometheus"
config_util "github.com/prometheus/common/config" config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery/refresh"
"github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/discovery/targetgroup"
) )
var ( // DefaultSDConfig is the default OpenStack SD configuration.
refreshFailuresCount = prometheus.NewCounter( var DefaultSDConfig = SDConfig{
prometheus.CounterOpts{
Name: "prometheus_sd_openstack_refresh_failures_total",
Help: "The number of OpenStack-SD scrape failures.",
})
refreshDuration = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "prometheus_sd_openstack_refresh_duration_seconds",
Help: "The duration of an OpenStack-SD refresh in seconds.",
})
// DefaultSDConfig is the default OpenStack SD configuration.
DefaultSDConfig = SDConfig{
Port: 80, Port: 80,
RefreshInterval: model.Duration(60 * time.Second), RefreshInterval: model.Duration(60 * time.Second),
} }
)
// SDConfig is the configuration for OpenStack based service discovery. // SDConfig is the configuration for OpenStack based service discovery.
type SDConfig struct { type SDConfig struct {
@ -71,7 +58,7 @@ type SDConfig struct {
TLSConfig config_util.TLSConfig `yaml:"tls_config,omitempty"` TLSConfig config_util.TLSConfig `yaml:"tls_config,omitempty"`
} }
// OpenStackRole is role of the target in OpenStack. // Role is the role of the target in OpenStack.
type Role string type Role string
// The valid options for OpenStackRole. // The valid options for OpenStackRole.
@ -114,25 +101,26 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil return nil
} }
func init() {
prometheus.MustRegister(refreshFailuresCount)
prometheus.MustRegister(refreshDuration)
}
type refresher interface { type refresher interface {
refresh(ctx context.Context) (tg *targetgroup.Group, err error) refresh(context.Context) ([]*targetgroup.Group, error)
} }
// Discovery periodically performs OpenStack-SD requests. It implements // NewDiscovery returns a new OpenStack Discoverer which periodically refreshes its targets.
// the Discoverer interface. func NewDiscovery(conf *SDConfig, l log.Logger) (*refresh.Discovery, error) {
type Discovery struct { r, err := newRefresher(conf, l)
interval time.Duration if err != nil {
logger log.Logger return nil, err
r refresher }
return refresh.NewDiscovery(
l,
"openstack",
time.Duration(conf.RefreshInterval),
r.refresh,
), nil
} }
// NewDiscovery returns a new OpenStackDiscovery which periodically refreshes its targets. func newRefresher(conf *SDConfig, l log.Logger) (refresher, error) {
func NewDiscovery(conf *SDConfig, l log.Logger) (*Discovery, error) {
var opts gophercloud.AuthOptions var opts gophercloud.AuthOptions
if conf.IdentityEndpoint == "" { if conf.IdentityEndpoint == "" {
var err error var err error
@ -174,51 +162,11 @@ func NewDiscovery(conf *SDConfig, l log.Logger) (*Discovery, error) {
}, },
Timeout: 5 * time.Duration(conf.RefreshInterval), Timeout: 5 * time.Duration(conf.RefreshInterval),
} }
var r refresher
switch conf.Role { switch conf.Role {
case OpenStackRoleHypervisor: case OpenStackRoleHypervisor:
r = NewHypervisorDiscovery(client, &opts, conf.Port, conf.Region, l) return newHypervisorDiscovery(client, &opts, conf.Port, conf.Region, l), nil
case OpenStackRoleInstance: case OpenStackRoleInstance:
r = NewInstanceDiscovery(client, &opts, conf.Port, conf.Region, conf.AllTenants, l) return newInstanceDiscovery(client, &opts, conf.Port, conf.Region, conf.AllTenants, l), nil
default: }
return nil, errors.New("unknown OpenStack discovery role") return nil, errors.New("unknown OpenStack discovery role")
}
return &Discovery{r: r, logger: l, interval: time.Duration(conf.RefreshInterval)}, nil
}
// Run implements the Discoverer interface.
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
// Get an initial set right away.
tg, err := d.r.refresh(ctx)
if err != nil {
level.Error(d.logger).Log("msg", "Unable to refresh target groups", "err", err.Error())
} else {
select {
case ch <- []*targetgroup.Group{tg}:
case <-ctx.Done():
return
}
}
ticker := time.NewTicker(d.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
tg, err := d.r.refresh(ctx)
if err != nil {
level.Error(d.logger).Log("msg", "Unable to refresh target groups", "err", err.Error())
continue
}
select {
case ch <- []*targetgroup.Group{tg}:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
} }
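From the caller's side nothing changes: NewDiscovery now returns a *refresh.Discovery, which still satisfies the Discoverer interface, so it is started and drained exactly like the old per-package Discovery type. A hedged usage sketch, assuming a standalone program (the configuration values are placeholders, and this will fail at setup without a real OpenStack endpoint):

package main

import (
    "context"
    "fmt"

    "github.com/go-kit/kit/log"
    "github.com/prometheus/prometheus/discovery/openstack"
    "github.com/prometheus/prometheus/discovery/targetgroup"
)

func main() {
    conf := openstack.DefaultSDConfig // set IdentityEndpoint, Role, Region, etc. for a real setup
    d, err := openstack.NewDiscovery(&conf, log.NewNopLogger())
    if err != nil {
        fmt.Println("setup failed:", err)
        return
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    ch := make(chan []*targetgroup.Group)
    go d.Run(ctx, ch) // the shared refresh loop does the polling

    // Each message carries the full, refreshed set of target groups.
    tgs := <-ch
    fmt.Println("received", len(tgs), "target group(s)")
}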

View file

@ -0,0 +1,117 @@
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package refresh
import (
"context"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/discovery/targetgroup"
)
var (
failuresCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "prometheus_sd_refresh_failures_total",
Help: "Number of refresh failures for the given SD mechanism.",
},
[]string{"mechanism"},
)
duration = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "prometheus_sd_refresh_duration_seconds",
Help: "The duration of a refresh in seconds for the given SD mechanism.",
},
[]string{"mechanism"},
)
)
func init() {
prometheus.MustRegister(duration, failuresCount)
}
// Discovery implements the Discoverer interface.
type Discovery struct {
logger log.Logger
interval time.Duration
refreshf func(ctx context.Context) ([]*targetgroup.Group, error)
failures prometheus.Counter
duration prometheus.Observer
}
// NewDiscovery returns a Discovery that calls a refresh() function at every interval.
func NewDiscovery(l log.Logger, mech string, interval time.Duration, refreshf func(ctx context.Context) ([]*targetgroup.Group, error)) *Discovery {
if l == nil {
l = log.NewNopLogger()
}
return &Discovery{
logger: l,
interval: interval,
refreshf: refreshf,
failures: failuresCount.WithLabelValues(mech),
duration: duration.WithLabelValues(mech),
}
}
// Run implements the Discoverer interface.
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
// Get an initial set right away.
tgs, err := d.refresh(ctx)
if err != nil {
level.Error(d.logger).Log("msg", "Unable to refresh target groups", "err", err.Error())
} else {
select {
case ch <- tgs:
case <-ctx.Done():
return
}
}
ticker := time.NewTicker(d.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
tgs, err := d.refresh(ctx)
if err != nil {
level.Error(d.logger).Log("msg", "Unable to refresh target groups", "err", err.Error())
continue
}
select {
case ch <- tgs:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}
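// refresh wraps the mechanism's refresh function, recording its duration and counting failures.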
func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
now := time.Now()
defer d.duration.Observe(time.Since(now).Seconds())
tgs, err := d.refreshf(ctx)
if err != nil {
d.failures.Inc()
}
return tgs, err
}
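Putting the pieces together: a mechanism that needs no extra state beyond its configuration can, like OpenStack above, return the *refresh.Discovery directly and let this package supply the initial refresh, the ticker loop, context handling and metrics. A hedged sketch of a hypothetical "staticlist" mechanism (the package name, config fields and mechanism label are all invented for illustration):

package staticlist

import (
    "context"
    "time"

    "github.com/go-kit/kit/log"
    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/discovery/refresh"
    "github.com/prometheus/prometheus/discovery/targetgroup"
)

// SDConfig holds the (invented) configuration for this example mechanism.
type SDConfig struct {
    Addresses       []string
    RefreshInterval model.Duration
}

type discovery struct {
    conf *SDConfig
}

// refresh turns the configured addresses into a single target group.
func (d *discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
    tg := &targetgroup.Group{Source: "staticlist"}
    for _, addr := range d.conf.Addresses {
        tg.Targets = append(tg.Targets, model.LabelSet{
            model.AddressLabel: model.LabelValue(addr),
        })
    }
    return []*targetgroup.Group{tg}, nil
}

// NewDiscovery wires the refresh function into the generic loop, mirroring
// the OpenStack constructor in this commit.
func NewDiscovery(conf *SDConfig, logger log.Logger) *refresh.Discovery {
    d := &discovery{conf: conf}
    return refresh.NewDiscovery(
        logger,
        "staticlist",
        time.Duration(conf.RefreshInterval),
        d.refresh,
    )
}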

View file

@ -0,0 +1,83 @@
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package refresh
import (
"context"
"fmt"
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/util/testutil"
)
func TestRefresh(t *testing.T) {
tg1 := []*targetgroup.Group{
{
Source: "tg",
Targets: []model.LabelSet{
{
model.LabelName("t1"): model.LabelValue("v1"),
},
{
model.LabelName("t2"): model.LabelValue("v2"),
},
},
Labels: model.LabelSet{
model.LabelName("l1"): model.LabelValue("lv1"),
},
},
}
tg2 := []*targetgroup.Group{
{
Source: "tg",
},
}
var i int
refresh := func(ctx context.Context) ([]*targetgroup.Group, error) {
i++
switch i {
case 1:
return tg1, nil
case 2:
return tg2, nil
}
return nil, fmt.Errorf("some error")
}
interval := time.Millisecond
d := NewDiscovery(nil, "test", interval, refresh)
ch := make(chan []*targetgroup.Group)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go d.Run(ctx, ch)
tg := <-ch
testutil.Equals(t, tg1, tg)
tg = <-ch
testutil.Equals(t, tg2, tg)
tick := time.NewTicker(2 * interval)
defer tick.Stop()
select {
case <-ch:
t.Fatal("Unexpected target group")
case <-tick.C:
}
}

View file

@ -24,12 +24,11 @@ import (
"time" "time"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/mwitkow/go-conntrack" "github.com/mwitkow/go-conntrack"
"github.com/prometheus/client_golang/prometheus" config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
config_util "github.com/prometheus/common/config" "github.com/prometheus/prometheus/discovery/refresh"
"github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/discovery/targetgroup"
) )
@ -43,24 +42,12 @@ const (
tritonLabelServerID = tritonLabel + "server_id" tritonLabelServerID = tritonLabel + "server_id"
) )
var ( // DefaultSDConfig is the default Triton SD configuration.
refreshFailuresCount = prometheus.NewCounter( var DefaultSDConfig = SDConfig{
prometheus.CounterOpts{
Name: "prometheus_sd_triton_refresh_failures_total",
Help: "The number of Triton-SD scrape failures.",
})
refreshDuration = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "prometheus_sd_triton_refresh_duration_seconds",
Help: "The duration of a Triton-SD refresh in seconds.",
})
// DefaultSDConfig is the default Triton SD configuration.
DefaultSDConfig = SDConfig{
Port: 9163, Port: 9163,
RefreshInterval: model.Duration(60 * time.Second), RefreshInterval: model.Duration(60 * time.Second),
Version: 1, Version: 1,
} }
)
// SDConfig is the configuration for Triton based service discovery. // SDConfig is the configuration for Triton based service discovery.
type SDConfig struct { type SDConfig struct {
@ -97,13 +84,8 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil return nil
} }
func init() {
prometheus.MustRegister(refreshFailuresCount)
prometheus.MustRegister(refreshDuration)
}
// DiscoveryResponse models a JSON response from the Triton discovery. // discoveryResponse models a JSON response from the Triton discovery.
type DiscoveryResponse struct { type discoveryResponse struct {
Containers []struct { Containers []struct {
Groups []string `json:"groups"` Groups []string `json:"groups"`
ServerUUID string `json:"server_uuid"` ServerUUID string `json:"server_uuid"`
@ -117,18 +99,14 @@ type DiscoveryResponse struct {
// Discovery periodically performs Triton-SD requests. It implements // Discovery periodically performs Triton-SD requests. It implements
// the Discoverer interface. // the Discoverer interface.
type Discovery struct { type Discovery struct {
*refresh.Discovery
client *http.Client client *http.Client
interval time.Duration interval time.Duration
logger log.Logger
sdConfig *SDConfig sdConfig *SDConfig
} }
// New returns a new Discovery which periodically refreshes its targets. // New returns a new Discovery which periodically refreshes its targets.
func New(logger log.Logger, conf *SDConfig) (*Discovery, error) { func New(logger log.Logger, conf *SDConfig) (*Discovery, error) {
if logger == nil {
logger = log.NewNopLogger()
}
tls, err := config_util.NewTLSConfig(&conf.TLSConfig) tls, err := config_util.NewTLSConfig(&conf.TLSConfig)
if err != nil { if err != nil {
return nil, err return nil, err
@ -143,60 +121,28 @@ func New(logger log.Logger, conf *SDConfig) (*Discovery, error) {
} }
client := &http.Client{Transport: transport} client := &http.Client{Transport: transport}
return &Discovery{ d := &Discovery{
client: client, client: client,
interval: time.Duration(conf.RefreshInterval), interval: time.Duration(conf.RefreshInterval),
logger: logger,
sdConfig: conf, sdConfig: conf,
}, nil }
d.Discovery = refresh.NewDiscovery(
logger,
"triton",
time.Duration(conf.RefreshInterval),
d.refresh,
)
return d, nil
} }
// Run implements the Discoverer interface. func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
defer close(ch)
ticker := time.NewTicker(d.interval)
defer ticker.Stop()
// Get an initial set right away.
tg, err := d.refresh(ctx)
if err != nil {
level.Error(d.logger).Log("msg", "Refreshing targets failed", "err", err)
} else {
ch <- []*targetgroup.Group{tg}
}
for {
select {
case <-ticker.C:
tg, err := d.refresh(ctx)
if err != nil {
level.Error(d.logger).Log("msg", "Refreshing targets failed", "err", err)
} else {
ch <- []*targetgroup.Group{tg}
}
case <-ctx.Done():
return
}
}
}
func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err error) {
t0 := time.Now()
defer func() {
refreshDuration.Observe(time.Since(t0).Seconds())
if err != nil {
refreshFailuresCount.Inc()
}
}()
var endpoint = fmt.Sprintf("https://%s:%d/v%d/discover", d.sdConfig.Endpoint, d.sdConfig.Port, d.sdConfig.Version) var endpoint = fmt.Sprintf("https://%s:%d/v%d/discover", d.sdConfig.Endpoint, d.sdConfig.Port, d.sdConfig.Version)
if len(d.sdConfig.Groups) > 0 { if len(d.sdConfig.Groups) > 0 {
groups := url.QueryEscape(strings.Join(d.sdConfig.Groups, ",")) groups := url.QueryEscape(strings.Join(d.sdConfig.Groups, ","))
endpoint = fmt.Sprintf("%s?groups=%s", endpoint, groups) endpoint = fmt.Sprintf("%s?groups=%s", endpoint, groups)
} }
tg = &targetgroup.Group{ tg := &targetgroup.Group{
Source: endpoint, Source: endpoint,
} }
@ -217,7 +163,7 @@ func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err err
return nil, fmt.Errorf("an error occurred when reading the response body: %s", err) return nil, fmt.Errorf("an error occurred when reading the response body: %s", err)
} }
dr := DiscoveryResponse{} dr := discoveryResponse{}
err = json.Unmarshal(data, &dr) err = json.Unmarshal(data, &dr)
if err != nil { if err != nil {
return nil, fmt.Errorf("an error occurred unmarshaling the discovery response json: %s", err) return nil, fmt.Errorf("an error occurred unmarshaling the discovery response json: %s", err)
@ -242,5 +188,5 @@ func (d *Discovery) refresh(ctx context.Context) (tg *targetgroup.Group, err err
tg.Targets = append(tg.Targets, labels) tg.Targets = append(tg.Targets, labels)
} }
return tg, nil return []*targetgroup.Group{tg}, nil
} }
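Triton (like Azure in this commit) needs extra state for its refresher, namely the HTTP client and the SDConfig, so instead of returning *refresh.Discovery directly it embeds it: the embedded field promotes Run, so the type stays a Discoverer without re-implementing the loop. Note the construction order: the struct is allocated first and d.Discovery is filled in afterwards, because refresh.NewDiscovery has to be handed the d.refresh method value of an already-created receiver. A reduced, purely illustrative restatement of that pattern for a hypothetical mechanism (names and fields are assumptions):

package mymech

import (
    "context"
    "net/http"
    "time"

    "github.com/go-kit/kit/log"
    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/discovery/refresh"
    "github.com/prometheus/prometheus/discovery/targetgroup"
)

// SDConfig is an invented configuration type for this sketch.
type SDConfig struct {
    RefreshInterval model.Duration
}

// Discovery embeds *refresh.Discovery, which supplies Run(ctx, ch).
type Discovery struct {
    *refresh.Discovery
    client   *http.Client // mechanism-specific state used by refresh()
    sdConfig *SDConfig
}

// New allocates the mechanism state first, then wraps d.refresh.
func New(logger log.Logger, conf *SDConfig) (*Discovery, error) {
    d := &Discovery{client: &http.Client{}, sdConfig: conf}
    d.Discovery = refresh.NewDiscovery(
        logger,
        "mymech",
        time.Duration(conf.RefreshInterval),
        d.refresh,
    )
    return d, nil
}

// refresh would query the backend with d.client; here it returns a stub group.
func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
    return []*targetgroup.Group{{Source: "mymech"}}, nil
}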

View file

@ -23,11 +23,10 @@ import (
"strconv" "strconv"
"strings" "strings"
"testing" "testing"
"time"
"github.com/prometheus/common/config" "github.com/prometheus/common/config"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
) )
@ -104,30 +103,6 @@ func TestTritonSDNewGroupsConfig(t *testing.T) {
testutil.Equals(t, groupsconf.Port, td.sdConfig.Port) testutil.Equals(t, groupsconf.Port, td.sdConfig.Port)
} }
func TestTritonSDRun(t *testing.T) {
var (
td, _ = newTritonDiscovery(conf)
ch = make(chan []*targetgroup.Group)
ctx, cancel = context.WithCancel(context.Background())
)
wait := make(chan struct{})
go func() {
td.Run(ctx, ch)
close(wait)
}()
select {
case <-time.After(60 * time.Millisecond):
// Expected.
case tgs := <-ch:
t.Fatalf("Unexpected target groups in triton discovery: %s", tgs)
}
cancel()
<-wait
}
func TestTritonSDRefreshNoTargets(t *testing.T) { func TestTritonSDRefreshNoTargets(t *testing.T) {
tgts := testTritonSDRefresh(t, "{\"containers\":[]}") tgts := testTritonSDRefresh(t, "{\"containers\":[]}")
testutil.Assert(t, tgts == nil, "") testutil.Assert(t, tgts == nil, "")
@ -206,8 +181,10 @@ func testTritonSDRefresh(t *testing.T, dstr string) []model.LabelSet {
td.sdConfig.Port = port td.sdConfig.Port = port
tg, err := td.refresh(context.Background()) tgs, err := td.refresh(context.Background())
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 1, len(tgs))
tg := tgs[0]
testutil.Assert(t, tg != nil, "") testutil.Assert(t, tg != nil, "")
return tg.Targets return tg.Targets