From 7bd9508c9b2a4121ea9bb2b2c8b00117f85ab918 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 21 Nov 2016 15:51:36 +0100 Subject: [PATCH] discovery: move TargetProvider and multi-constructor --- retrieval/discovery/azure/azure.go | 16 ++- retrieval/discovery/discovery.go | 118 ++++++++++++++++-- retrieval/discovery/ec2/ec2.go | 16 ++- retrieval/discovery/file/file.go | 16 ++- retrieval/discovery/file/file_test.go | 4 +- .../discovery/fixtures/target_groups.json | 11 -- .../discovery/fixtures/target_groups.yml | 5 - retrieval/discovery/gce/gce.go | 14 +-- retrieval/discovery/zookeeper/zookeeper.go | 10 +- retrieval/targetmanager.go | 112 +---------------- retrieval/targetmanager_test.go | 5 +- 11 files changed, 146 insertions(+), 181 deletions(-) delete mode 100644 retrieval/discovery/fixtures/target_groups.json delete mode 100644 retrieval/discovery/fixtures/target_groups.yml diff --git a/retrieval/discovery/azure/azure.go b/retrieval/discovery/azure/azure.go index cec54af86..c0f354230 100644 --- a/retrieval/discovery/azure/azure.go +++ b/retrieval/discovery/azure/azure.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package discovery +package azure import ( "fmt" @@ -45,15 +45,13 @@ const ( var ( azureSDRefreshFailuresCount = prometheus.NewCounter( prometheus.CounterOpts{ - Namespace: namespace, - Name: "sd_azure_refresh_failures_total", - Help: "Number of Azure-SD refresh failures.", + Name: "prometheus_sd_azure_refresh_failures_total", + Help: "Number of Azure-SD refresh failures.", }) azureSDRefreshDuration = prometheus.NewSummary( prometheus.SummaryOpts{ - Namespace: namespace, - Name: "sd_azure_refresh_duration_seconds", - Help: "The duration of a Azure-SD refresh in seconds.", + Name: "prometheus_sd_azure_refresh_duration_seconds", + Help: "The duration of a Azure-SD refresh in seconds.", }) ) @@ -70,8 +68,8 @@ type AzureDiscovery struct { port int } -// NewAzureDiscovery returns a new AzureDiscovery which periodically refreshes its targets. -func NewAzureDiscovery(cfg *config.AzureSDConfig) *AzureDiscovery { +// NewDiscovery returns a new AzureDiscovery which periodically refreshes its targets. +func NewDiscovery(cfg *config.AzureSDConfig) *AzureDiscovery { return &AzureDiscovery{ cfg: cfg, interval: time.Duration(cfg.RefreshInterval), diff --git a/retrieval/discovery/discovery.go b/retrieval/discovery/discovery.go index aebe1e816..8872978da 100644 --- a/retrieval/discovery/discovery.go +++ b/retrieval/discovery/discovery.go @@ -14,30 +14,124 @@ package discovery import ( + "fmt" + "github.com/prometheus/common/log" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/retrieval/discovery/azure" "github.com/prometheus/prometheus/retrieval/discovery/consul" "github.com/prometheus/prometheus/retrieval/discovery/dns" + "github.com/prometheus/prometheus/retrieval/discovery/ec2" + "github.com/prometheus/prometheus/retrieval/discovery/file" + "github.com/prometheus/prometheus/retrieval/discovery/gce" "github.com/prometheus/prometheus/retrieval/discovery/kubernetes" "github.com/prometheus/prometheus/retrieval/discovery/marathon" + "github.com/prometheus/prometheus/retrieval/discovery/zookeeper" + "golang.org/x/net/context" ) -// NewConsul creates a new Consul based Discovery. -func NewConsul(cfg *config.ConsulSDConfig) (*consul.Discovery, error) { - return consul.NewDiscovery(cfg) +// A TargetProvider provides information about target groups. It maintains a set +// of sources from which TargetGroups can originate. Whenever a target provider +// detects a potential change, it sends the TargetGroup through its provided channel. +// +// The TargetProvider does not have to guarantee that an actual change happened. +// It does guarantee that it sends the new TargetGroup whenever a change happens. +// +// Providers must initially send all known target groups as soon as it can. +type TargetProvider interface { + // Run hands a channel to the target provider through which it can send + // updated target groups. The channel must be closed by the target provider + // if no more updates will be sent. + // On receiving from done Run must return. + Run(ctx context.Context, up chan<- []*config.TargetGroup) } -// NewKubernetesDiscovery creates a Kubernetes service discovery based on the passed-in configuration. -func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.Kubernetes, error) { - return kubernetes.New(log.Base(), conf) +// ProvidersFromConfig returns all TargetProviders configured in cfg. +func ProvidersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider { + providers := map[string]TargetProvider{} + + app := func(mech string, i int, tp TargetProvider) { + providers[fmt.Sprintf("%s/%d", mech, i)] = tp + } + + for i, c := range cfg.DNSSDConfigs { + app("dns", i, dns.NewDiscovery(c)) + } + for i, c := range cfg.FileSDConfigs { + app("file", i, file.NewDiscovery(c)) + } + for i, c := range cfg.ConsulSDConfigs { + k, err := consul.NewDiscovery(c) + if err != nil { + log.Errorf("Cannot create Consul discovery: %s", err) + continue + } + app("consul", i, k) + } + for i, c := range cfg.MarathonSDConfigs { + m, err := marathon.NewDiscovery(c) + if err != nil { + log.Errorf("Cannot create Marathon discovery: %s", err) + continue + } + app("marathon", i, m) + } + for i, c := range cfg.KubernetesSDConfigs { + k, err := kubernetes.New(log.Base(), c) + if err != nil { + log.Errorf("Cannot create Kubernetes discovery: %s", err) + continue + } + app("kubernetes", i, k) + } + for i, c := range cfg.ServersetSDConfigs { + app("serverset", i, zookeeper.NewServersetDiscovery(c)) + } + for i, c := range cfg.NerveSDConfigs { + app("nerve", i, zookeeper.NewNerveDiscovery(c)) + } + for i, c := range cfg.EC2SDConfigs { + app("ec2", i, ec2.NewDiscovery(c)) + } + for i, c := range cfg.GCESDConfigs { + gced, err := gce.NewDiscovery(c) + if err != nil { + log.Errorf("Cannot initialize GCE discovery: %s", err) + continue + } + app("gce", i, gced) + } + for i, c := range cfg.AzureSDConfigs { + app("azure", i, azure.NewDiscovery(c)) + } + if len(cfg.StaticConfigs) > 0 { + app("static", 0, NewStaticProvider(cfg.StaticConfigs)) + } + + return providers } -// NewMarathon creates a new Marathon based discovery. -func NewMarathon(conf *config.MarathonSDConfig) (*marathon.Discovery, error) { - return marathon.NewDiscovery(conf) +// StaticProvider holds a list of target groups that never change. +type StaticProvider struct { + TargetGroups []*config.TargetGroup } -// NewDNS creates a new DNS based discovery. -func NewDNS(conf *config.DNSSDConfig) *dns.Discovery { - return dns.NewDiscovery(conf) +// NewStaticProvider returns a StaticProvider configured with the given +// target groups. +func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider { + for i, tg := range groups { + tg.Source = fmt.Sprintf("%d", i) + } + return &StaticProvider{groups} +} + +// Run implements the TargetProvider interface. +func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + // We still have to consider that the consumer exits right away in which case + // the context will be canceled. + select { + case ch <- sd.TargetGroups: + case <-ctx.Done(): + } + close(ch) } diff --git a/retrieval/discovery/ec2/ec2.go b/retrieval/discovery/ec2/ec2.go index 188909bbc..2d4138be3 100644 --- a/retrieval/discovery/ec2/ec2.go +++ b/retrieval/discovery/ec2/ec2.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package discovery +package ec2 import ( "fmt" @@ -50,15 +50,13 @@ const ( var ( ec2SDRefreshFailuresCount = prometheus.NewCounter( prometheus.CounterOpts{ - Namespace: namespace, - Name: "sd_ec2_refresh_failures_total", - Help: "The number of EC2-SD scrape failures.", + Name: "prometheus_sd_ec2_refresh_failures_total", + Help: "The number of EC2-SD scrape failures.", }) ec2SDRefreshDuration = prometheus.NewSummary( prometheus.SummaryOpts{ - Namespace: namespace, - Name: "sd_ec2_refresh_duration_seconds", - Help: "The duration of a EC2-SD refresh in seconds.", + Name: "prometheus_sd_ec2_refresh_duration_seconds", + Help: "The duration of a EC2-SD refresh in seconds.", }) ) @@ -76,8 +74,8 @@ type EC2Discovery struct { port int } -// NewEC2Discovery returns a new EC2Discovery which periodically refreshes its targets. -func NewEC2Discovery(conf *config.EC2SDConfig) *EC2Discovery { +// NewDiscovery returns a new EC2Discovery which periodically refreshes its targets. +func NewDiscovery(conf *config.EC2SDConfig) *EC2Discovery { creds := credentials.NewStaticCredentials(conf.AccessKey, conf.SecretKey, "") if conf.AccessKey == "" && conf.SecretKey == "" { creds = nil diff --git a/retrieval/discovery/file/file.go b/retrieval/discovery/file/file.go index ba1ae5b0c..bcfa4db87 100644 --- a/retrieval/discovery/file/file.go +++ b/retrieval/discovery/file/file.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package discovery +package file import ( "encoding/json" @@ -36,15 +36,13 @@ const fileSDFilepathLabel = model.MetaLabelPrefix + "filepath" var ( fileSDScanDuration = prometheus.NewSummary( prometheus.SummaryOpts{ - Namespace: namespace, - Name: "sd_file_scan_duration_seconds", - Help: "The duration of the File-SD scan in seconds.", + Name: "prometheus_sd_file_scan_duration_seconds", + Help: "The duration of the File-SD scan in seconds.", }) fileSDReadErrorsCount = prometheus.NewCounter( prometheus.CounterOpts{ - Namespace: namespace, - Name: "sd_file_read_errors_total", - Help: "The number of File-SD read errors.", + Name: "prometheus_sd_file_read_errors_total", + Help: "The number of File-SD read errors.", }) ) @@ -67,8 +65,8 @@ type FileDiscovery struct { lastRefresh map[string]int } -// NewFileDiscovery returns a new file discovery for the given paths. -func NewFileDiscovery(conf *config.FileSDConfig) *FileDiscovery { +// NewDiscovery returns a new file discovery for the given paths. +func NewDiscovery(conf *config.FileSDConfig) *FileDiscovery { return &FileDiscovery{ paths: conf.Files, interval: time.Duration(conf.RefreshInterval), diff --git a/retrieval/discovery/file/file_test.go b/retrieval/discovery/file/file_test.go index 975189067..39ce91f47 100644 --- a/retrieval/discovery/file/file_test.go +++ b/retrieval/discovery/file/file_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package discovery +package file import ( "fmt" @@ -41,7 +41,7 @@ func testFileSD(t *testing.T, ext string) { conf.RefreshInterval = model.Duration(1 * time.Hour) var ( - fsd = NewFileDiscovery(&conf) + fsd = NewDiscovery(&conf) ch = make(chan []*config.TargetGroup) ctx, cancel = context.WithCancel(context.Background()) ) diff --git a/retrieval/discovery/fixtures/target_groups.json b/retrieval/discovery/fixtures/target_groups.json deleted file mode 100644 index df4f0df19..000000000 --- a/retrieval/discovery/fixtures/target_groups.json +++ /dev/null @@ -1,11 +0,0 @@ -[ - { - "targets": ["localhost:9090", "example.org:443"], - "labels": { - "foo": "bar" - } - }, - { - "targets": ["my.domain"] - } -] diff --git a/retrieval/discovery/fixtures/target_groups.yml b/retrieval/discovery/fixtures/target_groups.yml deleted file mode 100644 index 5a4e6e5db..000000000 --- a/retrieval/discovery/fixtures/target_groups.yml +++ /dev/null @@ -1,5 +0,0 @@ -- targets: ['localhost:9090', 'example.org:443'] - labels: - foo: bar - -- targets: ['my.domain'] diff --git a/retrieval/discovery/gce/gce.go b/retrieval/discovery/gce/gce.go index c0f8e311d..1161e4e4b 100644 --- a/retrieval/discovery/gce/gce.go +++ b/retrieval/discovery/gce/gce.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package discovery +package gce import ( "fmt" @@ -52,15 +52,13 @@ const ( var ( gceSDRefreshFailuresCount = prometheus.NewCounter( prometheus.CounterOpts{ - Namespace: namespace, - Name: "sd_gce_refresh_failures_total", - Help: "The number of GCE-SD refresh failures.", + Name: "prometheus_sd_gce_refresh_failures_total", + Help: "The number of GCE-SD refresh failures.", }) gceSDRefreshDuration = prometheus.NewSummary( prometheus.SummaryOpts{ - Namespace: namespace, - Name: "sd_gce_refresh_duration", - Help: "The duration of a GCE-SD refresh in seconds.", + Name: "prometheus_sd_gce_refresh_duration", + Help: "The duration of a GCE-SD refresh in seconds.", }) ) @@ -84,7 +82,7 @@ type GCEDiscovery struct { } // NewGCEDiscovery returns a new GCEDiscovery which periodically refreshes its targets. -func NewGCEDiscovery(conf *config.GCESDConfig) (*GCEDiscovery, error) { +func NewDiscovery(conf *config.GCESDConfig) (*GCEDiscovery, error) { gd := &GCEDiscovery{ project: conf.Project, zone: conf.Zone, diff --git a/retrieval/discovery/zookeeper/zookeeper.go b/retrieval/discovery/zookeeper/zookeeper.go index 31f78845f..eb11f0b98 100644 --- a/retrieval/discovery/zookeeper/zookeeper.go +++ b/retrieval/discovery/zookeeper/zookeeper.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package discovery +package zookeeper import ( "encoding/json" @@ -42,17 +42,17 @@ type ZookeeperDiscovery struct { // NewNerveDiscovery returns a new NerveDiscovery for the given config. func NewNerveDiscovery(conf *config.NerveSDConfig) *ZookeeperDiscovery { - return NewZookeeperDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseNerveMember) + return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseNerveMember) } // NewServersetDiscovery returns a new ServersetDiscovery for the given config. func NewServersetDiscovery(conf *config.ServersetSDConfig) *ZookeeperDiscovery { - return NewZookeeperDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseServersetMember) + return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseServersetMember) } -// NewZookeeperDiscovery returns a new discovery along Zookeeper parses with +// NewDiscovery returns a new discovery along Zookeeper parses with // the given parse function. -func NewZookeeperDiscovery( +func NewDiscovery( srvs []string, timeout time.Duration, paths []string, diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 4a73ba823..4f4fe3159 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -31,22 +31,6 @@ import ( "github.com/prometheus/prometheus/storage" ) -// A TargetProvider provides information about target groups. It maintains a set -// of sources from which TargetGroups can originate. Whenever a target provider -// detects a potential change, it sends the TargetGroup through its provided channel. -// -// The TargetProvider does not have to guarantee that an actual change happened. -// It does guarantee that it sends the new TargetGroup whenever a change happens. -// -// Providers must initially send all known target groups as soon as it can. -type TargetProvider interface { - // Run hands a channel to the target provider through which it can send - // updated target groups. The channel must be closed by the target provider - // if no more updates will be sent. - // On receiving from done Run must return. - Run(ctx context.Context, up chan<- []*config.TargetGroup) -} - // TargetManager maintains a set of targets, starts and stops their scraping and // creates the new targets based on the target groups it receives from various // target providers. @@ -123,7 +107,7 @@ func (tm *TargetManager) reload() { } else { ts.reload(scfg) } - ts.runProviders(tm.ctx, providersFromConfig(scfg)) + ts.runProviders(tm.ctx, discovery.ProvidersFromConfig(scfg)) } // Remove old target sets. Waiting for stopping is already guaranteed @@ -257,7 +241,7 @@ func (ts *targetSet) sync() { ts.scrapePool.sync(all) } -func (ts *targetSet) runProviders(ctx context.Context, providers map[string]TargetProvider) { +func (ts *targetSet) runProviders(ctx context.Context, providers map[string]discovery.TargetProvider) { // Lock for the entire time. This may mean up to 5 seconds until the full initial set // is retrieved and applied. // We could release earlier with some tweaks, but this is easier to reason about. @@ -281,7 +265,7 @@ func (ts *targetSet) runProviders(ctx context.Context, providers map[string]Targ updates := make(chan []*config.TargetGroup) - go func(name string, prov TargetProvider) { + go func(name string, prov discovery.TargetProvider) { select { case <-ctx.Done(): case initial, ok := <-updates: @@ -366,71 +350,6 @@ func (ts *targetSet) update(name string, tgroup *config.TargetGroup) error { return nil } -// providersFromConfig returns all TargetProviders configured in cfg. -func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider { - providers := map[string]TargetProvider{} - - app := func(mech string, i int, tp TargetProvider) { - providers[fmt.Sprintf("%s/%d", mech, i)] = tp - } - - for i, c := range cfg.DNSSDConfigs { - app("dns", i, discovery.NewDNS(c)) - } - for i, c := range cfg.FileSDConfigs { - app("file", i, discovery.NewFileDiscovery(c)) - } - for i, c := range cfg.ConsulSDConfigs { - k, err := discovery.NewConsul(c) - if err != nil { - log.Errorf("Cannot create Consul discovery: %s", err) - continue - } - app("consul", i, k) - } - for i, c := range cfg.MarathonSDConfigs { - m, err := discovery.NewMarathon(c) - if err != nil { - log.Errorf("Cannot create Marathon discovery: %s", err) - continue - } - app("marathon", i, m) - } - for i, c := range cfg.KubernetesSDConfigs { - k, err := discovery.NewKubernetesDiscovery(c) - if err != nil { - log.Errorf("Cannot create Kubernetes discovery: %s", err) - continue - } - app("kubernetes", i, k) - } - for i, c := range cfg.ServersetSDConfigs { - app("serverset", i, discovery.NewServersetDiscovery(c)) - } - for i, c := range cfg.NerveSDConfigs { - app("nerve", i, discovery.NewNerveDiscovery(c)) - } - for i, c := range cfg.EC2SDConfigs { - app("ec2", i, discovery.NewEC2Discovery(c)) - } - for i, c := range cfg.GCESDConfigs { - gced, err := discovery.NewGCEDiscovery(c) - if err != nil { - log.Errorf("Cannot initialize GCE discovery: %s", err) - continue - } - app("gce", i, gced) - } - for i, c := range cfg.AzureSDConfigs { - app("azure", i, discovery.NewAzureDiscovery(c)) - } - if len(cfg.StaticConfigs) > 0 { - app("static", 0, NewStaticProvider(cfg.StaticConfigs)) - } - - return providers -} - // populateLabels builds a label set from the given label set and scrape configuration. // It returns a label set before relabeling was applied as the second return value. // Returns a nil label set if the target is dropped during relabeling. @@ -530,28 +449,3 @@ func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Targ } return targets, nil } - -// StaticProvider holds a list of target groups that never change. -type StaticProvider struct { - TargetGroups []*config.TargetGroup -} - -// NewStaticProvider returns a StaticProvider configured with the given -// target groups. -func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider { - for i, tg := range groups { - tg.Source = fmt.Sprintf("%d", i) - } - return &StaticProvider{groups} -} - -// Run implements the TargetProvider interface. -func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - // We still have to consider that the consumer exits right away in which case - // the context will be canceled. - select { - case ch <- sd.TargetGroups: - case <-ctx.Done(): - } - close(ch) -} diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index 65e5d6f2d..35163e947 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -22,6 +22,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/retrieval/discovery" "github.com/prometheus/prometheus/storage/local" ) @@ -55,7 +56,7 @@ static_configs: ts := newTargetSet(scrapeConfig, mss) - ts.runProviders(context.Background(), providersFromConfig(scrapeConfig)) + ts.runProviders(context.Background(), discovery.ProvidersFromConfig(scrapeConfig)) verifyPresence(ts.tgroups, "static/0/0", true) verifyPresence(ts.tgroups, "static/0/1", true) @@ -69,7 +70,7 @@ static_configs: t.Fatalf("Unable to load YAML config sTwo: %s", err) } - ts.runProviders(context.Background(), providersFromConfig(scrapeConfig)) + ts.runProviders(context.Background(), discovery.ProvidersFromConfig(scrapeConfig)) verifyPresence(ts.tgroups, "static/0/0", true) verifyPresence(ts.tgroups, "static/0/1", false)