diff --git a/retrieval/discovery/azure.go b/discovery/azure/azure.go
similarity index 94%
rename from retrieval/discovery/azure.go
rename to discovery/azure/azure.go
index cec54af86e..c0f354230c 100644
--- a/retrieval/discovery/azure.go
+++ b/discovery/azure/azure.go
@@ -11,7 +11,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package discovery
+package azure
 
 import (
 	"fmt"
@@ -45,15 +45,13 @@ const (
 var (
 	azureSDRefreshFailuresCount = prometheus.NewCounter(
 		prometheus.CounterOpts{
-			Namespace: namespace,
-			Name:      "sd_azure_refresh_failures_total",
-			Help:      "Number of Azure-SD refresh failures.",
+			Name: "prometheus_sd_azure_refresh_failures_total",
+			Help: "Number of Azure-SD refresh failures.",
 		})
 	azureSDRefreshDuration = prometheus.NewSummary(
 		prometheus.SummaryOpts{
-			Namespace: namespace,
-			Name:      "sd_azure_refresh_duration_seconds",
-			Help:      "The duration of a Azure-SD refresh in seconds.",
+			Name: "prometheus_sd_azure_refresh_duration_seconds",
+			Help: "The duration of an Azure-SD refresh in seconds.",
 		})
 )
 
@@ -70,8 +68,8 @@ type AzureDiscovery struct {
 	port     int
 }
 
-// NewAzureDiscovery returns a new AzureDiscovery which periodically refreshes its targets.
-func NewAzureDiscovery(cfg *config.AzureSDConfig) *AzureDiscovery {
+// NewDiscovery returns a new AzureDiscovery which periodically refreshes its targets.
+func NewDiscovery(cfg *config.AzureSDConfig) *AzureDiscovery {
 	return &AzureDiscovery{
 		cfg:      cfg,
 		interval: time.Duration(cfg.RefreshInterval),
diff --git a/retrieval/discovery/consul/consul.go b/discovery/consul/consul.go
similarity index 100%
rename from retrieval/discovery/consul/consul.go
rename to discovery/consul/consul.go
diff --git a/discovery/discovery.go b/discovery/discovery.go
new file mode 100644
index 0000000000..628ebb71f2
--- /dev/null
+++ b/discovery/discovery.go
@@ -0,0 +1,302 @@
+// Copyright 2016 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package discovery
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/prometheus/common/log"
+	"github.com/prometheus/prometheus/config"
+	"github.com/prometheus/prometheus/discovery/azure"
+	"github.com/prometheus/prometheus/discovery/consul"
+	"github.com/prometheus/prometheus/discovery/dns"
+	"github.com/prometheus/prometheus/discovery/ec2"
+	"github.com/prometheus/prometheus/discovery/file"
+	"github.com/prometheus/prometheus/discovery/gce"
+	"github.com/prometheus/prometheus/discovery/kubernetes"
+	"github.com/prometheus/prometheus/discovery/marathon"
+	"github.com/prometheus/prometheus/discovery/zookeeper"
+	"golang.org/x/net/context"
+)
+
+// A TargetProvider provides information about target groups. It maintains a set
+// of sources from which TargetGroups can originate. Whenever a target provider
+// detects a potential change, it sends the TargetGroup through its provided channel.
+//
+// The TargetProvider does not have to guarantee that an actual change happened.
+// It does guarantee that it sends the new TargetGroup whenever a change happens.
+//
+// TargetProviders should initially send a full set of all discoverable TargetGroups.
+type TargetProvider interface {
+	// Run hands a channel to the target provider through which it can send
+	// updated target groups.
+	// Must return when the context gets canceled. It should not close the update
+	// channel on returning.
+	Run(ctx context.Context, up chan<- []*config.TargetGroup)
+}
+
+// ProvidersFromConfig returns all TargetProviders configured in cfg.
+func ProvidersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider {
+	providers := map[string]TargetProvider{}
+
+	app := func(mech string, i int, tp TargetProvider) {
+		providers[fmt.Sprintf("%s/%d", mech, i)] = tp
+	}
+
+	for i, c := range cfg.DNSSDConfigs {
+		app("dns", i, dns.NewDiscovery(c))
+	}
+	for i, c := range cfg.FileSDConfigs {
+		app("file", i, file.NewDiscovery(c))
+	}
+	for i, c := range cfg.ConsulSDConfigs {
+		k, err := consul.NewDiscovery(c)
+		if err != nil {
+			log.Errorf("Cannot create Consul discovery: %s", err)
+			continue
+		}
+		app("consul", i, k)
+	}
+	for i, c := range cfg.MarathonSDConfigs {
+		m, err := marathon.NewDiscovery(c)
+		if err != nil {
+			log.Errorf("Cannot create Marathon discovery: %s", err)
+			continue
+		}
+		app("marathon", i, m)
+	}
+	for i, c := range cfg.KubernetesSDConfigs {
+		k, err := kubernetes.New(log.Base(), c)
+		if err != nil {
+			log.Errorf("Cannot create Kubernetes discovery: %s", err)
+			continue
+		}
+		app("kubernetes", i, k)
+	}
+	for i, c := range cfg.ServersetSDConfigs {
+		app("serverset", i, zookeeper.NewServersetDiscovery(c))
+	}
+	for i, c := range cfg.NerveSDConfigs {
+		app("nerve", i, zookeeper.NewNerveDiscovery(c))
+	}
+	for i, c := range cfg.EC2SDConfigs {
+		app("ec2", i, ec2.NewDiscovery(c))
+	}
+	for i, c := range cfg.GCESDConfigs {
+		gced, err := gce.NewDiscovery(c)
+		if err != nil {
+			log.Errorf("Cannot initialize GCE discovery: %s", err)
+			continue
+		}
+		app("gce", i, gced)
+	}
+	for i, c := range cfg.AzureSDConfigs {
+		app("azure", i, azure.NewDiscovery(c))
+	}
+	if len(cfg.StaticConfigs) > 0 {
+		app("static", 0, NewStaticProvider(cfg.StaticConfigs))
+	}
+
+	return providers
+}
+
+// StaticProvider holds a list of target groups that never change.
+type StaticProvider struct {
+	TargetGroups []*config.TargetGroup
+}
+
+// NewStaticProvider returns a StaticProvider configured with the given
+// target groups.
+func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
+	for i, tg := range groups {
+		tg.Source = fmt.Sprintf("%d", i)
+	}
+	return &StaticProvider{groups}
+}
+
+// Run implements the TargetProvider interface.
+func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
+	// We still have to consider that the consumer exits right away in which case
+	// the context will be canceled.
+	select {
+	case ch <- sd.TargetGroups:
+	case <-ctx.Done():
+	}
+	close(ch)
+}
+
+// TargetSet handles multiple TargetProviders and sends a full overview of their
+// discovered TargetGroups to a Syncer.
+type TargetSet struct {
+	mtx sync.RWMutex
+	// Sets of targets by a source string that is unique across target providers.
+	tgroups map[string]*config.TargetGroup
+
+	syncer Syncer
+
+	syncCh          chan struct{}
+	providerCh      chan map[string]TargetProvider
+	cancelProviders func()
+}
+
+// Syncer receives updates with complete sets of TargetGroups.
+type Syncer interface {
+	Sync([]*config.TargetGroup)
+}
+
+// NewTargetSet returns a new target set sending TargetGroups to the Syncer.
+func NewTargetSet(s Syncer) *TargetSet {
+	return &TargetSet{
+		syncCh:     make(chan struct{}, 1),
+		providerCh: make(chan map[string]TargetProvider),
+		syncer:     s,
+	}
+}
+
+// Run starts the processing of target providers and their updates.
+// It blocks until the context gets canceled.
+func (ts *TargetSet) Run(ctx context.Context) {
+Loop:
+	for {
+		// Throttle syncing to once per five seconds.
+		select {
+		case <-ctx.Done():
+			break Loop
+		case p := <-ts.providerCh:
+			ts.updateProviders(ctx, p)
+		case <-time.After(5 * time.Second):
+		}
+
+		select {
+		case <-ctx.Done():
+			break Loop
+		case <-ts.syncCh:
+			ts.sync()
+		case p := <-ts.providerCh:
+			ts.updateProviders(ctx, p)
+		}
+	}
+}
+
+func (ts *TargetSet) sync() {
+	ts.mtx.RLock()
+	var all []*config.TargetGroup
+	for _, tg := range ts.tgroups {
+		all = append(all, tg)
+	}
+	ts.mtx.RUnlock()
+
+	ts.syncer.Sync(all)
+}
+
+// UpdateProviders sets new target providers for the target set.
+func (ts *TargetSet) UpdateProviders(p map[string]TargetProvider) {
+	ts.providerCh <- p
+}
+
+func (ts *TargetSet) updateProviders(ctx context.Context, providers map[string]TargetProvider) {
+	// Lock for the entire time. This may mean up to 5 seconds until the full initial set
+	// is retrieved and applied.
+	// We could release earlier with some tweaks, but this is easier to reason about.
+	ts.mtx.Lock()
+	defer ts.mtx.Unlock()
+
+	// Stop all previous target providers of the target set.
+	if ts.cancelProviders != nil {
+		ts.cancelProviders()
+	}
+	ctx, ts.cancelProviders = context.WithCancel(ctx)
+
+	var wg sync.WaitGroup
+	// (Re-)create a fresh tgroups map to not keep stale targets around. We
+	// will retrieve all targets below anyway, so cleaning up everything is
+	// safe and doesn't inflict any additional cost.
+	ts.tgroups = map[string]*config.TargetGroup{}
+
+	for name, prov := range providers {
+		wg.Add(1)
+
+		updates := make(chan []*config.TargetGroup)
+		go prov.Run(ctx, updates)
+
+		go func(name string, prov TargetProvider) {
+			select {
+			case <-ctx.Done():
+			case initial, ok := <-updates:
+				// Handle the case that a target provider exits and closes the channel
+				// before the context is done.
+				if !ok {
+					break
+				}
+				// First set of all targets the provider knows.
+				for _, tgroup := range initial {
+					ts.setTargetGroup(name, tgroup)
+				}
+			case <-time.After(5 * time.Second):
+				// Initial set didn't arrive. Act as if it was empty
+				// and wait for updates later on.
+			}
+			wg.Done()
+
+			// Start listening for further updates.
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case tgs, ok := <-updates:
+					// Handle the case that a target provider exits and closes the channel
+					// before the context is done.
+					if !ok {
+						return
+					}
+					for _, tg := range tgs {
+						ts.update(name, tg)
+					}
+				}
+			}
+		}(name, prov)
+	}
+
+	// We wait for a full initial set of target groups before releasing the mutex
+	// to ensure the initial sync is complete and there are no races with subsequent updates.
+	wg.Wait()
+	// Just signal that there are initial sets to sync now. Actual syncing must only
+	// happen in the Run loop.
+	select {
+	case ts.syncCh <- struct{}{}:
+	default:
+	}
+}
+
+// update handles a target group update from a target provider identified by the name.
+func (ts *TargetSet) update(name string, tgroup *config.TargetGroup) {
+	ts.mtx.Lock()
+	defer ts.mtx.Unlock()
+
+	ts.setTargetGroup(name, tgroup)
+
+	select {
+	case ts.syncCh <- struct{}{}:
+	default:
+	}
+}
+
+func (ts *TargetSet) setTargetGroup(name string, tg *config.TargetGroup) {
+	if tg == nil {
+		return
+	}
+	ts.tgroups[name+"/"+tg.Source] = tg
+}
diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go
new file mode 100644
index 0000000000..edb3480ac1
--- /dev/null
+++ b/discovery/discovery_test.go
@@ -0,0 +1,87 @@
+// Copyright 2016 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package discovery
+
+import (
+	"testing"
+
+	"github.com/prometheus/prometheus/config"
+	"golang.org/x/net/context"
+	yaml "gopkg.in/yaml.v2"
+)
+
+func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) {
+
+	verifyPresence := func(tgroups map[string]*config.TargetGroup, name string, present bool) {
+		if _, ok := tgroups[name]; ok != present {
+			msg := ""
+			if !present {
+				msg = "not "
+			}
+			t.Fatalf("'%s' should %sbe present in TargetSet.tgroups: %s", name, msg, tgroups)
+		}
+	}
+
+	scrapeConfig := &config.ScrapeConfig{}
+
+	sOne := `
+job_name: "foo"
+static_configs:
+- targets: ["foo:9090"]
+- targets: ["bar:9090"]
+`
+	if err := yaml.Unmarshal([]byte(sOne), scrapeConfig); err != nil {
+		t.Fatalf("Unable to load YAML config sOne: %s", err)
+	}
+	called := make(chan struct{})
+
+	ts := NewTargetSet(&mockSyncer{
+		sync: func([]*config.TargetGroup) { called <- struct{}{} },
+	})
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	go ts.Run(ctx)
+
+	ts.UpdateProviders(ProvidersFromConfig(scrapeConfig))
+	<-called
+
+	verifyPresence(ts.tgroups, "static/0/0", true)
+	verifyPresence(ts.tgroups, "static/0/1", true)
+
+	sTwo := `
+job_name: "foo"
+static_configs:
+- targets: ["foo:9090"]
+`
+	if err := yaml.Unmarshal([]byte(sTwo), scrapeConfig); err != nil {
+		t.Fatalf("Unable to load YAML config sTwo: %s", err)
+	}
+
+	ts.UpdateProviders(ProvidersFromConfig(scrapeConfig))
+	<-called
+
+	verifyPresence(ts.tgroups, "static/0/0", true)
+	verifyPresence(ts.tgroups, "static/0/1", false)
+}
+
+type mockSyncer struct {
+	sync func(tgs []*config.TargetGroup)
+}
+
+func (s *mockSyncer) Sync(tgs []*config.TargetGroup) {
+	if s.sync != nil {
+		s.sync(tgs)
+	}
+}
diff --git a/retrieval/discovery/dns/dns.go b/discovery/dns/dns.go
similarity index 100%
rename from retrieval/discovery/dns/dns.go
rename to discovery/dns/dns.go
diff --git a/retrieval/discovery/ec2.go b/discovery/ec2/ec2.go
similarity index 92%
rename from retrieval/discovery/ec2.go
rename to discovery/ec2/ec2.go
index 188909bbc4..2d4138be3d 100644
--- a/retrieval/discovery/ec2.go
+++ b/discovery/ec2/ec2.go
@@ -11,7 +11,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
-package discovery +package ec2 import ( "fmt" @@ -50,15 +50,13 @@ const ( var ( ec2SDRefreshFailuresCount = prometheus.NewCounter( prometheus.CounterOpts{ - Namespace: namespace, - Name: "sd_ec2_refresh_failures_total", - Help: "The number of EC2-SD scrape failures.", + Name: "prometheus_sd_ec2_refresh_failures_total", + Help: "The number of EC2-SD scrape failures.", }) ec2SDRefreshDuration = prometheus.NewSummary( prometheus.SummaryOpts{ - Namespace: namespace, - Name: "sd_ec2_refresh_duration_seconds", - Help: "The duration of a EC2-SD refresh in seconds.", + Name: "prometheus_sd_ec2_refresh_duration_seconds", + Help: "The duration of a EC2-SD refresh in seconds.", }) ) @@ -76,8 +74,8 @@ type EC2Discovery struct { port int } -// NewEC2Discovery returns a new EC2Discovery which periodically refreshes its targets. -func NewEC2Discovery(conf *config.EC2SDConfig) *EC2Discovery { +// NewDiscovery returns a new EC2Discovery which periodically refreshes its targets. +func NewDiscovery(conf *config.EC2SDConfig) *EC2Discovery { creds := credentials.NewStaticCredentials(conf.AccessKey, conf.SecretKey, "") if conf.AccessKey == "" && conf.SecretKey == "" { creds = nil diff --git a/retrieval/discovery/file.go b/discovery/file/file.go similarity index 94% rename from retrieval/discovery/file.go rename to discovery/file/file.go index ba1ae5b0cf..bcfa4db871 100644 --- a/retrieval/discovery/file.go +++ b/discovery/file/file.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package discovery +package file import ( "encoding/json" @@ -36,15 +36,13 @@ const fileSDFilepathLabel = model.MetaLabelPrefix + "filepath" var ( fileSDScanDuration = prometheus.NewSummary( prometheus.SummaryOpts{ - Namespace: namespace, - Name: "sd_file_scan_duration_seconds", - Help: "The duration of the File-SD scan in seconds.", + Name: "prometheus_sd_file_scan_duration_seconds", + Help: "The duration of the File-SD scan in seconds.", }) fileSDReadErrorsCount = prometheus.NewCounter( prometheus.CounterOpts{ - Namespace: namespace, - Name: "sd_file_read_errors_total", - Help: "The number of File-SD read errors.", + Name: "prometheus_sd_file_read_errors_total", + Help: "The number of File-SD read errors.", }) ) @@ -67,8 +65,8 @@ type FileDiscovery struct { lastRefresh map[string]int } -// NewFileDiscovery returns a new file discovery for the given paths. -func NewFileDiscovery(conf *config.FileSDConfig) *FileDiscovery { +// NewDiscovery returns a new file discovery for the given paths. +func NewDiscovery(conf *config.FileSDConfig) *FileDiscovery { return &FileDiscovery{ paths: conf.Files, interval: time.Duration(conf.RefreshInterval), diff --git a/retrieval/discovery/file_test.go b/discovery/file/file_test.go similarity index 98% rename from retrieval/discovery/file_test.go rename to discovery/file/file_test.go index 9751890679..39ce91f476 100644 --- a/retrieval/discovery/file_test.go +++ b/discovery/file/file_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package discovery +package file import ( "fmt" @@ -41,7 +41,7 @@ func testFileSD(t *testing.T, ext string) { conf.RefreshInterval = model.Duration(1 * time.Hour) var ( - fsd = NewFileDiscovery(&conf) + fsd = NewDiscovery(&conf) ch = make(chan []*config.TargetGroup) ctx, cancel = context.WithCancel(context.Background()) ) diff --git a/retrieval/discovery/fixtures/target_groups.json b/discovery/file/fixtures/target_groups.json similarity index 100% rename from retrieval/discovery/fixtures/target_groups.json rename to discovery/file/fixtures/target_groups.json diff --git a/retrieval/discovery/fixtures/target_groups.yml b/discovery/file/fixtures/target_groups.yml similarity index 100% rename from retrieval/discovery/fixtures/target_groups.yml rename to discovery/file/fixtures/target_groups.yml diff --git a/retrieval/discovery/gce.go b/discovery/gce/gce.go similarity index 94% rename from retrieval/discovery/gce.go rename to discovery/gce/gce.go index c0f8e311df..1161e4e4b0 100644 --- a/retrieval/discovery/gce.go +++ b/discovery/gce/gce.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package discovery +package gce import ( "fmt" @@ -52,15 +52,13 @@ const ( var ( gceSDRefreshFailuresCount = prometheus.NewCounter( prometheus.CounterOpts{ - Namespace: namespace, - Name: "sd_gce_refresh_failures_total", - Help: "The number of GCE-SD refresh failures.", + Name: "prometheus_sd_gce_refresh_failures_total", + Help: "The number of GCE-SD refresh failures.", }) gceSDRefreshDuration = prometheus.NewSummary( prometheus.SummaryOpts{ - Namespace: namespace, - Name: "sd_gce_refresh_duration", - Help: "The duration of a GCE-SD refresh in seconds.", + Name: "prometheus_sd_gce_refresh_duration", + Help: "The duration of a GCE-SD refresh in seconds.", }) ) @@ -84,7 +82,7 @@ type GCEDiscovery struct { } // NewGCEDiscovery returns a new GCEDiscovery which periodically refreshes its targets. 
-func NewGCEDiscovery(conf *config.GCESDConfig) (*GCEDiscovery, error) { +func NewDiscovery(conf *config.GCESDConfig) (*GCEDiscovery, error) { gd := &GCEDiscovery{ project: conf.Project, zone: conf.Zone, diff --git a/retrieval/discovery/kubernetes/endpoints.go b/discovery/kubernetes/endpoints.go similarity index 100% rename from retrieval/discovery/kubernetes/endpoints.go rename to discovery/kubernetes/endpoints.go diff --git a/retrieval/discovery/kubernetes/endpoints_test.go b/discovery/kubernetes/endpoints_test.go similarity index 100% rename from retrieval/discovery/kubernetes/endpoints_test.go rename to discovery/kubernetes/endpoints_test.go diff --git a/retrieval/discovery/kubernetes/kubernetes.go b/discovery/kubernetes/kubernetes.go similarity index 100% rename from retrieval/discovery/kubernetes/kubernetes.go rename to discovery/kubernetes/kubernetes.go diff --git a/retrieval/discovery/kubernetes/node.go b/discovery/kubernetes/node.go similarity index 100% rename from retrieval/discovery/kubernetes/node.go rename to discovery/kubernetes/node.go diff --git a/retrieval/discovery/kubernetes/node_test.go b/discovery/kubernetes/node_test.go similarity index 100% rename from retrieval/discovery/kubernetes/node_test.go rename to discovery/kubernetes/node_test.go diff --git a/retrieval/discovery/kubernetes/pod.go b/discovery/kubernetes/pod.go similarity index 100% rename from retrieval/discovery/kubernetes/pod.go rename to discovery/kubernetes/pod.go diff --git a/retrieval/discovery/kubernetes/pod_test.go b/discovery/kubernetes/pod_test.go similarity index 100% rename from retrieval/discovery/kubernetes/pod_test.go rename to discovery/kubernetes/pod_test.go diff --git a/retrieval/discovery/kubernetes/service.go b/discovery/kubernetes/service.go similarity index 100% rename from retrieval/discovery/kubernetes/service.go rename to discovery/kubernetes/service.go diff --git a/retrieval/discovery/kubernetes/service_test.go b/discovery/kubernetes/service_test.go similarity index 100% rename from retrieval/discovery/kubernetes/service_test.go rename to discovery/kubernetes/service_test.go diff --git a/retrieval/discovery/marathon/marathon.go b/discovery/marathon/marathon.go similarity index 100% rename from retrieval/discovery/marathon/marathon.go rename to discovery/marathon/marathon.go diff --git a/retrieval/discovery/marathon/marathon_test.go b/discovery/marathon/marathon_test.go similarity index 100% rename from retrieval/discovery/marathon/marathon_test.go rename to discovery/marathon/marathon_test.go diff --git a/retrieval/discovery/zookeeper.go b/discovery/zookeeper/zookeeper.go similarity index 94% rename from retrieval/discovery/zookeeper.go rename to discovery/zookeeper/zookeeper.go index 31f78845f6..eb11f0b98f 100644 --- a/retrieval/discovery/zookeeper.go +++ b/discovery/zookeeper/zookeeper.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package discovery +package zookeeper import ( "encoding/json" @@ -42,17 +42,17 @@ type ZookeeperDiscovery struct { // NewNerveDiscovery returns a new NerveDiscovery for the given config. func NewNerveDiscovery(conf *config.NerveSDConfig) *ZookeeperDiscovery { - return NewZookeeperDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseNerveMember) + return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseNerveMember) } // NewServersetDiscovery returns a new ServersetDiscovery for the given config. 
func NewServersetDiscovery(conf *config.ServersetSDConfig) *ZookeeperDiscovery { - return NewZookeeperDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseServersetMember) + return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseServersetMember) } -// NewZookeeperDiscovery returns a new discovery along Zookeeper parses with +// NewDiscovery returns a new discovery along Zookeeper parses with // the given parse function. -func NewZookeeperDiscovery( +func NewDiscovery( srvs []string, timeout time.Duration, paths []string, diff --git a/retrieval/discovery/discovery.go b/retrieval/discovery/discovery.go deleted file mode 100644 index aebe1e816b..0000000000 --- a/retrieval/discovery/discovery.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2016 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package discovery - -import ( - "github.com/prometheus/common/log" - "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/retrieval/discovery/consul" - "github.com/prometheus/prometheus/retrieval/discovery/dns" - "github.com/prometheus/prometheus/retrieval/discovery/kubernetes" - "github.com/prometheus/prometheus/retrieval/discovery/marathon" -) - -// NewConsul creates a new Consul based Discovery. -func NewConsul(cfg *config.ConsulSDConfig) (*consul.Discovery, error) { - return consul.NewDiscovery(cfg) -} - -// NewKubernetesDiscovery creates a Kubernetes service discovery based on the passed-in configuration. -func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.Kubernetes, error) { - return kubernetes.New(log.Base(), conf) -} - -// NewMarathon creates a new Marathon based discovery. -func NewMarathon(conf *config.MarathonSDConfig) (*marathon.Discovery, error) { - return marathon.NewDiscovery(conf) -} - -// NewDNS creates a new DNS based discovery. -func NewDNS(conf *config.DNSSDConfig) *dns.Discovery { - return dns.NewDiscovery(conf) -} diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 8a9d8899be..24e7fd99c6 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -36,56 +36,46 @@ const ( scrapeHealthMetricName = "up" scrapeDurationMetricName = "scrape_duration_seconds" scrapeSamplesMetricName = "scrape_samples_scraped" - - // Constants for instrumentation. 
- namespace = "prometheus" - interval = "interval" - scrapeJob = "scrape_job" ) var ( targetIntervalLength = prometheus.NewSummaryVec( prometheus.SummaryOpts{ - Namespace: namespace, - Name: "target_interval_length_seconds", + Name: "prometheus_target_interval_length_seconds", Help: "Actual intervals between scrapes.", Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001}, }, - []string{interval}, + []string{"interval"}, ) targetSkippedScrapes = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: namespace, - Name: "target_skipped_scrapes_total", - Help: "Total number of scrapes that were skipped because the metric storage was throttled.", + Name: "prometheus_target_skipped_scrapes_total", + Help: "Total number of scrapes that were skipped because the metric storage was throttled.", }, - []string{interval}, + []string{"interval"}, ) targetReloadIntervalLength = prometheus.NewSummaryVec( prometheus.SummaryOpts{ - Namespace: namespace, - Name: "target_reload_length_seconds", + Name: "prometheus_target_reload_length_seconds", Help: "Actual interval to reload the scrape pool with a given configuration.", Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001}, }, - []string{interval}, + []string{"interval"}, ) targetSyncIntervalLength = prometheus.NewSummaryVec( prometheus.SummaryOpts{ - Namespace: namespace, - Name: "target_sync_length_seconds", + Name: "prometheus_target_sync_length_seconds", Help: "Actual interval to sync the scrape pool.", Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001}, }, - []string{scrapeJob}, + []string{"scrape_job"}, ) targetScrapePoolSyncsCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: namespace, - Name: "target_scrape_pool_sync_total", - Help: "Total number of syncs that were executed on a scrape pool.", + Name: "prometheus_target_scrape_pool_sync_total", + Help: "Total number of syncs that were executed on a scrape pool.", }, - []string{scrapeJob}, + []string{"scrape_job"}, ) ) @@ -115,7 +105,7 @@ type scrapePool struct { newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop } -func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { +func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { client, err := NewHTTPClient(cfg) if err != nil { // Any errors that could occur here should be caught during config validation. @@ -124,6 +114,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrape return &scrapePool{ appender: app, config: cfg, + ctx: ctx, client: client, targets: map[uint64]*Target{}, loops: map[uint64]loop{}, @@ -131,13 +122,6 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrape } } -func (sp *scrapePool) init(ctx context.Context) { - sp.mtx.Lock() - defer sp.mtx.Unlock() - - sp.ctx = ctx -} - // stop terminates all scrape loops and returns after they all terminated. func (sp *scrapePool) stop() { var wg sync.WaitGroup @@ -165,6 +149,7 @@ func (sp *scrapePool) stop() { // This method returns after all scrape loops that were stopped have fully terminated. 
func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { start := time.Now() + sp.mtx.Lock() defer sp.mtx.Unlock() @@ -206,11 +191,32 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { ) } +// Sync converts target groups into actual scrape targets and synchronizes +// the currently running scraper with the resulting set. +func (sp *scrapePool) Sync(tgs []*config.TargetGroup) { + start := time.Now() + + var all []*Target + for _, tg := range tgs { + targets, err := targetsFromGroup(tg, sp.config) + if err != nil { + log.With("err", err).Error("creating targets failed") + continue + } + all = append(all, targets...) + } + sp.sync(all) + + targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe( + time.Since(start).Seconds(), + ) + targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc() +} + // sync takes a list of potentially duplicated targets, deduplicates them, starts // scrape loops for new targets, and stops scrape loops for disappeared targets. // It returns after all stopped scrape loops terminated. func (sp *scrapePool) sync(targets []*Target) { - start := time.Now() sp.mtx.Lock() defer sp.mtx.Unlock() @@ -256,10 +262,6 @@ func (sp *scrapePool) sync(targets []*Target) { // may be active and tries to insert. The old scraper that didn't terminate yet could still // be inserting a previous sample set. wg.Wait() - targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe( - time.Since(start).Seconds(), - ) - targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc() } // sampleAppender returns an appender for ingested samples from the target. diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index 489d73c365..aaa19132f0 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -36,7 +36,7 @@ func TestNewScrapePool(t *testing.T) { var ( app = &nopAppender{} cfg = &config.ScrapeConfig{} - sp = newScrapePool(cfg, app) + sp = newScrapePool(context.Background(), cfg, app) ) if a, ok := sp.appender.(*nopAppender); !ok || a != app { @@ -231,7 +231,7 @@ func TestScrapePoolReportAppender(t *testing.T) { target := newTestTarget("example.com:80", 10*time.Millisecond, nil) app := &nopAppender{} - sp := newScrapePool(cfg, app) + sp := newScrapePool(context.Background(), cfg, app) cfg.HonorLabels = false wrapped := sp.reportAppender(target) @@ -266,7 +266,7 @@ func TestScrapePoolSampleAppender(t *testing.T) { target := newTestTarget("example.com:80", 10*time.Millisecond, nil) app := &nopAppender{} - sp := newScrapePool(cfg, app) + sp := newScrapePool(context.Background(), cfg, app) cfg.HonorLabels = false wrapped := sp.sampleAppender(target) diff --git a/retrieval/target.go b/retrieval/target.go index af66b094ce..01a2bd2626 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -17,6 +17,7 @@ import ( "fmt" "hash/fnv" "io/ioutil" + "net" "net/http" "net/url" "strings" @@ -276,3 +277,103 @@ func (app relabelAppender) Append(s *model.Sample) error { return app.SampleAppender.Append(s) } + +// populateLabels builds a label set from the given label set and scrape configuration. +// It returns a label set before relabeling was applied as the second return value. +// Returns a nil label set if the target is dropped during relabeling. +func populateLabels(lset model.LabelSet, cfg *config.ScrapeConfig) (res, orig model.LabelSet, err error) { + if _, ok := lset[model.AddressLabel]; !ok { + return nil, nil, fmt.Errorf("no address") + } + // Copy labels into the labelset for the target if they are not + // set already. 
Apply the labelsets in order of decreasing precedence. + scrapeLabels := model.LabelSet{ + model.SchemeLabel: model.LabelValue(cfg.Scheme), + model.MetricsPathLabel: model.LabelValue(cfg.MetricsPath), + model.JobLabel: model.LabelValue(cfg.JobName), + } + for ln, lv := range scrapeLabels { + if _, ok := lset[ln]; !ok { + lset[ln] = lv + } + } + // Encode scrape query parameters as labels. + for k, v := range cfg.Params { + if len(v) > 0 { + lset[model.LabelName(model.ParamLabelPrefix+k)] = model.LabelValue(v[0]) + } + } + + preRelabelLabels := lset + lset = relabel.Process(lset, cfg.RelabelConfigs...) + + // Check if the target was dropped. + if lset == nil { + return nil, nil, nil + } + + // addPort checks whether we should add a default port to the address. + // If the address is not valid, we don't append a port either. + addPort := func(s string) bool { + // If we can split, a port exists and we don't have to add one. + if _, _, err := net.SplitHostPort(s); err == nil { + return false + } + // If adding a port makes it valid, the previous error + // was not due to an invalid address and we can append a port. + _, _, err := net.SplitHostPort(s + ":1234") + return err == nil + } + // If it's an address with no trailing port, infer it based on the used scheme. + if addr := string(lset[model.AddressLabel]); addPort(addr) { + // Addresses reaching this point are already wrapped in [] if necessary. + switch lset[model.SchemeLabel] { + case "http", "": + addr = addr + ":80" + case "https": + addr = addr + ":443" + default: + return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme) + } + lset[model.AddressLabel] = model.LabelValue(addr) + } + if err := config.CheckTargetAddress(lset[model.AddressLabel]); err != nil { + return nil, nil, err + } + + // Meta labels are deleted after relabelling. Other internal labels propagate to + // the target which decides whether they will be part of their label set. + for ln := range lset { + if strings.HasPrefix(string(ln), model.MetaLabelPrefix) { + delete(lset, ln) + } + } + + // Default the instance label to the target address. + if _, ok := lset[model.InstanceLabel]; !ok { + lset[model.InstanceLabel] = lset[model.AddressLabel] + } + return lset, preRelabelLabels, nil +} + +// targetsFromGroup builds targets based on the given TargetGroup and config. +func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) { + targets := make([]*Target, 0, len(tg.Targets)) + + for i, lset := range tg.Targets { + // Combine target labels with target group labels. 
+ for ln, lv := range tg.Labels { + if _, ok := lset[ln]; !ok { + lset[ln] = lv + } + } + labels, origLabels, err := populateLabels(lset, cfg) + if err != nil { + return nil, fmt.Errorf("instance %d in group %s: %s", i, tg, err) + } + if labels != nil { + targets = append(targets, NewTarget(labels, origLabels, cfg.Params)) + } + } + return targets, nil +} diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 4a73ba823f..7d96da02dd 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -14,39 +14,18 @@ package retrieval import ( - "fmt" - "net" "sort" - "strings" "sync" - "time" "github.com/prometheus/common/log" "github.com/prometheus/common/model" "golang.org/x/net/context" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/relabel" - "github.com/prometheus/prometheus/retrieval/discovery" + "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/storage" ) -// A TargetProvider provides information about target groups. It maintains a set -// of sources from which TargetGroups can originate. Whenever a target provider -// detects a potential change, it sends the TargetGroup through its provided channel. -// -// The TargetProvider does not have to guarantee that an actual change happened. -// It does guarantee that it sends the new TargetGroup whenever a change happens. -// -// Providers must initially send all known target groups as soon as it can. -type TargetProvider interface { - // Run hands a channel to the target provider through which it can send - // updated target groups. The channel must be closed by the target provider - // if no more updates will be sent. - // On receiving from done Run must return. - Run(ctx context.Context, up chan<- []*config.TargetGroup) -} - // TargetManager maintains a set of targets, starts and stops their scraping and // creates the new targets based on the target groups it receives from various // target providers. @@ -63,6 +42,14 @@ type TargetManager struct { targetSets map[string]*targetSet } +type targetSet struct { + ctx context.Context + cancel func() + + ts *discovery.TargetSet + sp *scrapePool +} + // NewTargetManager creates a new TargetManager. func NewTargetManager(app storage.SampleAppender) *TargetManager { return &TargetManager{ @@ -111,23 +98,33 @@ func (tm *TargetManager) reload() { ts, ok := tm.targetSets[scfg.JobName] if !ok { - ts = newTargetSet(scfg, tm.appender) + ctx, cancel := context.WithCancel(tm.ctx) + ts = &targetSet{ + ctx: ctx, + cancel: cancel, + sp: newScrapePool(ctx, scfg, tm.appender), + } + ts.ts = discovery.NewTargetSet(ts.sp) + tm.targetSets[scfg.JobName] = ts tm.wg.Add(1) go func(ts *targetSet) { - ts.runScraping(tm.ctx) + // Run target set, which blocks until its context is canceled. + // Gracefully shut down pending scrapes in the scrape pool afterwards. + ts.ts.Run(ctx) + ts.sp.stop() tm.wg.Done() }(ts) } else { - ts.reload(scfg) + ts.sp.reload(scfg) } - ts.runProviders(tm.ctx, providersFromConfig(scfg)) + ts.ts.UpdateProviders(discovery.ProvidersFromConfig(scfg)) } - // Remove old target sets. Waiting for stopping is already guaranteed - // by the goroutine that started the target set. + // Remove old target sets. Waiting for scrape pools to complete pending + // scrape inserts is already guaranteed by the goroutine that started the target set. 
for name, ts := range tm.targetSets { if _, ok := jobs[name]; !ok { ts.cancel() @@ -145,14 +142,14 @@ func (tm *TargetManager) Pools() map[string]Targets { // TODO(fabxc): this is just a hack to maintain compatibility for now. for _, ps := range tm.targetSets { - ps.scrapePool.mtx.RLock() + ps.sp.mtx.RLock() - for _, t := range ps.scrapePool.targets { + for _, t := range ps.sp.targets { job := string(t.Labels()[model.JobLabel]) pools[job] = append(pools[job], t) } - ps.scrapePool.mtx.RUnlock() + ps.sp.mtx.RUnlock() } for _, targets := range pools { sort.Sort(targets) @@ -173,385 +170,3 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) error { } return nil } - -// targetSet holds several TargetProviders for which the same scrape configuration -// is used. It maintains target groups from all given providers and sync them -// to a scrape pool. -type targetSet struct { - mtx sync.RWMutex - - // Sets of targets by a source string that is unique across target providers. - tgroups map[string][]*Target - - scrapePool *scrapePool - config *config.ScrapeConfig - - syncCh chan struct{} - cancelScraping func() - cancelProviders func() -} - -func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet { - ts := &targetSet{ - scrapePool: newScrapePool(cfg, app), - syncCh: make(chan struct{}, 1), - config: cfg, - } - return ts -} - -func (ts *targetSet) cancel() { - ts.mtx.RLock() - defer ts.mtx.RUnlock() - - if ts.cancelScraping != nil { - ts.cancelScraping() - } - if ts.cancelProviders != nil { - ts.cancelProviders() - } -} - -func (ts *targetSet) reload(cfg *config.ScrapeConfig) { - ts.mtx.Lock() - ts.config = cfg - ts.mtx.Unlock() - - ts.scrapePool.reload(cfg) -} - -func (ts *targetSet) runScraping(ctx context.Context) { - ctx, ts.cancelScraping = context.WithCancel(ctx) - - ts.scrapePool.init(ctx) - -Loop: - for { - // Throttle syncing to once per five seconds. - select { - case <-ctx.Done(): - break Loop - case <-time.After(5 * time.Second): - } - - select { - case <-ctx.Done(): - break Loop - case <-ts.syncCh: - ts.mtx.RLock() - ts.sync() - ts.mtx.RUnlock() - } - } - - // We want to wait for all pending target scrapes to complete though to ensure there'll - // be no more storage writes after this point. - ts.scrapePool.stop() -} - -func (ts *targetSet) sync() { - var all []*Target - for _, targets := range ts.tgroups { - all = append(all, targets...) - } - ts.scrapePool.sync(all) -} - -func (ts *targetSet) runProviders(ctx context.Context, providers map[string]TargetProvider) { - // Lock for the entire time. This may mean up to 5 seconds until the full initial set - // is retrieved and applied. - // We could release earlier with some tweaks, but this is easier to reason about. - ts.mtx.Lock() - defer ts.mtx.Unlock() - - var wg sync.WaitGroup - - if ts.cancelProviders != nil { - ts.cancelProviders() - } - ctx, ts.cancelProviders = context.WithCancel(ctx) - - // (Re-)create a fresh tgroups map to not keep stale targets around. We - // will retrieve all targets below anyway, so cleaning up everything is - // safe and doesn't inflict any additional cost. - ts.tgroups = map[string][]*Target{} - - for name, prov := range providers { - wg.Add(1) - - updates := make(chan []*config.TargetGroup) - - go func(name string, prov TargetProvider) { - select { - case <-ctx.Done(): - case initial, ok := <-updates: - // Handle the case that a target provider exits and closes the channel - // before the context is done. 
- if !ok { - break - } - // First set of all targets the provider knows. - for _, tgroup := range initial { - if tgroup == nil { - continue - } - targets, err := targetsFromGroup(tgroup, ts.config) - if err != nil { - log.With("target_group", tgroup).Errorf("Target update failed: %s", err) - continue - } - ts.tgroups[name+"/"+tgroup.Source] = targets - } - case <-time.After(5 * time.Second): - // Initial set didn't arrive. Act as if it was empty - // and wait for updates later on. - } - - wg.Done() - - // Start listening for further updates. - for { - select { - case <-ctx.Done(): - return - case tgs, ok := <-updates: - // Handle the case that a target provider exits and closes the channel - // before the context is done. - if !ok { - return - } - for _, tg := range tgs { - if err := ts.update(name, tg); err != nil { - log.With("target_group", tg).Errorf("Target update failed: %s", err) - } - } - } - } - }(name, prov) - - go prov.Run(ctx, updates) - } - - // We wait for a full initial set of target groups before releasing the mutex - // to ensure the initial sync is complete and there are no races with subsequent updates. - wg.Wait() - // Just signal that there are initial sets to sync now. Actual syncing must only - // happen in the runScraping loop. - select { - case ts.syncCh <- struct{}{}: - default: - } -} - -// update handles a target group update from a target provider identified by the name. -func (ts *targetSet) update(name string, tgroup *config.TargetGroup) error { - if tgroup == nil { - return nil - } - targets, err := targetsFromGroup(tgroup, ts.config) - if err != nil { - return err - } - - ts.mtx.Lock() - defer ts.mtx.Unlock() - - ts.tgroups[name+"/"+tgroup.Source] = targets - - select { - case ts.syncCh <- struct{}{}: - default: - } - - return nil -} - -// providersFromConfig returns all TargetProviders configured in cfg. 
-func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider { - providers := map[string]TargetProvider{} - - app := func(mech string, i int, tp TargetProvider) { - providers[fmt.Sprintf("%s/%d", mech, i)] = tp - } - - for i, c := range cfg.DNSSDConfigs { - app("dns", i, discovery.NewDNS(c)) - } - for i, c := range cfg.FileSDConfigs { - app("file", i, discovery.NewFileDiscovery(c)) - } - for i, c := range cfg.ConsulSDConfigs { - k, err := discovery.NewConsul(c) - if err != nil { - log.Errorf("Cannot create Consul discovery: %s", err) - continue - } - app("consul", i, k) - } - for i, c := range cfg.MarathonSDConfigs { - m, err := discovery.NewMarathon(c) - if err != nil { - log.Errorf("Cannot create Marathon discovery: %s", err) - continue - } - app("marathon", i, m) - } - for i, c := range cfg.KubernetesSDConfigs { - k, err := discovery.NewKubernetesDiscovery(c) - if err != nil { - log.Errorf("Cannot create Kubernetes discovery: %s", err) - continue - } - app("kubernetes", i, k) - } - for i, c := range cfg.ServersetSDConfigs { - app("serverset", i, discovery.NewServersetDiscovery(c)) - } - for i, c := range cfg.NerveSDConfigs { - app("nerve", i, discovery.NewNerveDiscovery(c)) - } - for i, c := range cfg.EC2SDConfigs { - app("ec2", i, discovery.NewEC2Discovery(c)) - } - for i, c := range cfg.GCESDConfigs { - gced, err := discovery.NewGCEDiscovery(c) - if err != nil { - log.Errorf("Cannot initialize GCE discovery: %s", err) - continue - } - app("gce", i, gced) - } - for i, c := range cfg.AzureSDConfigs { - app("azure", i, discovery.NewAzureDiscovery(c)) - } - if len(cfg.StaticConfigs) > 0 { - app("static", 0, NewStaticProvider(cfg.StaticConfigs)) - } - - return providers -} - -// populateLabels builds a label set from the given label set and scrape configuration. -// It returns a label set before relabeling was applied as the second return value. -// Returns a nil label set if the target is dropped during relabeling. -func populateLabels(lset model.LabelSet, cfg *config.ScrapeConfig) (res, orig model.LabelSet, err error) { - if _, ok := lset[model.AddressLabel]; !ok { - return nil, nil, fmt.Errorf("no address") - } - // Copy labels into the labelset for the target if they are not - // set already. Apply the labelsets in order of decreasing precedence. - scrapeLabels := model.LabelSet{ - model.SchemeLabel: model.LabelValue(cfg.Scheme), - model.MetricsPathLabel: model.LabelValue(cfg.MetricsPath), - model.JobLabel: model.LabelValue(cfg.JobName), - } - for ln, lv := range scrapeLabels { - if _, ok := lset[ln]; !ok { - lset[ln] = lv - } - } - // Encode scrape query parameters as labels. - for k, v := range cfg.Params { - if len(v) > 0 { - lset[model.LabelName(model.ParamLabelPrefix+k)] = model.LabelValue(v[0]) - } - } - - preRelabelLabels := lset - lset = relabel.Process(lset, cfg.RelabelConfigs...) - - // Check if the target was dropped. - if lset == nil { - return nil, nil, nil - } - - // addPort checks whether we should add a default port to the address. - // If the address is not valid, we don't append a port either. - addPort := func(s string) bool { - // If we can split, a port exists and we don't have to add one. - if _, _, err := net.SplitHostPort(s); err == nil { - return false - } - // If adding a port makes it valid, the previous error - // was not due to an invalid address and we can append a port. - _, _, err := net.SplitHostPort(s + ":1234") - return err == nil - } - // If it's an address with no trailing port, infer it based on the used scheme. 
- if addr := string(lset[model.AddressLabel]); addPort(addr) { - // Addresses reaching this point are already wrapped in [] if necessary. - switch lset[model.SchemeLabel] { - case "http", "": - addr = addr + ":80" - case "https": - addr = addr + ":443" - default: - return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme) - } - lset[model.AddressLabel] = model.LabelValue(addr) - } - if err := config.CheckTargetAddress(lset[model.AddressLabel]); err != nil { - return nil, nil, err - } - - // Meta labels are deleted after relabelling. Other internal labels propagate to - // the target which decides whether they will be part of their label set. - for ln := range lset { - if strings.HasPrefix(string(ln), model.MetaLabelPrefix) { - delete(lset, ln) - } - } - - // Default the instance label to the target address. - if _, ok := lset[model.InstanceLabel]; !ok { - lset[model.InstanceLabel] = lset[model.AddressLabel] - } - return lset, preRelabelLabels, nil -} - -// targetsFromGroup builds targets based on the given TargetGroup and config. -func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) { - targets := make([]*Target, 0, len(tg.Targets)) - - for i, lset := range tg.Targets { - // Combine target labels with target group labels. - for ln, lv := range tg.Labels { - if _, ok := lset[ln]; !ok { - lset[ln] = lv - } - } - labels, origLabels, err := populateLabels(lset, cfg) - if err != nil { - return nil, fmt.Errorf("instance %d in group %s: %s", i, tg, err) - } - if labels != nil { - targets = append(targets, NewTarget(labels, origLabels, cfg.Params)) - } - } - return targets, nil -} - -// StaticProvider holds a list of target groups that never change. -type StaticProvider struct { - TargetGroups []*config.TargetGroup -} - -// NewStaticProvider returns a StaticProvider configured with the given -// target groups. -func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider { - for i, tg := range groups { - tg.Source = fmt.Sprintf("%d", i) - } - return &StaticProvider{groups} -} - -// Run implements the TargetProvider interface. -func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - // We still have to consider that the consumer exits right away in which case - // the context will be canceled. 
- select { - case ch <- sd.TargetGroups: - case <-ctx.Done(): - } - close(ch) -} diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index 65e5d6f2d1..dc4568a955 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -17,64 +17,10 @@ import ( "reflect" "testing" - "golang.org/x/net/context" - "gopkg.in/yaml.v2" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/storage/local" ) -func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) { - - verifyPresence := func(tgroups map[string][]*Target, name string, present bool) { - if _, ok := tgroups[name]; ok != present { - msg := "" - if !present { - msg = "not " - } - t.Fatalf("'%s' should %sbe present in TargetSet.tgroups: %s", name, msg, tgroups) - } - - } - - scrapeConfig := &config.ScrapeConfig{} - - sOne := ` -job_name: "foo" -static_configs: -- targets: ["foo:9090"] -- targets: ["bar:9090"] -` - if err := yaml.Unmarshal([]byte(sOne), scrapeConfig); err != nil { - t.Fatalf("Unable to load YAML config sOne: %s", err) - } - - // Not properly setting it up, but that seems okay - mss := &local.MemorySeriesStorage{} - - ts := newTargetSet(scrapeConfig, mss) - - ts.runProviders(context.Background(), providersFromConfig(scrapeConfig)) - - verifyPresence(ts.tgroups, "static/0/0", true) - verifyPresence(ts.tgroups, "static/0/1", true) - - sTwo := ` -job_name: "foo" -static_configs: -- targets: ["foo:9090"] -` - if err := yaml.Unmarshal([]byte(sTwo), scrapeConfig); err != nil { - t.Fatalf("Unable to load YAML config sTwo: %s", err) - } - - ts.runProviders(context.Background(), providersFromConfig(scrapeConfig)) - - verifyPresence(ts.tgroups, "static/0/0", true) - verifyPresence(ts.tgroups, "static/0/1", false) -} - func mustNewRegexp(s string) config.Regexp { re, err := config.NewRegexp(s) if err != nil {
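
Usage note (not part of the patch): the following is a minimal sketch of how a consumer outside the retrieval package might wire the new discovery API together, mirroring what discovery_test.go does. The `main` package, the logging Syncer, and the job name are illustrative assumptions; only NewTargetSet, ProvidersFromConfig, UpdateProviders, Run, and the Syncer interface come from the code added in this change. In Prometheus itself the Syncer is the scrape pool (scrapePool.Sync).

package main

import (
	"fmt"
	"time"

	"golang.org/x/net/context"
	yaml "gopkg.in/yaml.v2"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery"
)

// printSyncer is a stand-in Syncer that just reports how many target groups
// the TargetSet currently knows about.
type printSyncer struct{}

func (printSyncer) Sync(tgs []*config.TargetGroup) {
	fmt.Printf("sync: %d target groups\n", len(tgs))
}

func main() {
	// A static scrape config, loaded the same way as in discovery_test.go.
	scfg := &config.ScrapeConfig{}
	cfgStr := `
job_name: "example"
static_configs:
- targets: ["localhost:9090"]
`
	if err := yaml.Unmarshal([]byte(cfgStr), scfg); err != nil {
		panic(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The target set forwards complete sets of target groups to the Syncer.
	ts := discovery.NewTargetSet(printSyncer{})
	go ts.Run(ctx)

	// Providers are built from the scrape config and can be swapped at runtime,
	// which is what TargetManager.reload does on configuration changes.
	ts.UpdateProviders(discovery.ProvidersFromConfig(scfg))

	// Let the throttled sync loop run for a bit before shutting down via cancel().
	time.Sleep(10 * time.Second)
}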