From 6de80d7fb03f7cd5ed44b85559886e73d89e720d Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Mon, 23 Oct 2023 14:55:36 +0100 Subject: [PATCH] Allow non-default registry to be used for metrics of SD components Signed-off-by: Paulin Todev --- cmd/prometheus/main.go | 28 +++- cmd/promtool/main.go | 3 +- cmd/promtool/sd.go | 5 +- discovery/README.md | 5 + discovery/aws/ec2.go | 16 +- discovery/aws/lightsail.go | 16 +- discovery/azure/azure.go | 63 ++++---- discovery/consul/consul.go | 141 ++++++++++-------- discovery/consul/consul_test.go | 11 +- discovery/digitalocean/digitalocean.go | 16 +- discovery/digitalocean/digitalocean_test.go | 3 +- discovery/discovery.go | 10 ++ discovery/dns/dns.go | 72 ++++----- discovery/dns/dns_test.go | 4 +- discovery/eureka/eureka.go | 16 +- discovery/eureka/eureka_test.go | 3 +- discovery/file/file.go | 75 ++++++---- discovery/file/file_test.go | 9 +- discovery/gce/gce.go | 16 +- discovery/hetzner/hetzner.go | 16 +- discovery/http/http.go | 41 ++--- discovery/http/http_test.go | 23 ++- discovery/ionos/ionos.go | 17 ++- discovery/kubernetes/endpoints.go | 21 +-- discovery/kubernetes/endpointslice.go | 21 +-- discovery/kubernetes/ingress.go | 15 +- discovery/kubernetes/kubernetes.go | 69 +++++---- discovery/kubernetes/kubernetes_test.go | 18 ++- discovery/kubernetes/node.go | 15 +- discovery/kubernetes/pod.go | 15 +- discovery/kubernetes/service.go | 15 +- discovery/legacymanager/manager.go | 68 +++------ discovery/legacymanager/manager_test.go | 27 ++-- discovery/linode/linode.go | 47 +++--- discovery/linode/linode_test.go | 3 +- discovery/manager.go | 66 ++------ discovery/manager_test.go | 39 +++-- discovery/marathon/marathon.go | 16 +- discovery/marathon/marathon_test.go | 5 +- discovery/metrics.go | 103 +++++++++++++ ...lient_metrics.go => metrics_k8s_client.go} | 48 +++--- discovery/moby/docker.go | 16 +- discovery/moby/docker_test.go | 3 +- discovery/moby/dockerswarm.go | 16 +- discovery/moby/nodes_test.go | 3 +- discovery/moby/services_test.go | 5 +- discovery/moby/tasks_test.go | 3 +- discovery/nomad/nomad.go | 53 +++---- discovery/nomad/nomad_test.go | 5 +- discovery/openstack/openstack.go | 16 +- discovery/ovhcloud/ovhcloud.go | 16 +- discovery/ovhcloud/ovhcloud_test.go | 4 +- discovery/puppetdb/puppetdb.go | 16 +- discovery/puppetdb/puppetdb_test.go | 11 +- discovery/refresh/refresh.go | 85 +++++++---- discovery/refresh/refresh_test.go | 11 +- discovery/scaleway/scaleway.go | 16 +- discovery/triton/triton.go | 16 +- discovery/triton/triton_test.go | 3 +- discovery/util.go | 72 +++++++++ discovery/uyuni/uyuni.go | 16 +- discovery/uyuni/uyuni_test.go | 6 +- discovery/vultr/vultr.go | 16 +- discovery/vultr/vultr_test.go | 3 +- discovery/xds/kuma.go | 81 +++++----- discovery/xds/kuma_test.go | 3 +- discovery/xds/xds.go | 13 +- .../examples/custom-sd/adapter-usage/main.go | 10 +- .../examples/custom-sd/adapter/adapter.go | 5 +- .../custom-sd/adapter/adapter_test.go | 6 +- 70 files changed, 1056 insertions(+), 693 deletions(-) create mode 100644 discovery/metrics.go rename discovery/{kubernetes/client_metrics.go => metrics_k8s_client.go} (85%) create mode 100644 discovery/util.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 106f9d05c..24fc0f8e4 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -620,14 +620,30 @@ func main() { discoveryManagerNotify discoveryManager ) + // Register the metrics used by both "scrape" and "notify" discovery managers. + // The same metrics are used for both discovery managers. Hence the registration + // needs to be done here, outside the NewManager() calls, to avoid duplicate + // metric registrations. + discoveryMetrics, err := discovery.NewMetrics(prometheus.DefaultRegisterer) + if err != nil { + level.Error(logger).Log("msg", "failed to create discovery metrics", "err", err) + os.Exit(1) + } if cfg.enableNewSDManager { - discovery.RegisterMetrics() - discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), discovery.Name("scrape")) - discoveryManagerNotify = discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify")) + discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), prometheus.DefaultRegisterer, discoveryMetrics, discovery.Name("scrape")) + discoveryManagerNotify = discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), prometheus.DefaultRegisterer, discoveryMetrics, discovery.Name("notify")) } else { - legacymanager.RegisterMetrics() - discoveryManagerScrape = legacymanager.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), legacymanager.Name("scrape")) - discoveryManagerNotify = legacymanager.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), legacymanager.Name("notify")) + discoveryManagerScrape = legacymanager.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), prometheus.DefaultRegisterer, discoveryMetrics, legacymanager.Name("scrape")) + discoveryManagerNotify = legacymanager.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), prometheus.DefaultRegisterer, discoveryMetrics, legacymanager.Name("notify")) + } + + if discoveryManagerScrape == nil { + level.Error(logger).Log("msg", "failed to create a discovery manager scrape") + os.Exit(1) + } + if discoveryManagerNotify == nil { + level.Error(logger).Log("msg", "failed to create a discovery manager notify") + os.Exit(1) } scrapeManager, err := scrape.NewManager( diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index f0b2719c9..508b681b8 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -36,6 +36,7 @@ import ( "github.com/google/pprof/profile" "github.com/prometheus/client_golang/api" v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/testutil/promlint" config_util "github.com/prometheus/common/config" @@ -317,7 +318,7 @@ func main() { switch parsedCmd { case sdCheckCmd.FullCommand(): - os.Exit(CheckSD(*sdConfigFile, *sdJobName, *sdTimeout, noDefaultScrapePort)) + os.Exit(CheckSD(*sdConfigFile, *sdJobName, *sdTimeout, noDefaultScrapePort, prometheus.DefaultRegisterer)) case checkConfigCmd.FullCommand(): os.Exit(CheckConfig(*agentMode, *checkConfigSyntaxOnly, newLintConfig(*checkConfigLint, *checkConfigLintFatal), *configFiles...)) diff --git a/cmd/promtool/sd.go b/cmd/promtool/sd.go index 7c5ae7036..155152e1a 100644 --- a/cmd/promtool/sd.go +++ b/cmd/promtool/sd.go @@ -22,6 +22,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" @@ -37,7 +38,7 @@ type sdCheckResult struct { } // CheckSD performs service discovery for the given job name and reports the results. -func CheckSD(sdConfigFiles, sdJobName string, sdTimeout time.Duration, noDefaultScrapePort bool) int { +func CheckSD(sdConfigFiles, sdJobName string, sdTimeout time.Duration, noDefaultScrapePort bool, registerer prometheus.Registerer) int { logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) cfg, err := config.LoadFile(sdConfigFiles, false, false, logger) @@ -77,7 +78,7 @@ func CheckSD(sdConfigFiles, sdJobName string, sdTimeout time.Duration, noDefault defer cancel() for _, cfg := range scrapeConfig.ServiceDiscoveryConfigs { - d, err := cfg.NewDiscoverer(discovery.DiscovererOptions{Logger: logger}) + d, err := cfg.NewDiscoverer(discovery.DiscovererOptions{Logger: logger, Registerer: registerer}) if err != nil { fmt.Fprintln(os.Stderr, "Could not create new discoverer", err) return failureExitCode diff --git a/discovery/README.md b/discovery/README.md index 19b579b39..4c0660862 100644 --- a/discovery/README.md +++ b/discovery/README.md @@ -234,6 +234,11 @@ type Config interface { type DiscovererOptions struct { Logger log.Logger + + // A registerer for the Discoverer's metrics. + Registerer prometheus.Registerer + + HTTPClientOptions []config.HTTPClientOption } ``` diff --git a/discovery/aws/ec2.go b/discovery/aws/ec2.go index 64c8fdce6..40e6e7cb7 100644 --- a/discovery/aws/ec2.go +++ b/discovery/aws/ec2.go @@ -30,6 +30,7 @@ import ( "github.com/aws/aws-sdk-go/service/ec2" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -101,7 +102,7 @@ func (*EC2SDConfig) Name() string { return "ec2" } // NewDiscoverer returns a Discoverer for the EC2 Config. func (c *EC2SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewEC2Discovery(c, opts.Logger), nil + return NewEC2Discovery(c, opts.Logger, opts.Registerer), nil } // UnmarshalYAML implements the yaml.Unmarshaler interface for the EC2 Config. @@ -147,7 +148,7 @@ type EC2Discovery struct { } // NewEC2Discovery returns a new EC2Discovery which periodically refreshes its targets. -func NewEC2Discovery(conf *EC2SDConfig, logger log.Logger) *EC2Discovery { +func NewEC2Discovery(conf *EC2SDConfig, logger log.Logger, reg prometheus.Registerer) *EC2Discovery { if logger == nil { logger = log.NewNopLogger() } @@ -156,10 +157,13 @@ func NewEC2Discovery(conf *EC2SDConfig, logger log.Logger) *EC2Discovery { cfg: conf, } d.Discovery = refresh.NewDiscovery( - logger, - "ec2", - time.Duration(d.cfg.RefreshInterval), - d.refresh, + refresh.Options{ + Logger: logger, + Mech: "ec2", + Interval: time.Duration(d.cfg.RefreshInterval), + RefreshF: d.refresh, + Registry: reg, + }, ) return d } diff --git a/discovery/aws/lightsail.go b/discovery/aws/lightsail.go index c0198d6a7..5382ea015 100644 --- a/discovery/aws/lightsail.go +++ b/discovery/aws/lightsail.go @@ -29,6 +29,7 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/lightsail" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -84,7 +85,7 @@ func (*LightsailSDConfig) Name() string { return "lightsail" } // NewDiscoverer returns a Discoverer for the Lightsail Config. func (c *LightsailSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewLightsailDiscovery(c, opts.Logger), nil + return NewLightsailDiscovery(c, opts.Logger, opts.Registerer), nil } // UnmarshalYAML implements the yaml.Unmarshaler interface for the Lightsail Config. @@ -121,7 +122,7 @@ type LightsailDiscovery struct { } // NewLightsailDiscovery returns a new LightsailDiscovery which periodically refreshes its targets. -func NewLightsailDiscovery(conf *LightsailSDConfig, logger log.Logger) *LightsailDiscovery { +func NewLightsailDiscovery(conf *LightsailSDConfig, logger log.Logger, reg prometheus.Registerer) *LightsailDiscovery { if logger == nil { logger = log.NewNopLogger() } @@ -129,10 +130,13 @@ func NewLightsailDiscovery(conf *LightsailSDConfig, logger log.Logger) *Lightsai cfg: conf, } d.Discovery = refresh.NewDiscovery( - logger, - "lightsail", - time.Duration(d.cfg.RefreshInterval), - d.refresh, + refresh.Options{ + Logger: logger, + Mech: "lightsail", + Interval: time.Duration(d.cfg.RefreshInterval), + RefreshF: d.refresh, + Registry: reg, + }, ) return d } diff --git a/discovery/azure/azure.go b/discovery/azure/azure.go index ef953b802..3b67a8102 100644 --- a/discovery/azure/azure.go +++ b/discovery/azure/azure.go @@ -79,17 +79,6 @@ var ( AuthenticationMethod: authMethodOAuth, HTTPClientConfig: config_util.DefaultHTTPClientConfig, } - - failuresCount = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "prometheus_sd_azure_failures_total", - Help: "Number of Azure service discovery refresh failures.", - }) - cacheHitCount = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "prometheus_sd_azure_cache_hit_total", - Help: "Number of cache hit during refresh.", - }) ) var environments = map[string]cloud.Configuration{ @@ -114,8 +103,6 @@ func CloudConfigurationFromName(name string) (cloud.Configuration, error) { func init() { discovery.RegisterConfig(&SDConfig{}) - prometheus.MustRegister(failuresCount) - prometheus.MustRegister(cacheHitCount) } // SDConfig is the configuration for Azure based service discovery. @@ -138,7 +125,7 @@ func (*SDConfig) Name() string { return "azure" } // NewDiscoverer returns a Discoverer for the Config. func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(c, opts.Logger), nil + return NewDiscovery(c, opts.Logger, opts.Registerer) } func validateAuthParam(param, name string) error { @@ -181,14 +168,16 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { type Discovery struct { *refresh.Discovery - logger log.Logger - cfg *SDConfig - port int - cache *cache.Cache[string, *armnetwork.Interface] + logger log.Logger + cfg *SDConfig + port int + cache *cache.Cache[string, *armnetwork.Interface] + failuresCount prometheus.Counter + cacheHitCount prometheus.Counter } // NewDiscovery returns a new AzureDiscovery which periodically refreshes its targets. -func NewDiscovery(cfg *SDConfig, logger log.Logger) *Discovery { +func NewDiscovery(cfg *SDConfig, logger log.Logger, reg prometheus.Registerer) (*Discovery, error) { if logger == nil { logger = log.NewNopLogger() } @@ -198,16 +187,30 @@ func NewDiscovery(cfg *SDConfig, logger log.Logger) *Discovery { port: cfg.Port, logger: logger, cache: l, + failuresCount: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "prometheus_sd_azure_failures_total", + Help: "Number of Azure service discovery refresh failures.", + }), + cacheHitCount: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "prometheus_sd_azure_cache_hit_total", + Help: "Number of cache hit during refresh.", + }), } d.Discovery = refresh.NewDiscovery( - logger, - "azure", - time.Duration(cfg.RefreshInterval), - d.refresh, + refresh.Options{ + Logger: logger, + Mech: "azure", + Interval: time.Duration(cfg.RefreshInterval), + RefreshF: d.refresh, + Registry: reg, + Metrics: []prometheus.Collector{d.failuresCount, d.cacheHitCount}, + }, ) - return d + return d, nil } // azureClient represents multiple Azure Resource Manager providers. @@ -329,14 +332,14 @@ func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { client, err := createAzureClient(*d.cfg) if err != nil { - failuresCount.Inc() + d.failuresCount.Inc() return nil, fmt.Errorf("could not create Azure client: %w", err) } client.logger = d.logger machines, err := client.getVMs(ctx, d.cfg.ResourceGroup) if err != nil { - failuresCount.Inc() + d.failuresCount.Inc() return nil, fmt.Errorf("could not get virtual machines: %w", err) } @@ -345,14 +348,14 @@ func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { // Load the vms managed by scale sets. scaleSets, err := client.getScaleSets(ctx, d.cfg.ResourceGroup) if err != nil { - failuresCount.Inc() + d.failuresCount.Inc() return nil, fmt.Errorf("could not get virtual machine scale sets: %w", err) } for _, scaleSet := range scaleSets { scaleSetVms, err := client.getScaleSetVMs(ctx, scaleSet) if err != nil { - failuresCount.Inc() + d.failuresCount.Inc() return nil, fmt.Errorf("could not get virtual machine scale set vms: %w", err) } machines = append(machines, scaleSetVms...) @@ -403,7 +406,7 @@ func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { var networkInterface *armnetwork.Interface if v, ok := d.getFromCache(nicID); ok { networkInterface = v - cacheHitCount.Add(1) + d.cacheHitCount.Add(1) } else { networkInterface, err = client.getNetworkInterfaceByID(ctx, nicID) if err != nil { @@ -462,7 +465,7 @@ func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { var tg targetgroup.Group for tgt := range ch { if tgt.err != nil { - failuresCount.Inc() + d.failuresCount.Inc() return nil, fmt.Errorf("unable to complete Azure service discovery: %w", tgt.err) } if tgt.labelSet != nil { diff --git a/discovery/consul/consul.go b/discovery/consul/consul.go index b4cb15229..50f171a78 100644 --- a/discovery/consul/consul.go +++ b/discovery/consul/consul.go @@ -71,41 +71,18 @@ const ( namespace = "prometheus" ) -var ( - rpcFailuresCount = prometheus.NewCounter( - prometheus.CounterOpts{ - Namespace: namespace, - Name: "sd_consul_rpc_failures_total", - Help: "The number of Consul RPC call failures.", - }) - rpcDuration = prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Namespace: namespace, - Name: "sd_consul_rpc_duration_seconds", - Help: "The duration of a Consul RPC call in seconds.", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }, - []string{"endpoint", "call"}, - ) - - // Initialize metric vectors. - servicesRPCDuration = rpcDuration.WithLabelValues("catalog", "services") - serviceRPCDuration = rpcDuration.WithLabelValues("catalog", "service") - - // DefaultSDConfig is the default Consul SD configuration. - DefaultSDConfig = SDConfig{ - TagSeparator: ",", - Scheme: "http", - Server: "localhost:8500", - AllowStale: true, - RefreshInterval: model.Duration(30 * time.Second), - HTTPClientConfig: config.DefaultHTTPClientConfig, - } -) +// DefaultSDConfig is the default Consul SD configuration. +var DefaultSDConfig = SDConfig{ + TagSeparator: ",", + Scheme: "http", + Server: "localhost:8500", + AllowStale: true, + RefreshInterval: model.Duration(30 * time.Second), + HTTPClientConfig: config.DefaultHTTPClientConfig, +} func init() { discovery.RegisterConfig(&SDConfig{}) - prometheus.MustRegister(rpcFailuresCount, rpcDuration) } // SDConfig is the configuration for Consul service discovery. @@ -147,7 +124,7 @@ func (*SDConfig) Name() string { return "consul" } // NewDiscoverer returns a Discoverer for the Config. func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(c, opts.Logger) + return NewDiscovery(c, opts.Logger, opts.Registerer) } // SetDirectory joins any relative file paths with dir. @@ -184,22 +161,27 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { // Discovery retrieves target information from a Consul server // and updates them via watches. type Discovery struct { - client *consul.Client - clientDatacenter string - clientNamespace string - clientPartition string - tagSeparator string - watchedServices []string // Set of services which will be discovered. - watchedTags []string // Tags used to filter instances of a service. - watchedNodeMeta map[string]string - allowStale bool - refreshInterval time.Duration - finalizer func() - logger log.Logger + client *consul.Client + clientDatacenter string + clientNamespace string + clientPartition string + tagSeparator string + watchedServices []string // Set of services which will be discovered. + watchedTags []string // Tags used to filter instances of a service. + watchedNodeMeta map[string]string + allowStale bool + refreshInterval time.Duration + finalizer func() + logger log.Logger + rpcFailuresCount prometheus.Counter + rpcDuration *prometheus.SummaryVec + servicesRPCDuration prometheus.Observer + serviceRPCDuration prometheus.Observer + metricRegisterer discovery.MetricRegisterer } // NewDiscovery returns a new Discovery for the given config. -func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { +func NewDiscovery(conf *SDConfig, logger log.Logger, reg prometheus.Registerer) (*Discovery, error) { if logger == nil { logger = log.NewNopLogger() } @@ -237,7 +219,35 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { clientPartition: conf.Partition, finalizer: wrapper.CloseIdleConnections, logger: logger, + rpcFailuresCount: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "sd_consul_rpc_failures_total", + Help: "The number of Consul RPC call failures.", + }), + rpcDuration: prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: "sd_consul_rpc_duration_seconds", + Help: "The duration of a Consul RPC call in seconds.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, + []string{"endpoint", "call"}, + ), } + + cd.metricRegisterer = discovery.NewMetricRegisterer( + reg, + []prometheus.Collector{ + cd.rpcFailuresCount, + cd.rpcDuration, + }, + ) + + // Initialize metric vectors. + cd.servicesRPCDuration = cd.rpcDuration.WithLabelValues("catalog", "services") + cd.serviceRPCDuration = cd.rpcDuration.WithLabelValues("catalog", "service") + return cd, nil } @@ -293,7 +303,7 @@ func (d *Discovery) getDatacenter() error { info, err := d.client.Agent().Self() if err != nil { level.Error(d.logger).Log("msg", "Error retrieving datacenter name", "err", err) - rpcFailuresCount.Inc() + d.rpcFailuresCount.Inc() return err } @@ -334,6 +344,13 @@ func (d *Discovery) initialize(ctx context.Context) { // Run implements the Discoverer interface. func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { + err := d.metricRegisterer.RegisterMetrics() + if err != nil { + level.Error(d.logger).Log("msg", "Unable to register metrics", "err", err.Error()) + return + } + defer d.metricRegisterer.UnregisterMetrics() + if d.finalizer != nil { defer d.finalizer() } @@ -382,7 +399,7 @@ func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup. t0 := time.Now() srvs, meta, err := catalog.Services(opts.WithContext(ctx)) elapsed := time.Since(t0) - servicesRPCDuration.Observe(elapsed.Seconds()) + d.servicesRPCDuration.Observe(elapsed.Seconds()) // Check the context before in order to exit early. select { @@ -393,7 +410,7 @@ func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup. if err != nil { level.Error(d.logger).Log("msg", "Error refreshing service list", "err", err) - rpcFailuresCount.Inc() + d.rpcFailuresCount.Inc() time.Sleep(retryInterval) return } @@ -449,13 +466,15 @@ func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup. // consulService contains data belonging to the same service. type consulService struct { - name string - tags []string - labels model.LabelSet - discovery *Discovery - client *consul.Client - tagSeparator string - logger log.Logger + name string + tags []string + labels model.LabelSet + discovery *Discovery + client *consul.Client + tagSeparator string + logger log.Logger + rpcFailuresCount prometheus.Counter + serviceRPCDuration prometheus.Observer } // Start watching a service. @@ -469,8 +488,10 @@ func (d *Discovery) watchService(ctx context.Context, ch chan<- []*targetgroup.G serviceLabel: model.LabelValue(name), datacenterLabel: model.LabelValue(d.clientDatacenter), }, - tagSeparator: d.tagSeparator, - logger: d.logger, + tagSeparator: d.tagSeparator, + logger: d.logger, + rpcFailuresCount: d.rpcFailuresCount, + serviceRPCDuration: d.serviceRPCDuration, } go func() { @@ -508,7 +529,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr t0 := time.Now() serviceNodes, meta, err := health.ServiceMultipleTags(srv.name, srv.tags, false, opts.WithContext(ctx)) elapsed := time.Since(t0) - serviceRPCDuration.Observe(elapsed.Seconds()) + srv.serviceRPCDuration.Observe(elapsed.Seconds()) // Check the context before in order to exit early. select { @@ -520,7 +541,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr if err != nil { level.Error(srv.logger).Log("msg", "Error refreshing service", "service", srv.name, "tags", strings.Join(srv.tags, ","), "err", err) - rpcFailuresCount.Inc() + srv.rpcFailuresCount.Inc() time.Sleep(retryInterval) return } diff --git a/discovery/consul/consul_test.go b/discovery/consul/consul_test.go index 19f7d3c4a..97cb8fbc9 100644 --- a/discovery/consul/consul_test.go +++ b/discovery/consul/consul_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -39,7 +40,7 @@ func TestConfiguredService(t *testing.T) { conf := &SDConfig{ Services: []string{"configuredServiceName"}, } - consulDiscovery, err := NewDiscovery(conf, nil) + consulDiscovery, err := NewDiscovery(conf, nil, prometheus.NewRegistry()) if err != nil { t.Errorf("Unexpected error when initializing discovery %v", err) } @@ -56,7 +57,7 @@ func TestConfiguredServiceWithTag(t *testing.T) { Services: []string{"configuredServiceName"}, ServiceTags: []string{"http"}, } - consulDiscovery, err := NewDiscovery(conf, nil) + consulDiscovery, err := NewDiscovery(conf, nil, prometheus.NewRegistry()) if err != nil { t.Errorf("Unexpected error when initializing discovery %v", err) } @@ -151,7 +152,7 @@ func TestConfiguredServiceWithTags(t *testing.T) { } for _, tc := range cases { - consulDiscovery, err := NewDiscovery(tc.conf, nil) + consulDiscovery, err := NewDiscovery(tc.conf, nil, prometheus.NewRegistry()) if err != nil { t.Errorf("Unexpected error when initializing discovery %v", err) } @@ -165,7 +166,7 @@ func TestConfiguredServiceWithTags(t *testing.T) { func TestNonConfiguredService(t *testing.T) { conf := &SDConfig{} - consulDiscovery, err := NewDiscovery(conf, nil) + consulDiscovery, err := NewDiscovery(conf, nil, prometheus.NewRegistry()) if err != nil { t.Errorf("Unexpected error when initializing discovery %v", err) } @@ -262,7 +263,7 @@ func newServer(t *testing.T) (*httptest.Server, *SDConfig) { func newDiscovery(t *testing.T, config *SDConfig) *Discovery { logger := log.NewNopLogger() - d, err := NewDiscovery(config, logger) + d, err := NewDiscovery(config, logger, prometheus.NewRegistry()) require.NoError(t, err) return d } diff --git a/discovery/digitalocean/digitalocean.go b/discovery/digitalocean/digitalocean.go index e207388b3..970258de0 100644 --- a/discovery/digitalocean/digitalocean.go +++ b/discovery/digitalocean/digitalocean.go @@ -24,6 +24,7 @@ import ( "github.com/digitalocean/godo" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/common/version" @@ -75,7 +76,7 @@ func (*SDConfig) Name() string { return "digitalocean" } // NewDiscoverer returns a Discoverer for the Config. func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(c, opts.Logger) + return NewDiscovery(c, opts.Logger, opts.Registerer) } // SetDirectory joins any relative file paths with dir. @@ -103,7 +104,7 @@ type Discovery struct { } // NewDiscovery returns a new Discovery which periodically refreshes its targets. -func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { +func NewDiscovery(conf *SDConfig, logger log.Logger, reg prometheus.Registerer) (*Discovery, error) { d := &Discovery{ port: conf.Port, } @@ -125,10 +126,13 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { } d.Discovery = refresh.NewDiscovery( - logger, - "digitalocean", - time.Duration(conf.RefreshInterval), - d.refresh, + refresh.Options{ + Logger: logger, + Mech: "digitalocean", + Interval: time.Duration(conf.RefreshInterval), + RefreshF: d.refresh, + Registry: reg, + }, ) return d, nil } diff --git a/discovery/digitalocean/digitalocean_test.go b/discovery/digitalocean/digitalocean_test.go index df2514ecb..a959b312c 100644 --- a/discovery/digitalocean/digitalocean_test.go +++ b/discovery/digitalocean/digitalocean_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" ) @@ -46,7 +47,7 @@ func TestDigitalOceanSDRefresh(t *testing.T) { cfg := DefaultSDConfig cfg.HTTPClientConfig.BearerToken = tokenID - d, err := NewDiscovery(&cfg, log.NewNopLogger()) + d, err := NewDiscovery(&cfg, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) endpoint, err := url.Parse(sdmock.Mock.Endpoint()) require.NoError(t, err) diff --git a/discovery/discovery.go b/discovery/discovery.go index 9dc010a09..acc4c1efe 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -18,6 +18,7 @@ import ( "reflect" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/prometheus/discovery/targetgroup" @@ -42,6 +43,15 @@ type Discoverer interface { type DiscovererOptions struct { Logger log.Logger + // A registerer for the Discoverer's metrics. + // Some Discoverers may ignore this registerer and use the global one instead. + // For now this will work, because the Prometheus `main` function uses the global registry. + // However, in the future the Prometheus `main` function will be updated to not use the global registry. + // Hence, if a discoverer wants its metrics to be visible via the Prometheus executable's + // `/metrics` endpoint, it should use this explicit registerer. + // TODO(ptodev): Update this comment once the Prometheus `main` function does not use the global registry. + Registerer prometheus.Registerer + // Extra HTTP client options to expose to Discoverers. This field may be // ignored; Discoverer implementations must opt-in to reading it. HTTPClientOptions []config.HTTPClientOption diff --git a/discovery/dns/dns.go b/discovery/dns/dns.go index 4838a8954..9b6bd6741 100644 --- a/discovery/dns/dns.go +++ b/discovery/dns/dns.go @@ -49,30 +49,14 @@ const ( namespace = "prometheus" ) -var ( - dnsSDLookupsCount = prometheus.NewCounter( - prometheus.CounterOpts{ - Namespace: namespace, - Name: "sd_dns_lookups_total", - Help: "The number of DNS-SD lookups.", - }) - dnsSDLookupFailuresCount = prometheus.NewCounter( - prometheus.CounterOpts{ - Namespace: namespace, - Name: "sd_dns_lookup_failures_total", - Help: "The number of DNS-SD lookup failures.", - }) - - // DefaultSDConfig is the default DNS SD configuration. - DefaultSDConfig = SDConfig{ - RefreshInterval: model.Duration(30 * time.Second), - Type: "SRV", - } -) +// DefaultSDConfig is the default DNS SD configuration. +var DefaultSDConfig = SDConfig{ + RefreshInterval: model.Duration(30 * time.Second), + Type: "SRV", +} func init() { discovery.RegisterConfig(&SDConfig{}) - prometheus.MustRegister(dnsSDLookupFailuresCount, dnsSDLookupsCount) } // SDConfig is the configuration for DNS based service discovery. @@ -88,7 +72,7 @@ func (*SDConfig) Name() string { return "dns" } // NewDiscoverer returns a Discoverer for the Config. func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(*c, opts.Logger), nil + return NewDiscovery(*c, opts.Logger, opts.Registerer) } // UnmarshalYAML implements the yaml.Unmarshaler interface. @@ -118,16 +102,18 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { // the Discoverer interface. type Discovery struct { *refresh.Discovery - names []string - port int - qtype uint16 - logger log.Logger + names []string + port int + qtype uint16 + logger log.Logger + dnsSDLookupsCount prometheus.Counter + dnsSDLookupFailuresCount prometheus.Counter lookupFn func(name string, qtype uint16, logger log.Logger) (*dns.Msg, error) } // NewDiscovery returns a new Discovery which periodically refreshes its targets. -func NewDiscovery(conf SDConfig, logger log.Logger) *Discovery { +func NewDiscovery(conf SDConfig, logger log.Logger, reg prometheus.Registerer) (*Discovery, error) { if logger == nil { logger = log.NewNopLogger() } @@ -151,14 +137,32 @@ func NewDiscovery(conf SDConfig, logger log.Logger) *Discovery { port: conf.Port, logger: logger, lookupFn: lookupWithSearchPath, + dnsSDLookupsCount: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "sd_dns_lookups_total", + Help: "The number of DNS-SD lookups.", + }), + dnsSDLookupFailuresCount: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "sd_dns_lookup_failures_total", + Help: "The number of DNS-SD lookup failures.", + }), } + d.Discovery = refresh.NewDiscovery( - logger, - "dns", - time.Duration(conf.RefreshInterval), - d.refresh, + refresh.Options{ + Logger: logger, + Mech: "dns", + Interval: time.Duration(conf.RefreshInterval), + RefreshF: d.refresh, + Registry: prometheus.NewRegistry(), + Metrics: []prometheus.Collector{d.dnsSDLookupsCount, d.dnsSDLookupFailuresCount}, + }, ) - return d + + return d, nil } func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { @@ -191,9 +195,9 @@ func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { func (d *Discovery) refreshOne(ctx context.Context, name string, ch chan<- *targetgroup.Group) error { response, err := d.lookupFn(name, d.qtype, d.logger) - dnsSDLookupsCount.Inc() + d.dnsSDLookupsCount.Inc() if err != nil { - dnsSDLookupFailuresCount.Inc() + d.dnsSDLookupFailuresCount.Inc() return err } diff --git a/discovery/dns/dns_test.go b/discovery/dns/dns_test.go index 52ca72c79..b8dd2efaa 100644 --- a/discovery/dns/dns_test.go +++ b/discovery/dns/dns_test.go @@ -22,6 +22,7 @@ import ( "github.com/go-kit/log" "github.com/miekg/dns" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ -252,7 +253,8 @@ func TestDNS(t *testing.T) { tc := tc t.Run(tc.name, func(t *testing.T) { t.Parallel() - sd := NewDiscovery(tc.config, nil) + sd, err := NewDiscovery(tc.config, nil, prometheus.NewRegistry()) + require.NoError(t, err) sd.lookupFn = tc.lookup tgs, err := sd.refresh(context.Background()) diff --git a/discovery/eureka/eureka.go b/discovery/eureka/eureka.go index 5d9d8d552..d3e4084e5 100644 --- a/discovery/eureka/eureka.go +++ b/discovery/eureka/eureka.go @@ -23,6 +23,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -80,7 +81,7 @@ func (*SDConfig) Name() string { return "eureka" } // NewDiscoverer returns a Discoverer for the Config. func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(c, opts.Logger) + return NewDiscovery(c, opts.Logger, opts.Registerer) } // SetDirectory joins any relative file paths with dir. @@ -117,7 +118,7 @@ type Discovery struct { } // NewDiscovery creates a new Eureka discovery for the given role. -func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { +func NewDiscovery(conf *SDConfig, logger log.Logger, reg prometheus.Registerer) (*Discovery, error) { rt, err := config.NewRoundTripperFromConfig(conf.HTTPClientConfig, "eureka_sd") if err != nil { return nil, err @@ -128,10 +129,13 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { server: conf.Server, } d.Discovery = refresh.NewDiscovery( - logger, - "eureka", - time.Duration(conf.RefreshInterval), - d.refresh, + refresh.Options{ + Logger: logger, + Mech: "eureka", + Interval: time.Duration(conf.RefreshInterval), + RefreshF: d.refresh, + Registry: reg, + }, ) return d, nil } diff --git a/discovery/eureka/eureka_test.go b/discovery/eureka/eureka_test.go index cb75e1428..1fe3c710e 100644 --- a/discovery/eureka/eureka_test.go +++ b/discovery/eureka/eureka_test.go @@ -20,6 +20,7 @@ import ( "net/http/httptest" "testing" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -35,7 +36,7 @@ func testUpdateServices(respHandler http.HandlerFunc) ([]*targetgroup.Group, err Server: ts.URL, } - md, err := NewDiscovery(&conf, nil) + md, err := NewDiscovery(&conf, nil, prometheus.NewRegistry()) if err != nil { return nil, err } diff --git a/discovery/file/file.go b/discovery/file/file.go index 60b63350f..ef6ed1f5e 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -39,24 +39,6 @@ import ( ) var ( - fileSDReadErrorsCount = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "prometheus_sd_file_read_errors_total", - Help: "The number of File-SD read errors.", - }) - fileSDScanDuration = prometheus.NewSummary( - prometheus.SummaryOpts{ - Name: "prometheus_sd_file_scan_duration_seconds", - Help: "The duration of the File-SD scan in seconds.", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }) - fileSDTimeStamp = NewTimestampCollector() - fileWatcherErrorsCount = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "prometheus_sd_file_watcher_errors_total", - Help: "The number of File-SD errors caused by filesystem watch failures.", - }) - patFileSDName = regexp.MustCompile(`^[^*]*(\*[^/]*)?\.(json|yml|yaml|JSON|YML|YAML)$`) // DefaultSDConfig is the default file SD configuration. @@ -67,7 +49,6 @@ var ( func init() { discovery.RegisterConfig(&SDConfig{}) - prometheus.MustRegister(fileSDReadErrorsCount, fileSDScanDuration, fileSDTimeStamp, fileWatcherErrorsCount) } // SDConfig is the configuration for file based discovery. @@ -81,7 +62,7 @@ func (*SDConfig) Name() string { return "file" } // NewDiscoverer returns a Discoverer for the Config. func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(c, opts.Logger), nil + return NewDiscovery(c, opts.Logger, opts.Registerer) } // SetDirectory joins any relative file paths with dir. @@ -187,10 +168,17 @@ type Discovery struct { // This is used to detect deleted target groups. lastRefresh map[string]int logger log.Logger + + fileSDReadErrorsCount prometheus.Counter + fileSDScanDuration prometheus.Summary + fileWatcherErrorsCount prometheus.Counter + fileSDTimeStamp *TimestampCollector + + metricRegisterer discovery.MetricRegisterer } // NewDiscovery returns a new file discovery for the given paths. -func NewDiscovery(conf *SDConfig, logger log.Logger) *Discovery { +func NewDiscovery(conf *SDConfig, logger log.Logger, reg prometheus.Registerer) (*Discovery, error) { if logger == nil { logger = log.NewNopLogger() } @@ -200,9 +188,35 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) *Discovery { interval: time.Duration(conf.RefreshInterval), timestamps: make(map[string]float64), logger: logger, + fileSDReadErrorsCount: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "prometheus_sd_file_read_errors_total", + Help: "The number of File-SD read errors.", + }), + fileSDScanDuration: prometheus.NewSummary( + prometheus.SummaryOpts{ + Name: "prometheus_sd_file_scan_duration_seconds", + Help: "The duration of the File-SD scan in seconds.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }), + fileWatcherErrorsCount: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "prometheus_sd_file_watcher_errors_total", + Help: "The number of File-SD errors caused by filesystem watch failures.", + }), + fileSDTimeStamp: NewTimestampCollector(), } - fileSDTimeStamp.addDiscoverer(disc) - return disc + + disc.fileSDTimeStamp.addDiscoverer(disc) + + disc.metricRegisterer = discovery.NewMetricRegisterer(reg, []prometheus.Collector{ + disc.fileSDReadErrorsCount, + disc.fileSDScanDuration, + disc.fileWatcherErrorsCount, + disc.fileSDTimeStamp, + }) + + return disc, nil } // listFiles returns a list of all files that match the configured patterns. @@ -239,10 +253,17 @@ func (d *Discovery) watchFiles() { // Run implements the Discoverer interface. func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { + err := d.metricRegisterer.RegisterMetrics() + if err != nil { + level.Error(d.logger).Log("msg", "Unable to register metrics", "err", err.Error()) + return + } + defer d.metricRegisterer.UnregisterMetrics() + watcher, err := fsnotify.NewWatcher() if err != nil { level.Error(d.logger).Log("msg", "Error adding file watcher", "err", err) - fileWatcherErrorsCount.Inc() + d.fileWatcherErrorsCount.Inc() return } d.watcher = watcher @@ -306,7 +327,7 @@ func (d *Discovery) stop() { done := make(chan struct{}) defer close(done) - fileSDTimeStamp.removeDiscoverer(d) + d.fileSDTimeStamp.removeDiscoverer(d) // Closing the watcher will deadlock unless all events and errors are drained. go func() { @@ -332,13 +353,13 @@ func (d *Discovery) stop() { func (d *Discovery) refresh(ctx context.Context, ch chan<- []*targetgroup.Group) { t0 := time.Now() defer func() { - fileSDScanDuration.Observe(time.Since(t0).Seconds()) + d.fileSDScanDuration.Observe(time.Since(t0).Seconds()) }() ref := map[string]int{} for _, p := range d.listFiles() { tgroups, err := d.readFile(p) if err != nil { - fileSDReadErrorsCount.Inc() + d.fileSDReadErrorsCount.Inc() level.Error(d.logger).Log("msg", "Error reading file", "path", p, "err", err) // Prevent deletion down below. diff --git a/discovery/file/file_test.go b/discovery/file/file_test.go index 76e1cebed..c138fc8a9 100644 --- a/discovery/file/file_test.go +++ b/discovery/file/file_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ -143,7 +144,7 @@ func (t *testRunner) run(files ...string) { ctx, cancel := context.WithCancel(context.Background()) t.cancelSD = cancel go func() { - NewDiscovery( + d, err := NewDiscovery( &SDConfig{ Files: files, // Setting a high refresh interval to make sure that the tests only @@ -151,7 +152,11 @@ func (t *testRunner) run(files ...string) { RefreshInterval: model.Duration(1 * time.Hour), }, nil, - ).Run(ctx, t.ch) + prometheus.NewRegistry(), + ) + require.NoError(t, err) + + d.Run(ctx, t.ch) }() } diff --git a/discovery/gce/gce.go b/discovery/gce/gce.go index fa05fbbf3..21a95ee39 100644 --- a/discovery/gce/gce.go +++ b/discovery/gce/gce.go @@ -23,6 +23,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "golang.org/x/oauth2/google" "google.golang.org/api/compute/v1" @@ -86,7 +87,7 @@ func (*SDConfig) Name() string { return "gce" } // NewDiscoverer returns a Discoverer for the Config. func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(*c, opts.Logger) + return NewDiscovery(*c, opts.Logger, opts.Registerer) } // UnmarshalYAML implements the yaml.Unmarshaler interface. @@ -121,7 +122,7 @@ type Discovery struct { } // NewDiscovery returns a new Discovery which periodically refreshes its targets. -func NewDiscovery(conf SDConfig, logger log.Logger) (*Discovery, error) { +func NewDiscovery(conf SDConfig, logger log.Logger, reg prometheus.Registerer) (*Discovery, error) { d := &Discovery{ project: conf.Project, zone: conf.Zone, @@ -141,10 +142,13 @@ func NewDiscovery(conf SDConfig, logger log.Logger) (*Discovery, error) { d.isvc = compute.NewInstancesService(d.svc) d.Discovery = refresh.NewDiscovery( - logger, - "gce", - time.Duration(conf.RefreshInterval), - d.refresh, + refresh.Options{ + Logger: logger, + Mech: "gce", + Interval: time.Duration(conf.RefreshInterval), + RefreshF: d.refresh, + Registry: reg, + }, ) return d, nil } diff --git a/discovery/hetzner/hetzner.go b/discovery/hetzner/hetzner.go index c3f7ec39c..9d3e6aa65 100644 --- a/discovery/hetzner/hetzner.go +++ b/discovery/hetzner/hetzner.go @@ -21,6 +21,7 @@ import ( "github.com/go-kit/log" "github.com/hetznercloud/hcloud-go/v2/hcloud" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -67,7 +68,7 @@ func (*SDConfig) Name() string { return "hetzner" } // NewDiscoverer returns a Discoverer for the Config. func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(c, opts.Logger) + return NewDiscovery(c, opts.Logger, opts.Registerer) } type refresher interface { @@ -127,17 +128,20 @@ type Discovery struct { } // NewDiscovery returns a new Discovery which periodically refreshes its targets. -func NewDiscovery(conf *SDConfig, logger log.Logger) (*refresh.Discovery, error) { +func NewDiscovery(conf *SDConfig, logger log.Logger, reg prometheus.Registerer) (*refresh.Discovery, error) { r, err := newRefresher(conf, logger) if err != nil { return nil, err } return refresh.NewDiscovery( - logger, - "hetzner", - time.Duration(conf.RefreshInterval), - r.refresh, + refresh.Options{ + Logger: logger, + Mech: "hetzner", + Interval: time.Duration(conf.RefreshInterval), + RefreshF: r.refresh, + Registry: reg, + }, ), nil } diff --git a/discovery/http/http.go b/discovery/http/http.go index 2980d7efd..c12fdb26d 100644 --- a/discovery/http/http.go +++ b/discovery/http/http.go @@ -45,17 +45,10 @@ var ( } userAgent = fmt.Sprintf("Prometheus/%s", version.Version) matchContentType = regexp.MustCompile(`^(?i:application\/json(;\s*charset=("utf-8"|utf-8))?)$`) - - failuresCount = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "prometheus_sd_http_failures_total", - Help: "Number of HTTP service discovery refresh failures.", - }) ) func init() { discovery.RegisterConfig(&SDConfig{}) - prometheus.MustRegister(failuresCount) } // SDConfig is the configuration for HTTP based discovery. @@ -70,7 +63,7 @@ func (*SDConfig) Name() string { return "http" } // NewDiscoverer returns a Discoverer for the Config. func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(c, opts.Logger, opts.HTTPClientOptions) + return NewDiscovery(c, opts.Logger, opts.HTTPClientOptions, opts.Registerer) } // SetDirectory joins any relative file paths with dir. @@ -112,10 +105,11 @@ type Discovery struct { client *http.Client refreshInterval time.Duration tgLastLength int + failuresCount prometheus.Counter } // NewDiscovery returns a new HTTP discovery for the given config. -func NewDiscovery(conf *SDConfig, logger log.Logger, clientOpts []config.HTTPClientOption) (*Discovery, error) { +func NewDiscovery(conf *SDConfig, logger log.Logger, clientOpts []config.HTTPClientOption, reg prometheus.Registerer) (*Discovery, error) { if logger == nil { logger = log.NewNopLogger() } @@ -130,13 +124,22 @@ func NewDiscovery(conf *SDConfig, logger log.Logger, clientOpts []config.HTTPCli url: conf.URL, client: client, refreshInterval: time.Duration(conf.RefreshInterval), // Stored to be sent as headers. + failuresCount: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "prometheus_sd_http_failures_total", + Help: "Number of HTTP service discovery refresh failures.", + }), } d.Discovery = refresh.NewDiscovery( - logger, - "http", - time.Duration(conf.RefreshInterval), - d.Refresh, + refresh.Options{ + Logger: logger, + Mech: "http", + Interval: time.Duration(conf.RefreshInterval), + RefreshF: d.Refresh, + Registry: reg, + Metrics: []prometheus.Collector{d.failuresCount}, + }, ) return d, nil } @@ -152,7 +155,7 @@ func (d *Discovery) Refresh(ctx context.Context) ([]*targetgroup.Group, error) { resp, err := d.client.Do(req.WithContext(ctx)) if err != nil { - failuresCount.Inc() + d.failuresCount.Inc() return nil, err } defer func() { @@ -161,31 +164,31 @@ func (d *Discovery) Refresh(ctx context.Context) ([]*targetgroup.Group, error) { }() if resp.StatusCode != http.StatusOK { - failuresCount.Inc() + d.failuresCount.Inc() return nil, fmt.Errorf("server returned HTTP status %s", resp.Status) } if !matchContentType.MatchString(strings.TrimSpace(resp.Header.Get("Content-Type"))) { - failuresCount.Inc() + d.failuresCount.Inc() return nil, fmt.Errorf("unsupported content type %q", resp.Header.Get("Content-Type")) } b, err := io.ReadAll(resp.Body) if err != nil { - failuresCount.Inc() + d.failuresCount.Inc() return nil, err } var targetGroups []*targetgroup.Group if err := json.Unmarshal(b, &targetGroups); err != nil { - failuresCount.Inc() + d.failuresCount.Inc() return nil, err } for i, tg := range targetGroups { if tg == nil { - failuresCount.Inc() + d.failuresCount.Inc() err = errors.New("nil target group item found") return nil, err } diff --git a/discovery/http/http_test.go b/discovery/http/http_test.go index 9bbda95b7..164719e90 100644 --- a/discovery/http/http_test.go +++ b/discovery/http/http_test.go @@ -41,7 +41,7 @@ func TestHTTPValidRefresh(t *testing.T) { RefreshInterval: model.Duration(30 * time.Second), } - d, err := NewDiscovery(&cfg, log.NewNopLogger(), nil) + d, err := NewDiscovery(&cfg, log.NewNopLogger(), nil, prometheus.NewRegistry()) require.NoError(t, err) ctx := context.Background() @@ -63,7 +63,7 @@ func TestHTTPValidRefresh(t *testing.T) { }, } require.Equal(t, expectedTargets, tgs) - require.Equal(t, 0.0, getFailureCount()) + require.Equal(t, 0.0, getFailureCount(d.failuresCount)) } func TestHTTPInvalidCode(t *testing.T) { @@ -79,13 +79,13 @@ func TestHTTPInvalidCode(t *testing.T) { RefreshInterval: model.Duration(30 * time.Second), } - d, err := NewDiscovery(&cfg, log.NewNopLogger(), nil) + d, err := NewDiscovery(&cfg, log.NewNopLogger(), nil, prometheus.NewRegistry()) require.NoError(t, err) ctx := context.Background() _, err = d.Refresh(ctx) require.EqualError(t, err, "server returned HTTP status 400 Bad Request") - require.Equal(t, 1.0, getFailureCount()) + require.Equal(t, 1.0, getFailureCount(d.failuresCount)) } func TestHTTPInvalidFormat(t *testing.T) { @@ -101,18 +101,16 @@ func TestHTTPInvalidFormat(t *testing.T) { RefreshInterval: model.Duration(30 * time.Second), } - d, err := NewDiscovery(&cfg, log.NewNopLogger(), nil) + d, err := NewDiscovery(&cfg, log.NewNopLogger(), nil, prometheus.NewRegistry()) require.NoError(t, err) ctx := context.Background() _, err = d.Refresh(ctx) require.EqualError(t, err, `unsupported content type "text/plain; charset=utf-8"`) - require.Equal(t, 1.0, getFailureCount()) + require.Equal(t, 1.0, getFailureCount(d.failuresCount)) } -var lastFailureCount float64 - -func getFailureCount() float64 { +func getFailureCount(failuresCount prometheus.Counter) float64 { failureChan := make(chan prometheus.Metric) go func() { @@ -129,10 +127,7 @@ func getFailureCount() float64 { metric.Write(&counter) } - // account for failures in prior tests - count := *counter.Counter.Value - lastFailureCount - lastFailureCount = *counter.Counter.Value - return count + return *counter.Counter.Value } func TestContentTypeRegex(t *testing.T) { @@ -417,7 +412,7 @@ func TestSourceDisappeared(t *testing.T) { URL: ts.URL, RefreshInterval: model.Duration(1 * time.Second), } - d, err := NewDiscovery(&cfg, log.NewNopLogger(), nil) + d, err := NewDiscovery(&cfg, log.NewNopLogger(), nil, prometheus.NewRegistry()) require.NoError(t, err) for _, test := range cases { ctx := context.Background() diff --git a/discovery/ionos/ionos.go b/discovery/ionos/ionos.go index 3afed8d79..36623745a 100644 --- a/discovery/ionos/ionos.go +++ b/discovery/ionos/ionos.go @@ -23,6 +23,8 @@ import ( "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery/refresh" + + "github.com/prometheus/client_golang/prometheus" ) const ( @@ -41,7 +43,7 @@ func init() { type Discovery struct{} // NewDiscovery returns a new refresh.Discovery for IONOS Cloud. -func NewDiscovery(conf *SDConfig, logger log.Logger) (*refresh.Discovery, error) { +func NewDiscovery(conf *SDConfig, logger log.Logger, reg prometheus.Registerer) (*refresh.Discovery, error) { if conf.ionosEndpoint == "" { conf.ionosEndpoint = "https://api.ionos.com" } @@ -52,10 +54,13 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) (*refresh.Discovery, error) } return refresh.NewDiscovery( - logger, - "ionos", - time.Duration(conf.RefreshInterval), - d.refresh, + refresh.Options{ + Logger: logger, + Mech: "ionos", + Interval: time.Duration(conf.RefreshInterval), + RefreshF: d.refresh, + Registry: reg, + }, ), nil } @@ -86,7 +91,7 @@ func (c SDConfig) Name() string { // NewDiscoverer returns a new discovery.Discoverer for IONOS Cloud. func (c SDConfig) NewDiscoverer(options discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(&c, options.Logger) + return NewDiscovery(&c, options.Logger, options.Registerer) } // UnmarshalYAML implements the yaml.Unmarshaler interface. diff --git a/discovery/kubernetes/endpoints.go b/discovery/kubernetes/endpoints.go index 708e229a2..801a45f7c 100644 --- a/discovery/kubernetes/endpoints.go +++ b/discovery/kubernetes/endpoints.go @@ -22,6 +22,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" apiv1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" @@ -30,12 +31,6 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" ) -var ( - epAddCount = eventCount.WithLabelValues("endpoints", "add") - epUpdateCount = eventCount.WithLabelValues("endpoints", "update") - epDeleteCount = eventCount.WithLabelValues("endpoints", "delete") -) - // Endpoints discovers new endpoint targets. type Endpoints struct { logger log.Logger @@ -54,7 +49,7 @@ type Endpoints struct { } // NewEndpoints returns a new endpoints discovery. -func NewEndpoints(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer) *Endpoints { +func NewEndpoints(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer, eventCount *prometheus.CounterVec) *Endpoints { if l == nil { l = log.NewNopLogger() } @@ -73,15 +68,15 @@ func NewEndpoints(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node ca _, err := e.endpointsInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(o interface{}) { - epAddCount.Inc() + eventCount.WithLabelValues("endpoints", "add").Inc() e.enqueue(o) }, UpdateFunc: func(_, o interface{}) { - epUpdateCount.Inc() + eventCount.WithLabelValues("endpoints", "update").Inc() e.enqueue(o) }, DeleteFunc: func(o interface{}) { - epDeleteCount.Inc() + eventCount.WithLabelValues("endpoints", "delete").Inc() e.enqueue(o) }, }) @@ -112,15 +107,15 @@ func NewEndpoints(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node ca // TODO(fabxc): potentially remove add and delete event handlers. Those should // be triggered via the endpoint handlers already. AddFunc: func(o interface{}) { - svcAddCount.Inc() + eventCount.WithLabelValues("service", "add").Inc() serviceUpdate(o) }, UpdateFunc: func(_, o interface{}) { - svcUpdateCount.Inc() + eventCount.WithLabelValues("service", "update").Inc() serviceUpdate(o) }, DeleteFunc: func(o interface{}) { - svcDeleteCount.Inc() + eventCount.WithLabelValues("service", "delete").Inc() serviceUpdate(o) }, }) diff --git a/discovery/kubernetes/endpointslice.go b/discovery/kubernetes/endpointslice.go index a16862380..e2ac1de42 100644 --- a/discovery/kubernetes/endpointslice.go +++ b/discovery/kubernetes/endpointslice.go @@ -22,6 +22,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" apiv1 "k8s.io/api/core/v1" v1 "k8s.io/api/discovery/v1" @@ -33,12 +34,6 @@ import ( "github.com/prometheus/prometheus/util/strutil" ) -var ( - epslAddCount = eventCount.WithLabelValues("endpointslice", "add") - epslUpdateCount = eventCount.WithLabelValues("endpointslice", "update") - epslDeleteCount = eventCount.WithLabelValues("endpointslice", "delete") -) - // EndpointSlice discovers new endpoint targets. type EndpointSlice struct { logger log.Logger @@ -57,7 +52,7 @@ type EndpointSlice struct { } // NewEndpointSlice returns a new endpointslice discovery. -func NewEndpointSlice(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer) *EndpointSlice { +func NewEndpointSlice(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer, eventCount *prometheus.CounterVec) *EndpointSlice { if l == nil { l = log.NewNopLogger() } @@ -76,15 +71,15 @@ func NewEndpointSlice(l log.Logger, eps cache.SharedIndexInformer, svc, pod, nod _, err := e.endpointSliceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(o interface{}) { - epslAddCount.Inc() + eventCount.WithLabelValues("endpointslice", "add").Inc() e.enqueue(o) }, UpdateFunc: func(_, o interface{}) { - epslUpdateCount.Inc() + eventCount.WithLabelValues("endpointslice", "update").Inc() e.enqueue(o) }, DeleteFunc: func(o interface{}) { - epslDeleteCount.Inc() + eventCount.WithLabelValues("endpointslice", "delete").Inc() e.enqueue(o) }, }) @@ -115,15 +110,15 @@ func NewEndpointSlice(l log.Logger, eps cache.SharedIndexInformer, svc, pod, nod } _, err = e.serviceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(o interface{}) { - svcAddCount.Inc() + eventCount.WithLabelValues("service", "add").Inc() serviceUpdate(o) }, UpdateFunc: func(_, o interface{}) { - svcUpdateCount.Inc() + eventCount.WithLabelValues("service", "update").Inc() serviceUpdate(o) }, DeleteFunc: func(o interface{}) { - svcDeleteCount.Inc() + eventCount.WithLabelValues("service", "delete").Inc() serviceUpdate(o) }, }) diff --git a/discovery/kubernetes/ingress.go b/discovery/kubernetes/ingress.go index fee4cc720..91b1fb0ee 100644 --- a/discovery/kubernetes/ingress.go +++ b/discovery/kubernetes/ingress.go @@ -21,6 +21,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" v1 "k8s.io/api/networking/v1" "k8s.io/api/networking/v1beta1" @@ -30,12 +31,6 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" ) -var ( - ingressAddCount = eventCount.WithLabelValues("ingress", "add") - ingressUpdateCount = eventCount.WithLabelValues("ingress", "update") - ingressDeleteCount = eventCount.WithLabelValues("ingress", "delete") -) - // Ingress implements discovery of Kubernetes ingress. type Ingress struct { logger log.Logger @@ -45,19 +40,19 @@ type Ingress struct { } // NewIngress returns a new ingress discovery. -func NewIngress(l log.Logger, inf cache.SharedInformer) *Ingress { +func NewIngress(l log.Logger, inf cache.SharedInformer, eventCount *prometheus.CounterVec) *Ingress { s := &Ingress{logger: l, informer: inf, store: inf.GetStore(), queue: workqueue.NewNamed("ingress")} _, err := s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(o interface{}) { - ingressAddCount.Inc() + eventCount.WithLabelValues("ingress", "add").Inc() s.enqueue(o) }, DeleteFunc: func(o interface{}) { - ingressDeleteCount.Inc() + eventCount.WithLabelValues("ingress", "delete").Inc() s.enqueue(o) }, UpdateFunc: func(_, o interface{}) { - ingressUpdateCount.Inc() + eventCount.WithLabelValues("ingress", "update").Inc() s.enqueue(o) }, }) diff --git a/discovery/kubernetes/kubernetes.go b/discovery/kubernetes/kubernetes.go index 7bd96652f..4deaf3f68 100644 --- a/discovery/kubernetes/kubernetes.go +++ b/discovery/kubernetes/kubernetes.go @@ -58,24 +58,14 @@ import ( const ( // metaLabelPrefix is the meta prefix used for all meta labels. // in this discovery. - metaLabelPrefix = model.MetaLabelPrefix + "kubernetes_" - namespaceLabel = metaLabelPrefix + "namespace" - metricsNamespace = "prometheus_sd_kubernetes" - presentValue = model.LabelValue("true") + metaLabelPrefix = model.MetaLabelPrefix + "kubernetes_" + namespaceLabel = metaLabelPrefix + "namespace" + presentValue = model.LabelValue("true") ) var ( // Http header. userAgent = fmt.Sprintf("Prometheus/%s", version.Version) - // Custom events metric. - eventCount = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: metricsNamespace, - Name: "events_total", - Help: "The number of Kubernetes events handled.", - }, - []string{"role", "event"}, - ) // DefaultSDConfig is the default Kubernetes SD configuration. DefaultSDConfig = SDConfig{ HTTPClientConfig: config.DefaultHTTPClientConfig, @@ -84,15 +74,6 @@ var ( func init() { discovery.RegisterConfig(&SDConfig{}) - prometheus.MustRegister(eventCount) - // Initialize metric vectors. - for _, role := range []string{"endpointslice", "endpoints", "node", "pod", "service", "ingress"} { - for _, evt := range []string{"add", "delete", "update"} { - eventCount.WithLabelValues(role, evt) - } - } - (&clientGoRequestMetricAdapter{}).Register(prometheus.DefaultRegisterer) - (&clientGoWorkqueueMetricsProvider{}).Register(prometheus.DefaultRegisterer) } // Role is role of the service in Kubernetes. @@ -137,7 +118,7 @@ func (*SDConfig) Name() string { return "kubernetes" } // NewDiscoverer returns a Discoverer for the Config. func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return New(opts.Logger, c) + return New(opts.Logger, opts.Registerer, c) } // SetDirectory joins any relative file paths with dir. @@ -274,6 +255,8 @@ type Discovery struct { selectors roleSelector ownNamespace string attachMetadata AttachMetadataConfig + eventCount *prometheus.CounterVec + metricRegisterer discovery.MetricRegisterer } func (d *Discovery) getNamespaces() []string { @@ -292,7 +275,7 @@ func (d *Discovery) getNamespaces() []string { } // New creates a new Kubernetes discovery for the given role. -func New(l log.Logger, conf *SDConfig) (*Discovery, error) { +func New(l log.Logger, reg prometheus.Registerer, conf *SDConfig) (*Discovery, error) { if l == nil { l = log.NewNopLogger() } @@ -346,7 +329,7 @@ func New(l log.Logger, conf *SDConfig) (*Discovery, error) { return nil, err } - return &Discovery{ + d := &Discovery{ client: c, logger: l, role: conf.Role, @@ -355,7 +338,26 @@ func New(l log.Logger, conf *SDConfig) (*Discovery, error) { selectors: mapSelector(conf.Selectors), ownNamespace: ownNamespace, attachMetadata: conf.AttachMetadata, - }, nil + eventCount: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: discovery.KubernetesMetricsNamespace, + Name: "events_total", + Help: "The number of Kubernetes events handled.", + }, + []string{"role", "event"}, + ), + } + + d.metricRegisterer = discovery.NewMetricRegisterer(reg, []prometheus.Collector{d.eventCount}) + + // Initialize metric vectors. + for _, role := range []string{"endpointslice", "endpoints", "node", "pod", "service", "ingress"} { + for _, evt := range []string{"add", "delete", "update"} { + d.eventCount.WithLabelValues(role, evt) + } + } + + return d, nil } func mapSelector(rawSelector []SelectorConfig) roleSelector { @@ -391,6 +393,14 @@ const resyncDisabled = 0 // Run implements the discoverer interface. func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { d.Lock() + + err := d.metricRegisterer.RegisterMetrics() + if err != nil { + level.Error(d.logger).Log("msg", "Unable to register metrics", "err", err.Error()) + return + } + defer d.metricRegisterer.UnregisterMetrics() + namespaces := d.getNamespaces() switch d.role { @@ -482,6 +492,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { cache.NewSharedInformer(slw, &apiv1.Service{}, resyncDisabled), cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled), nodeInf, + d.eventCount, ) d.discoverers = append(d.discoverers, eps) go eps.endpointSliceInf.Run(ctx.Done()) @@ -541,6 +552,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { cache.NewSharedInformer(slw, &apiv1.Service{}, resyncDisabled), cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled), nodeInf, + d.eventCount, ) d.discoverers = append(d.discoverers, eps) go eps.endpointsInf.Run(ctx.Done()) @@ -572,6 +584,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { log.With(d.logger, "role", "pod"), d.newPodsByNodeInformer(plw), nodeInformer, + d.eventCount, ) d.discoverers = append(d.discoverers, pod) go pod.podInf.Run(ctx.Done()) @@ -594,6 +607,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { svc := NewService( log.With(d.logger, "role", "service"), cache.NewSharedInformer(slw, &apiv1.Service{}, resyncDisabled), + d.eventCount, ) d.discoverers = append(d.discoverers, svc) go svc.informer.Run(ctx.Done()) @@ -651,13 +665,14 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { ingress := NewIngress( log.With(d.logger, "role", "ingress"), informer, + d.eventCount, ) d.discoverers = append(d.discoverers, ingress) go ingress.informer.Run(ctx.Done()) } case RoleNode: nodeInformer := d.newNodeInformer(ctx) - node := NewNode(log.With(d.logger, "role", "node"), nodeInformer) + node := NewNode(log.With(d.logger, "role", "node"), nodeInformer, d.eventCount) d.discoverers = append(d.discoverers, node) go node.informer.Run(ctx.Done()) default: diff --git a/discovery/kubernetes/kubernetes_test.go b/discovery/kubernetes/kubernetes_test.go index d0ed4c6ca..71c937e94 100644 --- a/discovery/kubernetes/kubernetes_test.go +++ b/discovery/kubernetes/kubernetes_test.go @@ -29,6 +29,8 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/util/testutil" @@ -49,13 +51,25 @@ func makeDiscoveryWithVersion(role Role, nsDiscovery NamespaceDiscovery, k8sVer fakeDiscovery, _ := clientset.Discovery().(*fakediscovery.FakeDiscovery) fakeDiscovery.FakedServerVersion = &version.Info{GitVersion: k8sVer} - return &Discovery{ + d := &Discovery{ client: clientset, logger: log.NewNopLogger(), role: role, namespaceDiscovery: &nsDiscovery, ownNamespace: "own-ns", - }, clientset + eventCount: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: discovery.KubernetesMetricsNamespace, + Name: "events_total", + Help: "The number of Kubernetes events handled.", + }, + []string{"role", "event"}, + ), + } + + d.metricRegisterer = discovery.NewMetricRegisterer(prometheus.NewRegistry(), []prometheus.Collector{d.eventCount}) + + return d, clientset } // makeDiscoveryWithMetadata creates a kubernetes.Discovery instance with the specified metadata config. diff --git a/discovery/kubernetes/node.go b/discovery/kubernetes/node.go index b188a3ceb..f1e37e6fa 100644 --- a/discovery/kubernetes/node.go +++ b/discovery/kubernetes/node.go @@ -22,6 +22,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" apiv1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" @@ -35,12 +36,6 @@ const ( NodeLegacyHostIP = "LegacyHostIP" ) -var ( - nodeAddCount = eventCount.WithLabelValues("node", "add") - nodeUpdateCount = eventCount.WithLabelValues("node", "update") - nodeDeleteCount = eventCount.WithLabelValues("node", "delete") -) - // Node discovers Kubernetes nodes. type Node struct { logger log.Logger @@ -50,22 +45,22 @@ type Node struct { } // NewNode returns a new node discovery. -func NewNode(l log.Logger, inf cache.SharedInformer) *Node { +func NewNode(l log.Logger, inf cache.SharedInformer, eventCount *prometheus.CounterVec) *Node { if l == nil { l = log.NewNopLogger() } n := &Node{logger: l, informer: inf, store: inf.GetStore(), queue: workqueue.NewNamed("node")} _, err := n.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(o interface{}) { - nodeAddCount.Inc() + eventCount.WithLabelValues("node", "add").Inc() n.enqueue(o) }, DeleteFunc: func(o interface{}) { - nodeDeleteCount.Inc() + eventCount.WithLabelValues("node", "delete").Inc() n.enqueue(o) }, UpdateFunc: func(_, o interface{}) { - nodeUpdateCount.Inc() + eventCount.WithLabelValues("node", "update").Inc() n.enqueue(o) }, }) diff --git a/discovery/kubernetes/pod.go b/discovery/kubernetes/pod.go index 88da7bba6..cc809b29c 100644 --- a/discovery/kubernetes/pod.go +++ b/discovery/kubernetes/pod.go @@ -23,6 +23,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -34,12 +35,6 @@ import ( const nodeIndex = "node" -var ( - podAddCount = eventCount.WithLabelValues("pod", "add") - podUpdateCount = eventCount.WithLabelValues("pod", "update") - podDeleteCount = eventCount.WithLabelValues("pod", "delete") -) - // Pod discovers new pod targets. type Pod struct { podInf cache.SharedIndexInformer @@ -51,7 +46,7 @@ type Pod struct { } // NewPod creates a new pod discovery. -func NewPod(l log.Logger, pods cache.SharedIndexInformer, nodes cache.SharedInformer) *Pod { +func NewPod(l log.Logger, pods cache.SharedIndexInformer, nodes cache.SharedInformer, eventCount *prometheus.CounterVec) *Pod { if l == nil { l = log.NewNopLogger() } @@ -66,15 +61,15 @@ func NewPod(l log.Logger, pods cache.SharedIndexInformer, nodes cache.SharedInfo } _, err := p.podInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(o interface{}) { - podAddCount.Inc() + eventCount.WithLabelValues("pod", "add").Inc() p.enqueue(o) }, DeleteFunc: func(o interface{}) { - podDeleteCount.Inc() + eventCount.WithLabelValues("pod", "delete").Inc() p.enqueue(o) }, UpdateFunc: func(_, o interface{}) { - podUpdateCount.Inc() + eventCount.WithLabelValues("pod", "update").Inc() p.enqueue(o) }, }) diff --git a/discovery/kubernetes/service.go b/discovery/kubernetes/service.go index 9fcc6644c..a680ebee8 100644 --- a/discovery/kubernetes/service.go +++ b/discovery/kubernetes/service.go @@ -22,6 +22,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" apiv1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" @@ -30,12 +31,6 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" ) -var ( - svcAddCount = eventCount.WithLabelValues("service", "add") - svcUpdateCount = eventCount.WithLabelValues("service", "update") - svcDeleteCount = eventCount.WithLabelValues("service", "delete") -) - // Service implements discovery of Kubernetes services. type Service struct { logger log.Logger @@ -45,22 +40,22 @@ type Service struct { } // NewService returns a new service discovery. -func NewService(l log.Logger, inf cache.SharedInformer) *Service { +func NewService(l log.Logger, inf cache.SharedInformer, eventCount *prometheus.CounterVec) *Service { if l == nil { l = log.NewNopLogger() } s := &Service{logger: l, informer: inf, store: inf.GetStore(), queue: workqueue.NewNamed("service")} _, err := s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(o interface{}) { - svcAddCount.Inc() + eventCount.WithLabelValues("service", "add").Inc() s.enqueue(o) }, DeleteFunc: func(o interface{}) { - svcDeleteCount.Inc() + eventCount.WithLabelValues("service", "delete").Inc() s.enqueue(o) }, UpdateFunc: func(_, o interface{}) { - svcUpdateCount.Inc() + eventCount.WithLabelValues("service", "update").Inc() s.enqueue(o) }, }) diff --git a/discovery/legacymanager/manager.go b/discovery/legacymanager/manager.go index 74c544e72..101012daf 100644 --- a/discovery/legacymanager/manager.go +++ b/discovery/legacymanager/manager.go @@ -28,48 +28,6 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" ) -var ( - failedConfigs = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "prometheus_sd_failed_configs", - Help: "Current number of service discovery configurations that failed to load.", - }, - []string{"name"}, - ) - discoveredTargets = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "prometheus_sd_discovered_targets", - Help: "Current number of discovered targets.", - }, - []string{"name", "config"}, - ) - receivedUpdates = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "prometheus_sd_received_updates_total", - Help: "Total number of update events received from the SD providers.", - }, - []string{"name"}, - ) - delayedUpdates = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "prometheus_sd_updates_delayed_total", - Help: "Total number of update events that couldn't be sent immediately.", - }, - []string{"name"}, - ) - sentUpdates = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "prometheus_sd_updates_total", - Help: "Total number of update events sent to the SD consumers.", - }, - []string{"name"}, - ) -) - -func RegisterMetrics() { - prometheus.MustRegister(failedConfigs, discoveredTargets, receivedUpdates, delayedUpdates, sentUpdates) -} - type poolKey struct { setName string provider string @@ -84,7 +42,7 @@ type provider struct { } // NewManager is the Discovery Manager constructor. -func NewManager(ctx context.Context, logger log.Logger, options ...func(*Manager)) *Manager { +func NewManager(ctx context.Context, logger log.Logger, registerer prometheus.Registerer, metrics *discovery.Metrics, options ...func(*Manager)) *Manager { if logger == nil { logger = log.NewNopLogger() } @@ -96,6 +54,8 @@ func NewManager(ctx context.Context, logger log.Logger, options ...func(*Manager ctx: ctx, updatert: 5 * time.Second, triggerSend: make(chan struct{}, 1), + registerer: registerer, + metrics: metrics, } for _, option := range options { option(mgr) @@ -135,6 +95,11 @@ type Manager struct { // The triggerSend channel signals to the manager that new updates have been received from providers. triggerSend chan struct{} + + // A registerer for all service discovery metrics. + registerer prometheus.Registerer + + metrics *discovery.Metrics } // Run starts the background processing. @@ -157,7 +122,7 @@ func (m *Manager) ApplyConfig(cfg map[string]discovery.Configs) error { for pk := range m.targets { if _, ok := cfg[pk.setName]; !ok { - discoveredTargets.DeleteLabelValues(m.name, pk.setName) + m.metrics.DiscoveredTargets.DeleteLabelValues(m.name, pk.setName) } } m.cancelDiscoverers() @@ -168,9 +133,9 @@ func (m *Manager) ApplyConfig(cfg map[string]discovery.Configs) error { failedCount := 0 for name, scfg := range cfg { failedCount += m.registerProviders(scfg, name) - discoveredTargets.WithLabelValues(m.name, name).Set(0) + m.metrics.DiscoveredTargets.WithLabelValues(m.name, name).Set(0) } - failedConfigs.WithLabelValues(m.name).Set(float64(failedCount)) + m.metrics.FailedConfigs.WithLabelValues(m.name).Set(float64(failedCount)) for _, prov := range m.providers { m.startProvider(m.ctx, prov) @@ -207,7 +172,7 @@ func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targ case <-ctx.Done(): return case tgs, ok := <-updates: - receivedUpdates.WithLabelValues(m.name).Inc() + m.metrics.ReceivedUpdates.WithLabelValues(m.name).Inc() if !ok { level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name) return @@ -236,11 +201,11 @@ func (m *Manager) sender() { case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker. select { case <-m.triggerSend: - sentUpdates.WithLabelValues(m.name).Inc() + m.metrics.SentUpdates.WithLabelValues(m.name).Inc() select { case m.syncCh <- m.allGroups(): default: - delayedUpdates.WithLabelValues(m.name).Inc() + m.metrics.DelayedUpdates.WithLabelValues(m.name).Inc() level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle") select { case m.triggerSend <- struct{}{}: @@ -288,7 +253,7 @@ func (m *Manager) allGroups() map[string][]*targetgroup.Group { } } for setName, v := range n { - discoveredTargets.WithLabelValues(m.name, setName).Set(float64(v)) + m.metrics.DiscoveredTargets.WithLabelValues(m.name, setName).Set(float64(v)) } return tSets } @@ -309,7 +274,8 @@ func (m *Manager) registerProviders(cfgs discovery.Configs, setName string) int } typ := cfg.Name() d, err := cfg.NewDiscoverer(discovery.DiscovererOptions{ - Logger: log.With(m.logger, "discovery", typ, "config", setName), + Logger: log.With(m.logger, "discovery", typ, "config", setName), + Registerer: m.registerer, }) if err != nil { level.Error(m.logger).Log("msg", "Cannot create service discovery", "err", err, "type", typ, "config", setName) diff --git a/discovery/legacymanager/manager_test.go b/discovery/legacymanager/manager_test.go index 13b84e6e3..dccb687c2 100644 --- a/discovery/legacymanager/manager_test.go +++ b/discovery/legacymanager/manager_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" client_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -35,6 +36,12 @@ func TestMain(m *testing.M) { testutil.TolerantVerifyLeak(m) } +func newTestDiscoveryMetrics(t *testing.T) *discovery.Metrics { + metrics, err := discovery.NewMetrics(prometheus.NewRegistry()) + require.NoError(t, err) + return metrics +} + // TestTargetUpdatesOrder checks that the target updates are received in the expected order. func TestTargetUpdatesOrder(t *testing.T) { // The order by which the updates are send is determined by the interval passed to the mock discovery adapter @@ -664,7 +671,7 @@ func TestTargetUpdatesOrder(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager := NewManager(ctx, log.NewNopLogger(), prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) discoveryManager.updatert = 100 * time.Millisecond var totalUpdatesCount int @@ -746,7 +753,7 @@ func verifyPresence(t *testing.T, tSets map[poolKey]map[string]*targetgroup.Grou func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager := NewManager(ctx, log.NewNopLogger(), prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) discoveryManager.updatert = 100 * time.Millisecond go discoveryManager.Run() @@ -774,7 +781,7 @@ func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) { func TestDiscovererConfigs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager := NewManager(ctx, log.NewNopLogger(), prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) discoveryManager.updatert = 100 * time.Millisecond go discoveryManager.Run() @@ -798,7 +805,7 @@ func TestDiscovererConfigs(t *testing.T) { func TestTargetSetRecreatesEmptyStaticConfigs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager := NewManager(ctx, log.NewNopLogger(), prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) discoveryManager.updatert = 100 * time.Millisecond go discoveryManager.Run() @@ -837,7 +844,7 @@ func TestTargetSetRecreatesEmptyStaticConfigs(t *testing.T) { func TestIdenticalConfigurationsAreCoalesced(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, nil) + discoveryManager := NewManager(ctx, nil, prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) discoveryManager.updatert = 100 * time.Millisecond go discoveryManager.Run() @@ -868,7 +875,7 @@ func TestApplyConfigDoesNotModifyStaticTargets(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager := NewManager(ctx, log.NewNopLogger(), prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) discoveryManager.updatert = 100 * time.Millisecond go discoveryManager.Run() @@ -893,7 +900,7 @@ func (e errorConfig) NewDiscoverer(discovery.DiscovererOptions) (discovery.Disco func TestGaugeFailedConfigs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager := NewManager(ctx, log.NewNopLogger(), prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) discoveryManager.updatert = 100 * time.Millisecond go discoveryManager.Run() @@ -907,7 +914,7 @@ func TestGaugeFailedConfigs(t *testing.T) { discoveryManager.ApplyConfig(c) <-discoveryManager.SyncCh() - failedCount := client_testutil.ToFloat64(failedConfigs) + failedCount := client_testutil.ToFloat64(discoveryManager.metrics.FailedConfigs) if failedCount != 3 { t.Fatalf("Expected to have 3 failed configs, got: %v", failedCount) } @@ -918,7 +925,7 @@ func TestGaugeFailedConfigs(t *testing.T) { discoveryManager.ApplyConfig(c) <-discoveryManager.SyncCh() - failedCount = client_testutil.ToFloat64(failedConfigs) + failedCount = client_testutil.ToFloat64(discoveryManager.metrics.FailedConfigs) if failedCount != 0 { t.Fatalf("Expected to get no failed config, got: %v", failedCount) } @@ -1049,7 +1056,7 @@ func TestCoordinationWithReceiver(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - mgr := NewManager(ctx, nil) + mgr := NewManager(ctx, nil, prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) mgr.updatert = updateDelay go mgr.Run() diff --git a/discovery/linode/linode.go b/discovery/linode/linode.go index a5e047b94..38a5cdad4 100644 --- a/discovery/linode/linode.go +++ b/discovery/linode/linode.go @@ -67,24 +67,15 @@ const ( ) // DefaultSDConfig is the default Linode SD configuration. -var ( - DefaultSDConfig = SDConfig{ - TagSeparator: ",", - Port: 80, - RefreshInterval: model.Duration(60 * time.Second), - HTTPClientConfig: config.DefaultHTTPClientConfig, - } - - failuresCount = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "prometheus_sd_linode_failures_total", - Help: "Number of Linode service discovery refresh failures.", - }) -) +var DefaultSDConfig = SDConfig{ + TagSeparator: ",", + Port: 80, + RefreshInterval: model.Duration(60 * time.Second), + HTTPClientConfig: config.DefaultHTTPClientConfig, +} func init() { discovery.RegisterConfig(&SDConfig{}) - prometheus.MustRegister(failuresCount) } // SDConfig is the configuration for Linode based service discovery. @@ -101,7 +92,7 @@ func (*SDConfig) Name() string { return "linode" } // NewDiscoverer returns a Discoverer for the Config. func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(c, opts.Logger) + return NewDiscovery(c, opts.Logger, opts.Registerer) } // SetDirectory joins any relative file paths with dir. @@ -131,16 +122,22 @@ type Discovery struct { pollCount int lastResults []*targetgroup.Group eventPollingEnabled bool + failuresCount prometheus.Counter } // NewDiscovery returns a new Discovery which periodically refreshes its targets. -func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { +func NewDiscovery(conf *SDConfig, logger log.Logger, reg prometheus.Registerer) (*Discovery, error) { d := &Discovery{ port: conf.Port, tagSeparator: conf.TagSeparator, pollCount: 0, lastRefreshTimestamp: time.Now().UTC(), eventPollingEnabled: true, + failuresCount: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "prometheus_sd_linode_failures_total", + Help: "Number of Linode service discovery refresh failures.", + }), } rt, err := config.NewRoundTripperFromConfig(conf.HTTPClientConfig, "linode_sd") @@ -158,10 +155,14 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { d.client = &client d.Discovery = refresh.NewDiscovery( - logger, - "linode", - time.Duration(conf.RefreshInterval), - d.refresh, + refresh.Options{ + Logger: logger, + Mech: "linode", + Interval: time.Duration(conf.RefreshInterval), + RefreshF: d.refresh, + Registry: reg, + Metrics: []prometheus.Collector{d.failuresCount}, + }, ) return d, nil } @@ -222,14 +223,14 @@ func (d *Discovery) refreshData(ctx context.Context) ([]*targetgroup.Group, erro // Gather all linode instances. instances, err := d.client.ListInstances(ctx, &linodego.ListOptions{PageSize: 500}) if err != nil { - failuresCount.Inc() + d.failuresCount.Inc() return nil, err } // Gather detailed IP address info for all IPs on all linode instances. detailedIPs, err := d.client.ListIPAddresses(ctx, &linodego.ListOptions{PageSize: 500}) if err != nil { - failuresCount.Inc() + d.failuresCount.Inc() return nil, err } diff --git a/discovery/linode/linode_test.go b/discovery/linode/linode_test.go index db4ee9bf8..536b12090 100644 --- a/discovery/linode/linode_test.go +++ b/discovery/linode/linode_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -52,7 +53,7 @@ func TestLinodeSDRefresh(t *testing.T) { Credentials: tokenID, Type: "Bearer", } - d, err := NewDiscovery(&cfg, log.NewNopLogger()) + d, err := NewDiscovery(&cfg, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) endpoint, err := url.Parse(sdmock.Mock.Endpoint()) require.NoError(t, err) diff --git a/discovery/manager.go b/discovery/manager.go index 86439d2c9..6afa1b622 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -28,48 +28,6 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" ) -var ( - failedConfigs = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "prometheus_sd_failed_configs", - Help: "Current number of service discovery configurations that failed to load.", - }, - []string{"name"}, - ) - discoveredTargets = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "prometheus_sd_discovered_targets", - Help: "Current number of discovered targets.", - }, - []string{"name", "config"}, - ) - receivedUpdates = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "prometheus_sd_received_updates_total", - Help: "Total number of update events received from the SD providers.", - }, - []string{"name"}, - ) - delayedUpdates = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "prometheus_sd_updates_delayed_total", - Help: "Total number of update events that couldn't be sent immediately.", - }, - []string{"name"}, - ) - sentUpdates = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "prometheus_sd_updates_total", - Help: "Total number of update events sent to the SD consumers.", - }, - []string{"name"}, - ) -) - -func RegisterMetrics() { - prometheus.MustRegister(failedConfigs, discoveredTargets, receivedUpdates, delayedUpdates, sentUpdates) -} - type poolKey struct { setName string provider string @@ -107,7 +65,7 @@ func (p *Provider) Config() interface{} { } // NewManager is the Discovery Manager constructor. -func NewManager(ctx context.Context, logger log.Logger, options ...func(*Manager)) *Manager { +func NewManager(ctx context.Context, logger log.Logger, registerer prometheus.Registerer, metrics *Metrics, options ...func(*Manager)) *Manager { if logger == nil { logger = log.NewNopLogger() } @@ -118,6 +76,8 @@ func NewManager(ctx context.Context, logger log.Logger, options ...func(*Manager ctx: ctx, updatert: 5 * time.Second, triggerSend: make(chan struct{}, 1), + registerer: registerer, + metrics: metrics, } for _, option := range options { option(mgr) @@ -170,6 +130,11 @@ type Manager struct { // lastProvider counts providers registered during Manager's lifetime. lastProvider uint + + // A registerer for all service discovery metrics. + registerer prometheus.Registerer + + metrics *Metrics } // Providers returns the currently configured SD providers. @@ -200,7 +165,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error { for name, scfg := range cfg { failedCount += m.registerProviders(scfg, name) } - failedConfigs.WithLabelValues(m.name).Set(float64(failedCount)) + m.metrics.FailedConfigs.WithLabelValues(m.name).Set(float64(failedCount)) var ( wg sync.WaitGroup @@ -230,13 +195,13 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error { // Remove obsolete subs' targets. if _, ok := prov.newSubs[s]; !ok { delete(m.targets, poolKey{s, prov.name}) - discoveredTargets.DeleteLabelValues(m.name, s) + m.metrics.DiscoveredTargets.DeleteLabelValues(m.name, s) } } // Set metrics and targets for new subs. for s := range prov.newSubs { if _, ok := prov.subs[s]; !ok { - discoveredTargets.WithLabelValues(m.name, s).Set(0) + m.metrics.DiscoveredTargets.WithLabelValues(m.name, s).Set(0) } if l := len(refTargets); l > 0 { m.targets[poolKey{s, prov.name}] = make(map[string]*targetgroup.Group, l) @@ -316,7 +281,7 @@ func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targ case <-ctx.Done(): return case tgs, ok := <-updates: - receivedUpdates.WithLabelValues(m.name).Inc() + m.metrics.ReceivedUpdates.WithLabelValues(m.name).Inc() if !ok { level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name) // Wait for provider cancellation to ensure targets are cleaned up when expected. @@ -349,11 +314,11 @@ func (m *Manager) sender() { case <-ticker.C: // Some discoverers send updates too often, so we throttle these with the ticker. select { case <-m.triggerSend: - sentUpdates.WithLabelValues(m.name).Inc() + m.metrics.SentUpdates.WithLabelValues(m.name).Inc() select { case m.syncCh <- m.allGroups(): default: - delayedUpdates.WithLabelValues(m.name).Inc() + m.metrics.DelayedUpdates.WithLabelValues(m.name).Inc() level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle") select { case m.triggerSend <- struct{}{}: @@ -405,7 +370,7 @@ func (m *Manager) allGroups() map[string][]*targetgroup.Group { } } for setName, v := range n { - discoveredTargets.WithLabelValues(m.name, setName).Set(float64(v)) + m.metrics.DiscoveredTargets.WithLabelValues(m.name, setName).Set(float64(v)) } return tSets } @@ -428,6 +393,7 @@ func (m *Manager) registerProviders(cfgs Configs, setName string) int { d, err := cfg.NewDiscoverer(DiscovererOptions{ Logger: log.With(m.logger, "discovery", typ, "config", setName), HTTPClientOptions: m.httpOpts, + Registerer: m.registerer, }) if err != nil { level.Error(m.logger).Log("msg", "Cannot create service discovery", "err", err, "type", typ, "config", setName) diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 796b01458..3cb277343 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" client_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -35,6 +36,12 @@ func TestMain(m *testing.M) { testutil.TolerantVerifyLeak(m) } +func newTestDiscoveryMetrics(t *testing.T) *Metrics { + metrics, err := NewMetrics(prometheus.NewRegistry()) + require.NoError(t, err) + return metrics +} + // TestTargetUpdatesOrder checks that the target updates are received in the expected order. func TestTargetUpdatesOrder(t *testing.T) { // The order by which the updates are send is determined by the interval passed to the mock discovery adapter @@ -664,7 +671,7 @@ func TestTargetUpdatesOrder(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager := NewManager(ctx, log.NewNopLogger(), prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) discoveryManager.updatert = 100 * time.Millisecond var totalUpdatesCount int @@ -778,7 +785,7 @@ func pk(provider, setName string, n int) poolKey { func TestTargetSetTargetGroupsPresentOnConfigReload(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager := NewManager(ctx, log.NewNopLogger(), prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) discoveryManager.updatert = 100 * time.Millisecond go discoveryManager.Run() @@ -810,7 +817,7 @@ func TestTargetSetTargetGroupsPresentOnConfigReload(t *testing.T) { func TestTargetSetTargetGroupsPresentOnConfigRename(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager := NewManager(ctx, log.NewNopLogger(), prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) discoveryManager.updatert = 100 * time.Millisecond go discoveryManager.Run() @@ -845,7 +852,7 @@ func TestTargetSetTargetGroupsPresentOnConfigRename(t *testing.T) { func TestTargetSetTargetGroupsPresentOnConfigDuplicateAndDeleteOriginal(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager := NewManager(ctx, log.NewNopLogger(), prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) discoveryManager.updatert = 100 * time.Millisecond go discoveryManager.Run() @@ -883,7 +890,7 @@ func TestTargetSetTargetGroupsPresentOnConfigDuplicateAndDeleteOriginal(t *testi func TestTargetSetTargetGroupsPresentOnConfigChange(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager := NewManager(ctx, log.NewNopLogger(), prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) discoveryManager.updatert = 100 * time.Millisecond go discoveryManager.Run() @@ -944,7 +951,7 @@ func TestTargetSetTargetGroupsPresentOnConfigChange(t *testing.T) { func TestTargetSetRecreatesTargetGroupsOnConfigChange(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager := NewManager(ctx, log.NewNopLogger(), prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) discoveryManager.updatert = 100 * time.Millisecond go discoveryManager.Run() @@ -983,7 +990,7 @@ func TestTargetSetRecreatesTargetGroupsOnConfigChange(t *testing.T) { func TestDiscovererConfigs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager := NewManager(ctx, log.NewNopLogger(), prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) discoveryManager.updatert = 100 * time.Millisecond go discoveryManager.Run() @@ -1015,7 +1022,7 @@ func TestDiscovererConfigs(t *testing.T) { func TestTargetSetRecreatesEmptyStaticConfigs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager := NewManager(ctx, log.NewNopLogger(), prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) discoveryManager.updatert = 100 * time.Millisecond go discoveryManager.Run() @@ -1062,7 +1069,7 @@ func TestTargetSetRecreatesEmptyStaticConfigs(t *testing.T) { func TestIdenticalConfigurationsAreCoalesced(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, nil) + discoveryManager := NewManager(ctx, nil, prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) discoveryManager.updatert = 100 * time.Millisecond go discoveryManager.Run() @@ -1098,7 +1105,7 @@ func TestApplyConfigDoesNotModifyStaticTargets(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager := NewManager(ctx, log.NewNopLogger(), prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) discoveryManager.updatert = 100 * time.Millisecond go discoveryManager.Run() @@ -1144,7 +1151,7 @@ func (s lockStaticDiscoverer) Run(ctx context.Context, up chan<- []*targetgroup. func TestGaugeFailedConfigs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager := NewManager(ctx, log.NewNopLogger(), prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) discoveryManager.updatert = 100 * time.Millisecond go discoveryManager.Run() @@ -1158,7 +1165,7 @@ func TestGaugeFailedConfigs(t *testing.T) { discoveryManager.ApplyConfig(c) <-discoveryManager.SyncCh() - failedCount := client_testutil.ToFloat64(failedConfigs) + failedCount := client_testutil.ToFloat64(discoveryManager.metrics.FailedConfigs) if failedCount != 3 { t.Fatalf("Expected to have 3 failed configs, got: %v", failedCount) } @@ -1169,7 +1176,7 @@ func TestGaugeFailedConfigs(t *testing.T) { discoveryManager.ApplyConfig(c) <-discoveryManager.SyncCh() - failedCount = client_testutil.ToFloat64(failedConfigs) + failedCount = client_testutil.ToFloat64(discoveryManager.metrics.FailedConfigs) if failedCount != 0 { t.Fatalf("Expected to get no failed config, got: %v", failedCount) } @@ -1300,7 +1307,7 @@ func TestCoordinationWithReceiver(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - mgr := NewManager(ctx, nil) + mgr := NewManager(ctx, nil, prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) mgr.updatert = updateDelay go mgr.Run() @@ -1392,10 +1399,10 @@ func (o onceProvider) Run(_ context.Context, ch chan<- []*targetgroup.Group) { // TestTargetSetTargetGroupsUpdateDuringApplyConfig is used to detect races when // ApplyConfig happens at the same time as targets update. -func TestTargetSetTargetGroupsUpdateDuringApplyConfig(*testing.T) { +func TestTargetSetTargetGroupsUpdateDuringApplyConfig(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager := NewManager(ctx, log.NewNopLogger(), prometheus.NewRegistry(), newTestDiscoveryMetrics(t)) discoveryManager.updatert = 100 * time.Millisecond go discoveryManager.Run() diff --git a/discovery/marathon/marathon.go b/discovery/marathon/marathon.go index 27947fa8a..a6a6252fd 100644 --- a/discovery/marathon/marathon.go +++ b/discovery/marathon/marathon.go @@ -28,6 +28,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -83,7 +84,7 @@ func (*SDConfig) Name() string { return "marathon" } // NewDiscoverer returns a Discoverer for the Config. func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(*c, opts.Logger) + return NewDiscovery(*c, opts.Logger, opts.Registerer) } // SetDirectory joins any relative file paths with dir. @@ -132,7 +133,7 @@ type Discovery struct { } // NewDiscovery returns a new Marathon Discovery. -func NewDiscovery(conf SDConfig, logger log.Logger) (*Discovery, error) { +func NewDiscovery(conf SDConfig, logger log.Logger, reg prometheus.Registerer) (*Discovery, error) { rt, err := config.NewRoundTripperFromConfig(conf.HTTPClientConfig, "marathon_sd") if err != nil { return nil, err @@ -154,10 +155,13 @@ func NewDiscovery(conf SDConfig, logger log.Logger) (*Discovery, error) { appsClient: fetchApps, } d.Discovery = refresh.NewDiscovery( - logger, - "marathon", - time.Duration(conf.RefreshInterval), - d.refresh, + refresh.Options{ + Logger: logger, + Mech: "marathon", + Interval: time.Duration(conf.RefreshInterval), + RefreshF: d.refresh, + Registry: reg, + }, ) return d, nil } diff --git a/discovery/marathon/marathon_test.go b/discovery/marathon/marathon_test.go index 258e3c8dd..a1ddce930 100644 --- a/discovery/marathon/marathon_test.go +++ b/discovery/marathon/marathon_test.go @@ -21,6 +21,7 @@ import ( "net/http/httptest" "testing" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/discovery/targetgroup" @@ -36,7 +37,7 @@ func testConfig() SDConfig { } func testUpdateServices(client appListClient) ([]*targetgroup.Group, error) { - md, err := NewDiscovery(testConfig(), nil) + md, err := NewDiscovery(testConfig(), nil, prometheus.NewRegistry()) if err != nil { return nil, err } @@ -129,7 +130,7 @@ func TestMarathonSDSendGroup(t *testing.T) { } func TestMarathonSDRemoveApp(t *testing.T) { - md, err := NewDiscovery(testConfig(), nil) + md, err := NewDiscovery(testConfig(), nil, prometheus.NewRegistry()) if err != nil { t.Fatalf("%s", err) } diff --git a/discovery/metrics.go b/discovery/metrics.go new file mode 100644 index 000000000..04f54b542 --- /dev/null +++ b/discovery/metrics.go @@ -0,0 +1,103 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package discovery + +import ( + "fmt" + + "github.com/prometheus/client_golang/prometheus" +) + +var ( + clientGoRequestMetrics = &clientGoRequestMetricAdapter{} + clientGoWorkloadMetrics = &clientGoWorkqueueMetricsProvider{} +) + +func init() { + clientGoRequestMetrics.RegisterWithK8sGoClient() + clientGoWorkloadMetrics.RegisterWithK8sGoClient() +} + +// Metrics to be used with a discovery manager. +type Metrics struct { + FailedConfigs *prometheus.GaugeVec + DiscoveredTargets *prometheus.GaugeVec + ReceivedUpdates *prometheus.CounterVec + DelayedUpdates *prometheus.CounterVec + SentUpdates *prometheus.CounterVec +} + +func NewMetrics(registerer prometheus.Registerer) (*Metrics, error) { + m := &Metrics{} + + m.FailedConfigs = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "prometheus_sd_failed_configs", + Help: "Current number of service discovery configurations that failed to load.", + }, + []string{"name"}, + ) + + m.DiscoveredTargets = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "prometheus_sd_discovered_targets", + Help: "Current number of discovered targets.", + }, + []string{"name", "config"}, + ) + + m.ReceivedUpdates = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "prometheus_sd_received_updates_total", + Help: "Total number of update events received from the SD providers.", + }, + []string{"name"}, + ) + + m.DelayedUpdates = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "prometheus_sd_updates_delayed_total", + Help: "Total number of update events that couldn't be sent immediately.", + }, + []string{"name"}, + ) + + m.SentUpdates = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "prometheus_sd_updates_total", + Help: "Total number of update events sent to the SD consumers.", + }, + []string{"name"}, + ) + + metrics := append( + []prometheus.Collector{ + m.FailedConfigs, + m.DiscoveredTargets, + m.ReceivedUpdates, + m.DelayedUpdates, + m.SentUpdates, + }, + clientGoMetrics()..., + ) + + for _, collector := range metrics { + err := registerer.Register(collector) + if err != nil { + return nil, fmt.Errorf("failed to register discovery manager metrics: %w", err) + } + } + + return m, nil +} diff --git a/discovery/kubernetes/client_metrics.go b/discovery/metrics_k8s_client.go similarity index 85% rename from discovery/kubernetes/client_metrics.go rename to discovery/metrics_k8s_client.go index 7b097b14a..4f161bc3e 100644 --- a/discovery/kubernetes/client_metrics.go +++ b/discovery/metrics_k8s_client.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kubernetes +package discovery import ( "context" @@ -23,13 +23,22 @@ import ( "k8s.io/client-go/util/workqueue" ) -const workqueueMetricsNamespace = metricsNamespace + "_workqueue" +// This file registers metrics used by the Kubernetes Go client (k8s.io/client-go). +// Unfortunately, k8s.io/client-go metrics are global. +// If we instantiate multiple k8s SD instances, their k8s/client-go metrics will overlap. +// To prevent us from displaying misleading metrics, we register k8s.io/client-go metrics +// outside of the Kubernetes SD. + +const ( + KubernetesMetricsNamespace = "prometheus_sd_kubernetes" + workqueueMetricsNamespace = KubernetesMetricsNamespace + "_workqueue" +) var ( // Metrics for client-go's HTTP requests. clientGoRequestResultMetricVec = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: metricsNamespace, + Namespace: KubernetesMetricsNamespace, Name: "http_request_total", Help: "Total number of HTTP requests to the Kubernetes API by status code.", }, @@ -37,7 +46,7 @@ var ( ) clientGoRequestLatencyMetricVec = prometheus.NewSummaryVec( prometheus.SummaryOpts{ - Namespace: metricsNamespace, + Namespace: KubernetesMetricsNamespace, Name: "http_request_duration_seconds", Help: "Summary of latencies for HTTP requests to the Kubernetes API by endpoint.", Objectives: map[float64]float64{}, @@ -109,17 +118,28 @@ func (noopMetric) Set(float64) {} // Definition of client-go metrics adapters for HTTP requests observation. type clientGoRequestMetricAdapter struct{} -func (f *clientGoRequestMetricAdapter) Register(registerer prometheus.Registerer) { +// Returns all of the Prometheus metrics derived from k8s.io/client-go. +// This may be used tu register and unregister the metrics. +func clientGoMetrics() []prometheus.Collector { + return []prometheus.Collector{ + clientGoRequestResultMetricVec, + clientGoRequestLatencyMetricVec, + clientGoWorkqueueDepthMetricVec, + clientGoWorkqueueAddsMetricVec, + clientGoWorkqueueLatencyMetricVec, + clientGoWorkqueueUnfinishedWorkSecondsMetricVec, + clientGoWorkqueueLongestRunningProcessorMetricVec, + clientGoWorkqueueWorkDurationMetricVec, + } +} + +func (f *clientGoRequestMetricAdapter) RegisterWithK8sGoClient() { metrics.Register( metrics.RegisterOpts{ RequestLatency: f, RequestResult: f, }, ) - registerer.MustRegister( - clientGoRequestResultMetricVec, - clientGoRequestLatencyMetricVec, - ) } func (clientGoRequestMetricAdapter) Increment(_ context.Context, code, _, _ string) { @@ -133,16 +153,8 @@ func (clientGoRequestMetricAdapter) Observe(_ context.Context, _ string, u url.U // Definition of client-go workqueue metrics provider definition. type clientGoWorkqueueMetricsProvider struct{} -func (f *clientGoWorkqueueMetricsProvider) Register(registerer prometheus.Registerer) { +func (f *clientGoWorkqueueMetricsProvider) RegisterWithK8sGoClient() { workqueue.SetProvider(f) - registerer.MustRegister( - clientGoWorkqueueDepthMetricVec, - clientGoWorkqueueAddsMetricVec, - clientGoWorkqueueLatencyMetricVec, - clientGoWorkqueueWorkDurationMetricVec, - clientGoWorkqueueUnfinishedWorkSecondsMetricVec, - clientGoWorkqueueLongestRunningProcessorMetricVec, - ) } func (f *clientGoWorkqueueMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric { diff --git a/discovery/moby/docker.go b/discovery/moby/docker.go index 162833ece..a13bb8704 100644 --- a/discovery/moby/docker.go +++ b/discovery/moby/docker.go @@ -26,6 +26,7 @@ import ( "github.com/docker/docker/api/types/filters" "github.com/docker/docker/client" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -80,7 +81,7 @@ func (*DockerSDConfig) Name() string { return "docker" } // NewDiscoverer returns a Discoverer for the Config. func (c *DockerSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDockerDiscovery(c, opts.Logger) + return NewDockerDiscovery(c, opts.Logger, opts.Registerer) } // SetDirectory joins any relative file paths with dir. @@ -114,7 +115,7 @@ type DockerDiscovery struct { } // NewDockerDiscovery returns a new DockerDiscovery which periodically refreshes its targets. -func NewDockerDiscovery(conf *DockerSDConfig, logger log.Logger) (*DockerDiscovery, error) { +func NewDockerDiscovery(conf *DockerSDConfig, logger log.Logger, reg prometheus.Registerer) (*DockerDiscovery, error) { var err error d := &DockerDiscovery{ @@ -165,10 +166,13 @@ func NewDockerDiscovery(conf *DockerSDConfig, logger log.Logger) (*DockerDiscove } d.Discovery = refresh.NewDiscovery( - logger, - "docker", - time.Duration(conf.RefreshInterval), - d.refresh, + refresh.Options{ + Logger: logger, + Mech: "docker", + Interval: time.Duration(conf.RefreshInterval), + RefreshF: d.refresh, + Registry: reg, + }, ) return d, nil } diff --git a/discovery/moby/docker_test.go b/discovery/moby/docker_test.go index f80c53b61..1a87ad2a1 100644 --- a/discovery/moby/docker_test.go +++ b/discovery/moby/docker_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" @@ -37,7 +38,7 @@ host: %s var cfg DockerSDConfig require.NoError(t, yaml.Unmarshal([]byte(cfgString), &cfg)) - d, err := NewDockerDiscovery(&cfg, log.NewNopLogger()) + d, err := NewDockerDiscovery(&cfg, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) ctx := context.Background() diff --git a/discovery/moby/dockerswarm.go b/discovery/moby/dockerswarm.go index 371f9d5ed..bd87fea5a 100644 --- a/discovery/moby/dockerswarm.go +++ b/discovery/moby/dockerswarm.go @@ -23,6 +23,7 @@ import ( "github.com/docker/docker/api/types/filters" "github.com/docker/docker/client" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/common/version" @@ -74,7 +75,7 @@ func (*DockerSwarmSDConfig) Name() string { return "dockerswarm" } // NewDiscoverer returns a Discoverer for the Config. func (c *DockerSwarmSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(c, opts.Logger) + return NewDiscovery(c, opts.Logger, opts.Registerer) } // SetDirectory joins any relative file paths with dir. @@ -117,7 +118,7 @@ type Discovery struct { } // NewDiscovery returns a new Discovery which periodically refreshes its targets. -func NewDiscovery(conf *DockerSwarmSDConfig, logger log.Logger) (*Discovery, error) { +func NewDiscovery(conf *DockerSwarmSDConfig, logger log.Logger, reg prometheus.Registerer) (*Discovery, error) { var err error d := &Discovery{ @@ -168,10 +169,13 @@ func NewDiscovery(conf *DockerSwarmSDConfig, logger log.Logger) (*Discovery, err } d.Discovery = refresh.NewDiscovery( - logger, - "dockerswarm", - time.Duration(conf.RefreshInterval), - d.refresh, + refresh.Options{ + Logger: logger, + Mech: "dockerswarm", + Interval: time.Duration(conf.RefreshInterval), + RefreshF: d.refresh, + Registry: reg, + }, ) return d, nil } diff --git a/discovery/moby/nodes_test.go b/discovery/moby/nodes_test.go index 2bc383374..512ff7049 100644 --- a/discovery/moby/nodes_test.go +++ b/discovery/moby/nodes_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" @@ -38,7 +39,7 @@ host: %s var cfg DockerSwarmSDConfig require.NoError(t, yaml.Unmarshal([]byte(cfgString), &cfg)) - d, err := NewDiscovery(&cfg, log.NewNopLogger()) + d, err := NewDiscovery(&cfg, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) ctx := context.Background() diff --git a/discovery/moby/services_test.go b/discovery/moby/services_test.go index 81c8d31f1..816586dd7 100644 --- a/discovery/moby/services_test.go +++ b/discovery/moby/services_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" @@ -38,7 +39,7 @@ host: %s var cfg DockerSwarmSDConfig require.NoError(t, yaml.Unmarshal([]byte(cfgString), &cfg)) - d, err := NewDiscovery(&cfg, log.NewNopLogger()) + d, err := NewDiscovery(&cfg, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) ctx := context.Background() @@ -332,7 +333,7 @@ filters: var cfg DockerSwarmSDConfig require.NoError(t, yaml.Unmarshal([]byte(cfgString), &cfg)) - d, err := NewDiscovery(&cfg, log.NewNopLogger()) + d, err := NewDiscovery(&cfg, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) ctx := context.Background() diff --git a/discovery/moby/tasks_test.go b/discovery/moby/tasks_test.go index eed5f2924..764fda343 100644 --- a/discovery/moby/tasks_test.go +++ b/discovery/moby/tasks_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" @@ -38,7 +39,7 @@ host: %s var cfg DockerSwarmSDConfig require.NoError(t, yaml.Unmarshal([]byte(cfgString), &cfg)) - d, err := NewDiscovery(&cfg, log.NewNopLogger()) + d, err := NewDiscovery(&cfg, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) ctx := context.Background() diff --git a/discovery/nomad/nomad.go b/discovery/nomad/nomad.go index 7013f0737..3fdcf714e 100644 --- a/discovery/nomad/nomad.go +++ b/discovery/nomad/nomad.go @@ -49,27 +49,18 @@ const ( ) // DefaultSDConfig is the default nomad SD configuration. -var ( - DefaultSDConfig = SDConfig{ - AllowStale: true, - HTTPClientConfig: config.DefaultHTTPClientConfig, - Namespace: "default", - RefreshInterval: model.Duration(60 * time.Second), - Region: "global", - Server: "http://localhost:4646", - TagSeparator: ",", - } - - failuresCount = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "prometheus_sd_nomad_failures_total", - Help: "Number of nomad service discovery refresh failures.", - }) -) +var DefaultSDConfig = SDConfig{ + AllowStale: true, + HTTPClientConfig: config.DefaultHTTPClientConfig, + Namespace: "default", + RefreshInterval: model.Duration(60 * time.Second), + Region: "global", + Server: "http://localhost:4646", + TagSeparator: ",", +} func init() { discovery.RegisterConfig(&SDConfig{}) - prometheus.MustRegister(failuresCount) } // SDConfig is the configuration for nomad based service discovery. @@ -88,7 +79,7 @@ func (*SDConfig) Name() string { return "nomad" } // NewDiscoverer returns a Discoverer for the Config. func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(c, opts.Logger) + return NewDiscovery(c, opts.Logger, opts.Registerer) } // SetDirectory joins any relative file paths with dir. @@ -121,10 +112,11 @@ type Discovery struct { region string server string tagSeparator string + failuresCount prometheus.Counter } // NewDiscovery returns a new Discovery which periodically refreshes its targets. -func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { +func NewDiscovery(conf *SDConfig, logger log.Logger, reg prometheus.Registerer) (*Discovery, error) { d := &Discovery{ allowStale: conf.AllowStale, namespace: conf.Namespace, @@ -132,6 +124,11 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { region: conf.Region, server: conf.Server, tagSeparator: conf.TagSeparator, + failuresCount: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "prometheus_sd_nomad_failures_total", + Help: "Number of nomad service discovery refresh failures.", + }), } HTTPClient, err := config.NewClientFromConfig(conf.HTTPClientConfig, "nomad_sd") @@ -153,10 +150,14 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { d.client = client d.Discovery = refresh.NewDiscovery( - logger, - "nomad", - time.Duration(conf.RefreshInterval), - d.refresh, + refresh.Options{ + Logger: logger, + Mech: "nomad", + Interval: time.Duration(conf.RefreshInterval), + RefreshF: d.refresh, + Registry: reg, + Metrics: []prometheus.Collector{d.failuresCount}, + }, ) return d, nil } @@ -167,7 +168,7 @@ func (d *Discovery) refresh(context.Context) ([]*targetgroup.Group, error) { } stubs, _, err := d.client.Services().List(opts) if err != nil { - failuresCount.Inc() + d.failuresCount.Inc() return nil, err } @@ -179,7 +180,7 @@ func (d *Discovery) refresh(context.Context) ([]*targetgroup.Group, error) { for _, service := range stub.Services { instances, _, err := d.client.Services().Get(service.ServiceName, opts) if err != nil { - failuresCount.Inc() + d.failuresCount.Inc() return nil, fmt.Errorf("failed to fetch services: %w", err) } diff --git a/discovery/nomad/nomad_test.go b/discovery/nomad/nomad_test.go index f9490f476..ca67a877e 100644 --- a/discovery/nomad/nomad_test.go +++ b/discovery/nomad/nomad_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" ) @@ -127,7 +128,7 @@ func TestConfiguredService(t *testing.T) { conf := &SDConfig{ Server: "http://localhost:4646", } - _, err := NewDiscovery(conf, nil) + _, err := NewDiscovery(conf, nil, prometheus.NewRegistry()) require.NoError(t, err) } @@ -141,7 +142,7 @@ func TestNomadSDRefresh(t *testing.T) { cfg := DefaultSDConfig cfg.Server = endpoint.String() - d, err := NewDiscovery(&cfg, log.NewNopLogger()) + d, err := NewDiscovery(&cfg, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) tgs, err := d.refresh(context.Background()) diff --git a/discovery/openstack/openstack.go b/discovery/openstack/openstack.go index 92c83a4cf..9544a7c0f 100644 --- a/discovery/openstack/openstack.go +++ b/discovery/openstack/openstack.go @@ -24,6 +24,7 @@ import ( "github.com/gophercloud/gophercloud" "github.com/gophercloud/gophercloud/openstack" "github.com/mwitkow/go-conntrack" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -70,7 +71,7 @@ func (*SDConfig) Name() string { return "openstack" } // NewDiscoverer returns a Discoverer for the Config. func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(c, opts.Logger) + return NewDiscovery(c, opts.Logger, opts.Registerer) } // SetDirectory joins any relative file paths with dir. @@ -134,16 +135,19 @@ type refresher interface { } // NewDiscovery returns a new OpenStack Discoverer which periodically refreshes its targets. -func NewDiscovery(conf *SDConfig, l log.Logger) (*refresh.Discovery, error) { +func NewDiscovery(conf *SDConfig, l log.Logger, reg prometheus.Registerer) (*refresh.Discovery, error) { r, err := newRefresher(conf, l) if err != nil { return nil, err } return refresh.NewDiscovery( - l, - "openstack", - time.Duration(conf.RefreshInterval), - r.refresh, + refresh.Options{ + Logger: l, + Mech: "openstack", + Interval: time.Duration(conf.RefreshInterval), + RefreshF: r.refresh, + Registry: reg, + }, ), nil } diff --git a/discovery/ovhcloud/ovhcloud.go b/discovery/ovhcloud/ovhcloud.go index 535ade4df..eca284a85 100644 --- a/discovery/ovhcloud/ovhcloud.go +++ b/discovery/ovhcloud/ovhcloud.go @@ -22,6 +22,7 @@ import ( "github.com/go-kit/log" "github.com/ovh/go-ovh/ovh" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -93,7 +94,7 @@ func createClient(config *SDConfig) (*ovh.Client, error) { // NewDiscoverer returns a Discoverer for the Config. func (c *SDConfig) NewDiscoverer(options discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(c, options.Logger) + return NewDiscovery(c, options.Logger, options.Registerer) } func init() { @@ -140,16 +141,19 @@ func newRefresher(conf *SDConfig, logger log.Logger) (refresher, error) { } // NewDiscovery returns a new OVHcloud Discoverer which periodically refreshes its targets. -func NewDiscovery(conf *SDConfig, logger log.Logger) (*refresh.Discovery, error) { +func NewDiscovery(conf *SDConfig, logger log.Logger, reg prometheus.Registerer) (*refresh.Discovery, error) { r, err := newRefresher(conf, logger) if err != nil { return nil, err } return refresh.NewDiscovery( - logger, - "ovhcloud", - time.Duration(conf.RefreshInterval), - r.refresh, + refresh.Options{ + Logger: logger, + Mech: "ovhcloud", + Interval: time.Duration(conf.RefreshInterval), + RefreshF: r.refresh, + Registry: reg, + }, ), nil } diff --git a/discovery/ovhcloud/ovhcloud_test.go b/discovery/ovhcloud/ovhcloud_test.go index efcd95bb0..9bd9ea954 100644 --- a/discovery/ovhcloud/ovhcloud_test.go +++ b/discovery/ovhcloud/ovhcloud_test.go @@ -18,6 +18,7 @@ import ( "fmt" "testing" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" @@ -122,7 +123,8 @@ func TestDiscoverer(t *testing.T) { conf, _ := getMockConf("vps") logger := testutil.NewLogger(t) _, err := conf.NewDiscoverer(discovery.DiscovererOptions{ - Logger: logger, + Logger: logger, + Registerer: prometheus.NewRegistry(), }) require.NoError(t, err) diff --git a/discovery/puppetdb/puppetdb.go b/discovery/puppetdb/puppetdb.go index 9484a0aa6..616f2c61e 100644 --- a/discovery/puppetdb/puppetdb.go +++ b/discovery/puppetdb/puppetdb.go @@ -29,6 +29,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/regexp" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/common/version" @@ -83,7 +84,7 @@ func (*SDConfig) Name() string { return "puppetdb" } // NewDiscoverer returns a Discoverer for the Config. func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(c, opts.Logger) + return NewDiscovery(c, opts.Logger, opts.Registerer) } // SetDirectory joins any relative file paths with dir. @@ -130,7 +131,7 @@ type Discovery struct { } // NewDiscovery returns a new PuppetDB discovery for the given config. -func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { +func NewDiscovery(conf *SDConfig, logger log.Logger, reg prometheus.Registerer) (*Discovery, error) { if logger == nil { logger = log.NewNopLogger() } @@ -156,10 +157,13 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { } d.Discovery = refresh.NewDiscovery( - logger, - "http", - time.Duration(conf.RefreshInterval), - d.refresh, + refresh.Options{ + Logger: logger, + Mech: "http", + Interval: time.Duration(conf.RefreshInterval), + RefreshF: d.refresh, + Registry: reg, + }, ) return d, nil } diff --git a/discovery/puppetdb/puppetdb_test.go b/discovery/puppetdb/puppetdb_test.go index 236efec16..edd9b9d04 100644 --- a/discovery/puppetdb/puppetdb_test.go +++ b/discovery/puppetdb/puppetdb_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -62,7 +63,7 @@ func TestPuppetSlashInURL(t *testing.T) { Port: 80, RefreshInterval: model.Duration(30 * time.Second), } - d, err := NewDiscovery(&cfg, log.NewNopLogger()) + d, err := NewDiscovery(&cfg, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) require.Equal(t, apiURL, d.url) } @@ -79,7 +80,7 @@ func TestPuppetDBRefresh(t *testing.T) { RefreshInterval: model.Duration(30 * time.Second), } - d, err := NewDiscovery(&cfg, log.NewNopLogger()) + d, err := NewDiscovery(&cfg, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) ctx := context.Background() @@ -120,7 +121,7 @@ func TestPuppetDBRefreshWithParameters(t *testing.T) { RefreshInterval: model.Duration(30 * time.Second), } - d, err := NewDiscovery(&cfg, log.NewNopLogger()) + d, err := NewDiscovery(&cfg, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) ctx := context.Background() @@ -172,7 +173,7 @@ func TestPuppetDBInvalidCode(t *testing.T) { RefreshInterval: model.Duration(30 * time.Second), } - d, err := NewDiscovery(&cfg, log.NewNopLogger()) + d, err := NewDiscovery(&cfg, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) ctx := context.Background() @@ -193,7 +194,7 @@ func TestPuppetDBInvalidFormat(t *testing.T) { RefreshInterval: model.Duration(30 * time.Second), } - d, err := NewDiscovery(&cfg, log.NewNopLogger()) + d, err := NewDiscovery(&cfg, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) ctx := context.Background() diff --git a/discovery/refresh/refresh.go b/discovery/refresh/refresh.go index 919567a53..0b0e5a921 100644 --- a/discovery/refresh/refresh.go +++ b/discovery/refresh/refresh.go @@ -22,29 +22,17 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery/targetgroup" ) -var ( - failuresCount = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "prometheus_sd_refresh_failures_total", - Help: "Number of refresh failures for the given SD mechanism.", - }, - []string{"mechanism"}, - ) - duration = prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Name: "prometheus_sd_refresh_duration_seconds", - Help: "The duration of a refresh in seconds for the given SD mechanism.", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }, - []string{"mechanism"}, - ) -) - -func init() { - prometheus.MustRegister(duration, failuresCount) +type Options struct { + Logger log.Logger + Mech string + Interval time.Duration + RefreshF func(ctx context.Context) ([]*targetgroup.Group, error) + Registry prometheus.Registerer + Metrics []prometheus.Collector } // Discovery implements the Discoverer interface. @@ -54,25 +42,62 @@ type Discovery struct { refreshf func(ctx context.Context) ([]*targetgroup.Group, error) failures prometheus.Counter - duration prometheus.Observer + duration prometheus.Summary + + metricRegisterer discovery.MetricRegisterer } // NewDiscovery returns a Discoverer function that calls a refresh() function at every interval. -func NewDiscovery(l log.Logger, mech string, interval time.Duration, refreshf func(ctx context.Context) ([]*targetgroup.Group, error)) *Discovery { - if l == nil { - l = log.NewNopLogger() +func NewDiscovery(opts Options) *Discovery { + var logger log.Logger + if opts.Logger == nil { + logger = log.NewNopLogger() + } else { + logger = opts.Logger } - return &Discovery{ - logger: l, - interval: interval, - refreshf: refreshf, - failures: failuresCount.WithLabelValues(mech), - duration: duration.WithLabelValues(mech), + + d := Discovery{ + logger: logger, + interval: opts.Interval, + refreshf: opts.RefreshF, + failures: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "prometheus_sd_refresh_failures_total", + Help: "Number of refresh failures for the given SD mechanism.", + ConstLabels: prometheus.Labels{ + "mechanism": opts.Mech, + }, + }), + duration: prometheus.NewSummary( + prometheus.SummaryOpts{ + Name: "prometheus_sd_refresh_duration_seconds", + Help: "The duration of a refresh in seconds for the given SD mechanism.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + ConstLabels: prometheus.Labels{ + "mechanism": opts.Mech, + }, + }), } + + metrics := []prometheus.Collector{d.failures, d.duration} + if opts.Metrics != nil { + metrics = append(metrics, opts.Metrics...) + } + + d.metricRegisterer = discovery.NewMetricRegisterer(opts.Registry, metrics) + + return &d } // Run implements the Discoverer interface. func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { + err := d.metricRegisterer.RegisterMetrics() + if err != nil { + level.Error(d.logger).Log("msg", "Unable to register metrics", "err", err.Error()) + return + } + defer d.metricRegisterer.UnregisterMetrics() + // Get an initial set right away. tgs, err := d.refresh(ctx) if err != nil { diff --git a/discovery/refresh/refresh_test.go b/discovery/refresh/refresh_test.go index 6decef19f..12e7ab3be 100644 --- a/discovery/refresh/refresh_test.go +++ b/discovery/refresh/refresh_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ -65,7 +66,15 @@ func TestRefresh(t *testing.T) { return nil, fmt.Errorf("some error") } interval := time.Millisecond - d := NewDiscovery(nil, "test", interval, refresh) + d := NewDiscovery( + Options{ + Logger: nil, + Mech: "test", + Interval: interval, + RefreshF: refresh, + Registry: prometheus.NewRegistry(), + }, + ) ch := make(chan []*targetgroup.Group) ctx, cancel := context.WithCancel(context.Background()) diff --git a/discovery/scaleway/scaleway.go b/discovery/scaleway/scaleway.go index 90091b317..86527b34e 100644 --- a/discovery/scaleway/scaleway.go +++ b/discovery/scaleway/scaleway.go @@ -23,6 +23,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/scaleway/scaleway-sdk-go/scw" @@ -160,7 +161,7 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { } func (c SDConfig) NewDiscoverer(options discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(&c, options.Logger) + return NewDiscovery(&c, options.Logger, options.Registerer) } // SetDirectory joins any relative file paths with dir. @@ -177,17 +178,20 @@ func init() { // the Discoverer interface. type Discovery struct{} -func NewDiscovery(conf *SDConfig, logger log.Logger) (*refresh.Discovery, error) { +func NewDiscovery(conf *SDConfig, logger log.Logger, reg prometheus.Registerer) (*refresh.Discovery, error) { r, err := newRefresher(conf) if err != nil { return nil, err } return refresh.NewDiscovery( - logger, - "scaleway", - time.Duration(conf.RefreshInterval), - r.refresh, + refresh.Options{ + Logger: logger, + Mech: "scaleway", + Interval: time.Duration(conf.RefreshInterval), + RefreshF: r.refresh, + Registry: reg, + }, ), nil } diff --git a/discovery/triton/triton.go b/discovery/triton/triton.go index c83f3b34a..4839827ad 100644 --- a/discovery/triton/triton.go +++ b/discovery/triton/triton.go @@ -26,6 +26,7 @@ import ( "github.com/go-kit/log" "github.com/mwitkow/go-conntrack" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -74,7 +75,7 @@ func (*SDConfig) Name() string { return "triton" } // NewDiscoverer returns a Discoverer for the Config. func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return New(opts.Logger, c) + return New(opts.Logger, c, opts.Registerer) } // SetDirectory joins any relative file paths with dir. @@ -138,7 +139,7 @@ type Discovery struct { } // New returns a new Discovery which periodically refreshes its targets. -func New(logger log.Logger, conf *SDConfig) (*Discovery, error) { +func New(logger log.Logger, conf *SDConfig, reg prometheus.Registerer) (*Discovery, error) { tls, err := config.NewTLSConfig(&conf.TLSConfig) if err != nil { return nil, err @@ -159,10 +160,13 @@ func New(logger log.Logger, conf *SDConfig) (*Discovery, error) { sdConfig: conf, } d.Discovery = refresh.NewDiscovery( - logger, - "triton", - time.Duration(conf.RefreshInterval), - d.refresh, + refresh.Options{ + Logger: logger, + Mech: "triton", + Interval: time.Duration(conf.RefreshInterval), + RefreshF: d.refresh, + Registry: reg, + }, ) return d, nil } diff --git a/discovery/triton/triton_test.go b/discovery/triton/triton_test.go index 0ed9daa68..fa51a2e47 100644 --- a/discovery/triton/triton_test.go +++ b/discovery/triton/triton_test.go @@ -24,6 +24,7 @@ import ( "strings" "testing" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -79,7 +80,7 @@ var ( ) func newTritonDiscovery(c SDConfig) (*Discovery, error) { - return New(nil, &c) + return New(nil, &c, prometheus.NewRegistry()) } func TestTritonSDNew(t *testing.T) { diff --git a/discovery/util.go b/discovery/util.go new file mode 100644 index 000000000..83cc640dd --- /dev/null +++ b/discovery/util.go @@ -0,0 +1,72 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package discovery + +import ( + "fmt" + + "github.com/prometheus/client_golang/prometheus" +) + +// A utility to be used by implementations of discovery.Discoverer +// which need to manage the lifetime of their metrics. +type MetricRegisterer interface { + RegisterMetrics() error + UnregisterMetrics() +} + +// metricRegistererImpl is an implementation of MetricRegisterer. +type metricRegistererImpl struct { + reg prometheus.Registerer + metrics []prometheus.Collector +} + +var _ MetricRegisterer = &metricRegistererImpl{} + +// Creates an instance of a MetricRegisterer. +// Typically called inside the implementation of the NewDiscoverer() method. +func NewMetricRegisterer(reg prometheus.Registerer, metrics []prometheus.Collector) MetricRegisterer { + return &metricRegistererImpl{ + reg: reg, + metrics: metrics, + } +} + +// RegisterMetrics registers the metrics with a Prometheus registerer. +// If any metric fails to register, it will unregister all metrics that +// were registered so far, and return an error. +// Typically called at the start of the SD's Run() method. +func (rh *metricRegistererImpl) RegisterMetrics() error { + for _, collector := range rh.metrics { + err := rh.reg.Register(collector) + if err != nil { + // Unregister all metrics that were registered so far. + // This is so that if RegisterMetrics() gets called again, + // there will not be an error due to a duplicate registration. + rh.UnregisterMetrics() + + return fmt.Errorf("failed to register metric: %w", err) + } + } + return nil +} + +// UnregisterMetrics unregisters the metrics from the same Prometheus +// registerer which was used to register them. +// Typically called at the end of the SD's Run() method by a defer statement. +func (rh *metricRegistererImpl) UnregisterMetrics() { + for _, collector := range rh.metrics { + rh.reg.Unregister(collector) + } +} diff --git a/discovery/uyuni/uyuni.go b/discovery/uyuni/uyuni.go index bc33d28cb..744f3f96c 100644 --- a/discovery/uyuni/uyuni.go +++ b/discovery/uyuni/uyuni.go @@ -25,6 +25,7 @@ import ( "github.com/go-kit/log" "github.com/kolo/xmlrpc" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -115,7 +116,7 @@ func (*SDConfig) Name() string { return "uyuni" } // NewDiscoverer returns a Discoverer for the Config. func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(c, opts.Logger) + return NewDiscovery(c, opts.Logger, opts.Registerer) } // SetDirectory joins any relative file paths with dir. @@ -203,7 +204,7 @@ func getEndpointInfoForSystems( } // NewDiscovery returns a uyuni discovery for the given configuration. -func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { +func NewDiscovery(conf *SDConfig, logger log.Logger, reg prometheus.Registerer) (*Discovery, error) { apiURL, err := url.Parse(conf.Server) if err != nil { return nil, err @@ -227,10 +228,13 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { } d.Discovery = refresh.NewDiscovery( - logger, - "uyuni", - time.Duration(conf.RefreshInterval), - d.refresh, + refresh.Options{ + Logger: logger, + Mech: "uyuni", + Interval: time.Duration(conf.RefreshInterval), + RefreshF: d.refresh, + Registry: reg, + }, ) return d, nil } diff --git a/discovery/uyuni/uyuni_test.go b/discovery/uyuni/uyuni_test.go index 9c910a3a3..fd03c88f1 100644 --- a/discovery/uyuni/uyuni_test.go +++ b/discovery/uyuni/uyuni_test.go @@ -23,6 +23,8 @@ import ( "github.com/stretchr/testify/require" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/discovery/targetgroup" ) @@ -35,7 +37,7 @@ func testUpdateServices(respHandler http.HandlerFunc) ([]*targetgroup.Group, err Server: ts.URL, } - md, err := NewDiscovery(&conf, nil) + md, err := NewDiscovery(&conf, nil, prometheus.NewRegistry()) if err != nil { return nil, err } @@ -108,7 +110,7 @@ func TestUyuniSDSkipLogin(t *testing.T) { Server: ts.URL, } - md, err := NewDiscovery(&conf, nil) + md, err := NewDiscovery(&conf, nil, prometheus.NewRegistry()) if err != nil { t.Error(err) } diff --git a/discovery/vultr/vultr.go b/discovery/vultr/vultr.go index 42881d3c1..129800048 100644 --- a/discovery/vultr/vultr.go +++ b/discovery/vultr/vultr.go @@ -23,6 +23,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/common/version" @@ -78,7 +79,7 @@ func (*SDConfig) Name() string { return "vultr" } // NewDiscoverer returns a Discoverer for the Config. func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { - return NewDiscovery(c, opts.Logger) + return NewDiscovery(c, opts.Logger, opts.Registerer) } // SetDirectory joins any relative file paths with dir. @@ -106,7 +107,7 @@ type Discovery struct { } // NewDiscovery returns a new Discovery which periodically refreshes its targets. -func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { +func NewDiscovery(conf *SDConfig, logger log.Logger, reg prometheus.Registerer) (*Discovery, error) { d := &Discovery{ port: conf.Port, } @@ -128,10 +129,13 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) { } d.Discovery = refresh.NewDiscovery( - logger, - "vultr", - time.Duration(conf.RefreshInterval), - d.refresh, + refresh.Options{ + Logger: logger, + Mech: "vultr", + Interval: time.Duration(conf.RefreshInterval), + RefreshF: d.refresh, + Registry: reg, + }, ) return d, nil } diff --git a/discovery/vultr/vultr_test.go b/discovery/vultr/vultr_test.go index 0977238e0..c50b11d2d 100644 --- a/discovery/vultr/vultr_test.go +++ b/discovery/vultr/vultr_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" ) @@ -46,7 +47,7 @@ func TestVultrSDRefresh(t *testing.T) { cfg := DefaultSDConfig cfg.HTTPClientConfig.BearerToken = APIKey - d, err := NewDiscovery(&cfg, log.NewNopLogger()) + d, err := NewDiscovery(&cfg, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) endpoint, err := url.Parse(sdMock.Mock.Endpoint()) require.NoError(t, err) diff --git a/discovery/xds/kuma.go b/discovery/xds/kuma.go index bc88ba554..5ac4a42a8 100644 --- a/discovery/xds/kuma.go +++ b/discovery/xds/kuma.go @@ -30,35 +30,12 @@ import ( "github.com/prometheus/prometheus/util/strutil" ) -var ( - // DefaultKumaSDConfig is the default Kuma MADS SD configuration. - DefaultKumaSDConfig = KumaSDConfig{ - HTTPClientConfig: config.DefaultHTTPClientConfig, - RefreshInterval: model.Duration(15 * time.Second), - FetchTimeout: model.Duration(2 * time.Minute), - } - - kumaFetchFailuresCount = prometheus.NewCounter( - prometheus.CounterOpts{ - Namespace: namespace, - Name: "sd_kuma_fetch_failures_total", - Help: "The number of Kuma MADS fetch call failures.", - }) - kumaFetchSkipUpdateCount = prometheus.NewCounter( - prometheus.CounterOpts{ - Namespace: namespace, - Name: "sd_kuma_fetch_skipped_updates_total", - Help: "The number of Kuma MADS fetch calls that result in no updates to the targets.", - }) - kumaFetchDuration = prometheus.NewSummary( - prometheus.SummaryOpts{ - Namespace: namespace, - Name: "sd_kuma_fetch_duration_seconds", - Help: "The duration of a Kuma MADS fetch call.", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }, - ) -) +// DefaultKumaSDConfig is the default Kuma MADS SD configuration. +var DefaultKumaSDConfig = KumaSDConfig{ + HTTPClientConfig: config.DefaultHTTPClientConfig, + RefreshInterval: model.Duration(15 * time.Second), + FetchTimeout: model.Duration(2 * time.Minute), +} const ( // kumaMetaLabelPrefix is the meta prefix used for all kuma meta labels. @@ -120,7 +97,7 @@ func (c *KumaSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discover logger = log.NewNopLogger() } - return NewKumaHTTPDiscovery(c, logger) + return NewKumaHTTPDiscovery(c, logger, opts.Registerer) } func convertKumaV1MonitoringAssignment(assignment *MonitoringAssignment) []model.LabelSet { @@ -176,7 +153,7 @@ func kumaMadsV1ResourceParser(resources []*anypb.Any, typeURL string) ([]model.L return targets, nil } -func NewKumaHTTPDiscovery(conf *KumaSDConfig, logger log.Logger) (discovery.Discoverer, error) { +func NewKumaHTTPDiscovery(conf *KumaSDConfig, logger log.Logger, reg prometheus.Registerer) (discovery.Discoverer, error) { // Default to "prometheus" if hostname is unavailable. clientID, err := osutil.GetFQDN() if err != nil { @@ -203,15 +180,41 @@ func NewKumaHTTPDiscovery(conf *KumaSDConfig, logger log.Logger) (discovery.Disc } d := &fetchDiscovery{ - client: client, - logger: logger, - refreshInterval: time.Duration(conf.RefreshInterval), - source: "kuma", - parseResources: kumaMadsV1ResourceParser, - fetchFailuresCount: kumaFetchFailuresCount, - fetchSkipUpdateCount: kumaFetchSkipUpdateCount, - fetchDuration: kumaFetchDuration, + client: client, + logger: logger, + refreshInterval: time.Duration(conf.RefreshInterval), + source: "kuma", + parseResources: kumaMadsV1ResourceParser, + fetchFailuresCount: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "sd_kuma_fetch_failures_total", + Help: "The number of Kuma MADS fetch call failures.", + }), + fetchSkipUpdateCount: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "sd_kuma_fetch_skipped_updates_total", + Help: "The number of Kuma MADS fetch calls that result in no updates to the targets.", + }), + fetchDuration: prometheus.NewSummary( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: "sd_kuma_fetch_duration_seconds", + Help: "The duration of a Kuma MADS fetch call.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, + ), } + d.metricRegisterer = discovery.NewMetricRegisterer( + reg, + []prometheus.Collector{ + d.fetchFailuresCount, + d.fetchSkipUpdateCount, + d.fetchDuration, + }, + ) + return d, nil } diff --git a/discovery/xds/kuma_test.go b/discovery/xds/kuma_test.go index 581be9fb1..0626f82a0 100644 --- a/discovery/xds/kuma_test.go +++ b/discovery/xds/kuma_test.go @@ -21,6 +21,7 @@ import ( "time" v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" @@ -107,7 +108,7 @@ func getKumaMadsV1DiscoveryResponse(resources ...*MonitoringAssignment) (*v3.Dis } func newKumaTestHTTPDiscovery(c KumaSDConfig) (*fetchDiscovery, error) { - kd, err := NewKumaHTTPDiscovery(&c, nopLogger) + kd, err := NewKumaHTTPDiscovery(&c, nopLogger, prometheus.NewRegistry()) if err != nil { return nil, err } diff --git a/discovery/xds/xds.go b/discovery/xds/xds.go index 48bdbab02..a04fe6862 100644 --- a/discovery/xds/xds.go +++ b/discovery/xds/xds.go @@ -69,9 +69,6 @@ func init() { // Register top-level SD Configs. discovery.RegisterConfig(&KumaSDConfig{}) - // Register metrics. - prometheus.MustRegister(kumaFetchDuration, kumaFetchSkipUpdateCount, kumaFetchFailuresCount) - // Register protobuf types that need to be marshalled/ unmarshalled. mustRegisterMessage(protoTypes, (&v3.DiscoveryRequest{}).ProtoReflect().Type()) mustRegisterMessage(protoTypes, (&v3.DiscoveryResponse{}).ProtoReflect().Type()) @@ -109,12 +106,20 @@ type fetchDiscovery struct { parseResources resourceParser logger log.Logger - fetchDuration prometheus.Observer + fetchDuration prometheus.Summary fetchSkipUpdateCount prometheus.Counter fetchFailuresCount prometheus.Counter + + metricRegisterer discovery.MetricRegisterer } func (d *fetchDiscovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { + err := d.metricRegisterer.RegisterMetrics() + if err != nil { + level.Error(d.logger).Log("msg", "Unable to register metrics", "err", err.Error()) + return + } + defer d.metricRegisterer.UnregisterMetrics() defer d.client.Close() ticker := time.NewTicker(d.refreshInterval) diff --git a/documentation/examples/custom-sd/adapter-usage/main.go b/documentation/examples/custom-sd/adapter-usage/main.go index ae656db19..b712749df 100644 --- a/documentation/examples/custom-sd/adapter-usage/main.go +++ b/documentation/examples/custom-sd/adapter-usage/main.go @@ -28,8 +28,10 @@ import ( "github.com/alecthomas/kingpin/v2" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + prom_discovery "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/documentation/examples/custom-sd/adapter" "github.com/prometheus/prometheus/util/strutil" @@ -268,7 +270,13 @@ func main() { if err != nil { fmt.Println("err: ", err) } - sdAdapter := adapter.NewAdapter(ctx, *outputFile, "exampleSD", disc, logger) + + discoveryMetrics, err := prom_discovery.NewMetrics(prometheus.DefaultRegisterer) + if err != nil { + level.Error(logger).Log("msg", "failed to create discovery metrics", "err", err) + os.Exit(1) + } + sdAdapter := adapter.NewAdapter(ctx, *outputFile, "exampleSD", disc, logger, discoveryMetrics) sdAdapter.Run() <-ctx.Done() diff --git a/documentation/examples/custom-sd/adapter/adapter.go b/documentation/examples/custom-sd/adapter/adapter.go index 57c32ce49..8aedf0084 100644 --- a/documentation/examples/custom-sd/adapter/adapter.go +++ b/documentation/examples/custom-sd/adapter/adapter.go @@ -25,6 +25,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/discovery" @@ -162,12 +163,12 @@ func (a *Adapter) Run() { } // NewAdapter creates a new instance of Adapter. -func NewAdapter(ctx context.Context, file, name string, d discovery.Discoverer, logger log.Logger) *Adapter { +func NewAdapter(ctx context.Context, file, name string, d discovery.Discoverer, logger log.Logger, metrics *discovery.Metrics) *Adapter { return &Adapter{ ctx: ctx, disc: d, groups: make(map[string]*customSD), - manager: discovery.NewManager(ctx, logger), + manager: discovery.NewManager(ctx, logger, prometheus.NewRegistry(), metrics), output: file, name: name, logger: logger, diff --git a/documentation/examples/custom-sd/adapter/adapter_test.go b/documentation/examples/custom-sd/adapter/adapter_test.go index eaf34c667..14cae47b4 100644 --- a/documentation/examples/custom-sd/adapter/adapter_test.go +++ b/documentation/examples/custom-sd/adapter/adapter_test.go @@ -18,9 +18,11 @@ import ( "os" "testing" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery/targetgroup" ) @@ -226,6 +228,8 @@ func TestWriteOutput(t *testing.T) { require.NoError(t, err) defer os.Remove(tmpfile.Name()) tmpfile.Close() - adapter := NewAdapter(ctx, tmpfile.Name(), "test_sd", nil, nil) + metrics, err := discovery.NewMetrics(prometheus.NewRegistry()) + require.NoError(t, err) + adapter := NewAdapter(ctx, tmpfile.Name(), "test_sd", nil, nil, metrics) require.NoError(t, adapter.writeOutput()) }