diff --git a/Godeps/_workspace/src/github.com/prometheus/client_golang/model/labelname.go b/Godeps/_workspace/src/github.com/prometheus/client_golang/model/labelname.go index 75b2e79dae..cebc14de39 100644 --- a/Godeps/_workspace/src/github.com/prometheus/client_golang/model/labelname.go +++ b/Godeps/_workspace/src/github.com/prometheus/client_golang/model/labelname.go @@ -26,14 +26,25 @@ const ( // timeseries. MetricNameLabel LabelName = "__name__" + // AddressLabel is the name of the label that holds the address of + // a scrape target. + AddressLabel LabelName = "__address__" + // ReservedLabelPrefix is a prefix which is not legal in user-supplied // label names. ReservedLabelPrefix = "__" + // MetaLabelPrefix is a prefix for labels that provide meta information. + // Labels with the prefix will not be attached to time series. + MetaLabelPrefix = "__meta_" + // JobLabel is the label name indicating the job from which a timeseries // was scraped. JobLabel LabelName = "job" + // InstanceLabel is the label name used for the instance label. + InstanceLabel LabelName = "instance" + // BucketLabel is used for the label that defines the upper bound of a // bucket of a histogram ("le" -> "less or equal"). BucketLabel = "le" diff --git a/config/config.go b/config/config.go index c844f3a0c7..1e582e36d7 100644 --- a/config/config.go +++ b/config/config.go @@ -16,6 +16,7 @@ package config import ( "fmt" "regexp" + "strings" "time" "github.com/golang/protobuf/proto" @@ -55,6 +56,21 @@ func (c Config) validateLabels(labels *pb.LabelPairs) error { return nil } +// validateHosts validates whether a target group contains valid hosts. +func (c Config) validateHosts(hosts []string) error { + if hosts == nil { + return nil + } + for _, host := range hosts { + // Make sure that this does not contain any paths or schemes. + // This ensures that old configurations error. 
+ if strings.Contains(host, "/") { + return fmt.Errorf("invalid host '%s', no schemes or paths allowed", host) + } + } + return nil +} + // Validate checks an entire parsed Config for the validity of its fields. func (c Config) Validate() error { // Check the global configuration section for validity. @@ -93,6 +109,9 @@ func (c Config) Validate() error { if err := c.validateLabels(targetGroup.Labels); err != nil { return fmt.Errorf("invalid labels for job '%s': %s", job.GetName(), err) } + if err := c.validateHosts(targetGroup.Target); err != nil { + return fmt.Errorf("invalid targets for job '%s': %s", job.GetName(), err) + } } if job.SdName != nil && len(job.TargetGroup) > 0 { return fmt.Errorf("specified both DNS-SD name and target group for job: %s", job.GetName()) @@ -115,7 +134,7 @@ func (c Config) GetJobByName(name string) *JobConfig { // GlobalLabels returns the global labels as a LabelSet. func (c Config) GlobalLabels() clientmodel.LabelSet { labels := clientmodel.LabelSet{} - if c.Global.Labels != nil { + if c.Global != nil && c.Global.Labels != nil { for _, label := range c.Global.Labels.Label { labels[clientmodel.LabelName(label.GetName())] = clientmodel.LabelValue(label.GetValue()) } @@ -156,6 +175,11 @@ type JobConfig struct { pb.JobConfig } +// SDRefreshInterval gets the SD refresh interval for a job. +func (c JobConfig) SDRefreshInterval() time.Duration { + return stringToDuration(c.GetSdRefreshInterval()) +} + // ScrapeInterval gets the scrape interval for a job. func (c JobConfig) ScrapeInterval() time.Duration { return stringToDuration(c.GetScrapeInterval()) @@ -165,3 +189,19 @@ func (c JobConfig) ScrapeInterval() time.Duration { func (c JobConfig) ScrapeTimeout() time.Duration { return stringToDuration(c.GetScrapeTimeout()) } + +// TargetGroup is derived from a protobuf TargetGroup and attaches a source to it +// that identifies the origin of the group. 
+type TargetGroup struct { + // Source is an identifier that describes a group of targets. + Source string + // Labels is a set of labels that is common across all targets in the group. + Labels clientmodel.LabelSet + // Targets is a list of targets identified by a label set. Each target is + // uniquely identifiable in the group by its address label. + Targets []clientmodel.LabelSet +} + +func (tg *TargetGroup) String() string { + return tg.Source +} diff --git a/config/config.proto b/config/config.proto index edfce2a7a7..d2f7414917 100644 --- a/config/config.proto +++ b/config/config.proto @@ -71,8 +71,10 @@ message JobConfig { // List of labeled target groups for this job. Only legal when DNS-SD isn't // used for a job. repeated TargetGroup target_group = 5; - // The HTTP resource path to fetch metrics from on targets. + // The HTTP resource path on which to fetch metrics from targets. optional string metrics_path = 6 [default = "/metrics"]; + // The URL scheme with which to fetch metrics from targets. + optional string scheme = 8 [default = "http"]; } // The top-level Prometheus configuration. 
diff --git a/config/fixtures/minimal.conf.input b/config/fixtures/minimal.conf.input index c08f510095..637d95e50c 100644 --- a/config/fixtures/minimal.conf.input +++ b/config/fixtures/minimal.conf.input @@ -13,8 +13,10 @@ global < job: < name: "prometheus" scrape_interval: "15s" + metrics_path: "/metrics" + scheme: "http" target_group: < - target: "http://localhost:9090/metrics.json" + target: "localhost:9090" > > diff --git a/config/fixtures/mixing_sd_and_manual_targets.conf.input b/config/fixtures/mixing_sd_and_manual_targets.conf.input index 919beb0c52..0d564234e9 100644 --- a/config/fixtures/mixing_sd_and_manual_targets.conf.input +++ b/config/fixtures/mixing_sd_and_manual_targets.conf.input @@ -2,6 +2,6 @@ job: < name: "testjob" sd_name: "sd_name" target_group: < - target: "http://sampletarget:8080/metrics.json" + target: "sampletarget:8080" > > diff --git a/config/fixtures/sample.conf.input b/config/fixtures/sample.conf.input index 1a4ec17bf6..8ea3a069db 100644 --- a/config/fixtures/sample.conf.input +++ b/config/fixtures/sample.conf.input @@ -15,7 +15,7 @@ job: < scrape_interval: "15s" target_group: < - target: "http://localhost:9090/metrics.json" + target: "localhost:9090" labels: < label: < name: "group" @@ -30,11 +30,12 @@ job: < scrape_interval: "30s" target_group: < - target: "http://random.com:8080/metrics.json" - target: "http://random.com:8081/metrics.json" - target: "http://random.com:8082/metrics.json" - target: "http://random.com:8083/metrics.json" - target: "http://random.com:8084/metrics.json" + target: "random.com:8080" + target: "random.com:8081" + target: "random.com:8082" + target: "random.com:8083" + target: "random.com:8084" + labels: < label: < name: "group" @@ -43,8 +44,9 @@ job: < > > target_group: < - target: "http://random.com:8085/metrics.json" - target: "http://random.com:8086/metrics.json" + target: "random.com:8085" + target: "random.com:8086" + labels: < label: < name: "group" diff --git a/config/generated/config.pb.go 
b/config/generated/config.pb.go index adc7dd272e..089f9053b7 100644 --- a/config/generated/config.pb.go +++ b/config/generated/config.pb.go @@ -169,8 +169,10 @@ type JobConfig struct { // List of labeled target groups for this job. Only legal when DNS-SD isn't // used for a job. TargetGroup []*TargetGroup `protobuf:"bytes,5,rep,name=target_group" json:"target_group,omitempty"` - // The HTTP resource path to fetch metrics from on targets. - MetricsPath *string `protobuf:"bytes,6,opt,name=metrics_path,def=/metrics" json:"metrics_path,omitempty"` + // The HTTP resource path on which to fetch metrics from targets. + MetricsPath *string `protobuf:"bytes,6,opt,name=metrics_path,def=/metrics" json:"metrics_path,omitempty"` + // The URL scheme with which to fetch metrics from targets. + Scheme *string `protobuf:"bytes,8,opt,name=scheme,def=http" json:"scheme,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -181,6 +183,7 @@ func (*JobConfig) ProtoMessage() {} const Default_JobConfig_ScrapeTimeout string = "10s" const Default_JobConfig_SdRefreshInterval string = "30s" const Default_JobConfig_MetricsPath string = "/metrics" +const Default_JobConfig_Scheme string = "http" func (m *JobConfig) GetName() string { if m != nil && m.Name != nil { @@ -231,6 +234,13 @@ func (m *JobConfig) GetMetricsPath() string { return Default_JobConfig_MetricsPath } +func (m *JobConfig) GetScheme() string { + if m != nil && m.Scheme != nil { + return *m.Scheme + } + return Default_JobConfig_Scheme +} + // The top-level Prometheus configuration. type PrometheusConfig struct { // Global Prometheus configuration options. 
If omitted, an empty global diff --git a/main.go b/main.go index 1deabd339b..83cceb2ec4 100644 --- a/main.go +++ b/main.go @@ -77,7 +77,7 @@ var ( type prometheus struct { ruleManager manager.RuleManager - targetManager retrieval.TargetManager + targetManager *retrieval.TargetManager notificationHandler *notification.NotificationHandler storage local.Storage remoteStorageQueues []*remote.StorageQueueManager @@ -152,8 +152,11 @@ func NewPrometheus() *prometheus { sampleAppender = fanout } - targetManager := retrieval.NewTargetManager(sampleAppender, conf.GlobalLabels()) - targetManager.AddTargetsFromConfig(conf) + targetManager, err := retrieval.NewTargetManager(conf, sampleAppender) + if err != nil { + glog.Errorf("Error creating target manager: %s", err) + os.Exit(1) + } ruleManager := manager.NewRuleManager(&manager.RuleManagerOptions{ SampleAppender: sampleAppender, @@ -176,7 +179,7 @@ func NewPrometheus() *prometheus { BuildInfo: BuildInfo, Config: conf.String(), RuleManager: ruleManager, - TargetPools: targetManager.Pools(), + TargetPools: targetManager.Pools, Flags: flags, Birth: time.Now(), PathPrefix: *pathPrefix, @@ -231,6 +234,7 @@ func (p *prometheus) Serve() { } go p.ruleManager.Run() go p.notificationHandler.Run() + go p.targetManager.Run() p.storage.Start() diff --git a/retrieval/target_provider.go b/retrieval/discovery/dns.go similarity index 57% rename from retrieval/target_provider.go rename to retrieval/discovery/dns.go index 209b7c5cef..6b71dc0d18 100644 --- a/retrieval/target_provider.go +++ b/retrieval/discovery/dns.go @@ -1,4 +1,4 @@ -// Copyright 2013 The Prometheus Authors +// Copyright 2015 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -11,13 +11,13 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package retrieval +package discovery import ( "fmt" "net" - "net/url" "strings" + "sync" "time" "github.com/golang/glog" @@ -25,12 +25,18 @@ import ( "github.com/prometheus/client_golang/prometheus" clientmodel "github.com/prometheus/client_golang/model" - "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/utility" ) -const resolvConf = "/etc/resolv.conf" +const ( + resolvConf = "/etc/resolv.conf" + + dnsSourcePrefix = "dns" + + // Constants for instrumentation. + namespace = "prometheus" + interval = "interval" +) var ( dnsSDLookupsCount = prometheus.NewCounter( @@ -52,65 +58,70 @@ func init() { prometheus.MustRegister(dnsSDLookupsCount) } -// TargetProvider encapsulates retrieving all targets for a job. -type TargetProvider interface { - // Retrieves the current list of targets for this provider. - Targets() ([]Target, error) +// DNSDiscovery periodically performs DNS-SD requests. It implements +// the TargetProvider interface. +type DNSDiscovery struct { + name string + + done chan struct{} + ticker *time.Ticker + m sync.RWMutex } -type sdTargetProvider struct { - job config.JobConfig - globalLabels clientmodel.LabelSet - targets []Target - - lastRefresh time.Time - refreshInterval time.Duration -} - -// NewSdTargetProvider constructs a new sdTargetProvider for a job. -func NewSdTargetProvider(job config.JobConfig, globalLabels clientmodel.LabelSet) *sdTargetProvider { - i, err := utility.StringToDuration(job.GetSdRefreshInterval()) - if err != nil { - panic(fmt.Sprintf("illegal refresh duration string %s: %s", job.GetSdRefreshInterval(), err)) - } - return &sdTargetProvider{ - job: job, - globalLabels: globalLabels, - refreshInterval: i, +// NewDNSDiscovery returns a new DNSDiscovery which periodically refreshes its targets. 
+func NewDNSDiscovery(name string, refreshInterval time.Duration) *DNSDiscovery { + return &DNSDiscovery{ + name: name, + done: make(chan struct{}), + ticker: time.NewTicker(refreshInterval), } } -func (p *sdTargetProvider) Targets() ([]Target, error) { - var err error - defer func() { - dnsSDLookupsCount.Inc() - if err != nil { - dnsSDLookupFailuresCount.Inc() +// Run implements the TargetProvider interface. +func (dd *DNSDiscovery) Run(ch chan<- *config.TargetGroup) { + defer close(ch) + + // Get an initial set right away. + if err := dd.refresh(ch); err != nil { + glog.Errorf("Error refreshing DNS targets: %s", err) + } + + for { + select { + case <-dd.ticker.C: + if err := dd.refresh(ch); err != nil { + glog.Errorf("Error refreshing DNS targets: %s", err) + } + case <-dd.done: + return } - }() - - if time.Since(p.lastRefresh) < p.refreshInterval { - return p.targets, nil } +} - response, err := lookupSRV(p.job.GetSdName()) +// Stop implements the TargetProvider interface. +func (dd *DNSDiscovery) Stop() { + glog.V(1).Infof("Stopping DNS discovery for %s...", dd.name) + dd.ticker.Stop() + dd.done <- struct{}{} + + glog.V(1).Infof("DNS discovery for %s stopped.", dd.name) +} + +// Sources implements the TargetProvider interface. 
+func (dd *DNSDiscovery) Sources() []string { + return []string{dnsSourcePrefix + ":" + dd.name} +} + +func (dd *DNSDiscovery) refresh(ch chan<- *config.TargetGroup) error { + response, err := lookupSRV(dd.name) + dnsSDLookupsCount.Inc() if err != nil { - return nil, err + dnsSDLookupFailuresCount.Inc() + return err } - baseLabels := clientmodel.LabelSet{ - clientmodel.JobLabel: clientmodel.LabelValue(p.job.GetName()), - } - for n, v := range p.globalLabels { - baseLabels[n] = v - } - - targets := make([]Target, 0, len(response.Answer)) - endpoint := &url.URL{ - Scheme: "http", - Path: p.job.GetMetricsPath(), - } + tg := &config.TargetGroup{} for _, record := range response.Answer { addr, ok := record.(*dns.SRV) if !ok { @@ -118,22 +129,24 @@ func (p *sdTargetProvider) Targets() ([]Target, error) { continue } // Remove the final dot from rooted DNS names to make them look more usual. - if addr.Target[len(addr.Target)-1] == '.' { - addr.Target = addr.Target[:len(addr.Target)-1] - } - endpoint.Host = fmt.Sprintf("%s:%d", addr.Target, addr.Port) - t := NewTarget(endpoint.String(), p.job.ScrapeTimeout(), baseLabels) - targets = append(targets, t) + addr.Target = strings.TrimRight(addr.Target, ".") + + target := clientmodel.LabelValue(fmt.Sprintf("%s:%d", addr.Target, addr.Port)) + tg.Targets = append(tg.Targets, clientmodel.LabelSet{ + clientmodel.AddressLabel: target, + }) } - p.targets = targets - return targets, nil + tg.Source = dnsSourcePrefix + ":" + dd.name + ch <- tg + + return nil } func lookupSRV(name string) (*dns.Msg, error) { conf, err := dns.ClientConfigFromFile(resolvConf) if err != nil { - return nil, fmt.Errorf("couldn't load resolv.conf: %s", err) + return nil, fmt.Errorf("could not load resolv.conf: %s", err) } client := &dns.Client{} @@ -143,12 +156,12 @@ func lookupSRV(name string) (*dns.Msg, error) { servAddr := net.JoinHostPort(server, conf.Port) for _, suffix := range conf.Search { response, err = lookup(name, dns.TypeSRV, client, servAddr, 
suffix, false) - if err == nil { - if len(response.Answer) > 0 { - return response, nil - } - } else { + if err != nil { glog.Warningf("resolving %s.%s failed: %s", name, suffix, err) + continue + } + if len(response.Answer) > 0 { + return response, nil } } response, err = lookup(name, dns.TypeSRV, client, servAddr, "", false) @@ -156,7 +169,7 @@ func lookupSRV(name string) (*dns.Msg, error) { return response, nil } } - return response, fmt.Errorf("couldn't resolve %s: No server responded", name) + return response, fmt.Errorf("could not resolve %s: No server responded", name) } func lookup(name string, queryType uint16, client *dns.Client, servAddr string, suffix string, edns bool) (*dns.Msg, error) { @@ -179,7 +192,6 @@ func lookup(name string, queryType uint16, client *dns.Client, servAddr string, if err != nil { return nil, err } - if msg.Id != response.Id { return nil, fmt.Errorf("DNS ID mismatch, request: %d, response: %d", msg.Id, response.Id) } @@ -188,11 +200,9 @@ func lookup(name string, queryType uint16, client *dns.Client, servAddr string, if client.Net == "tcp" { return nil, fmt.Errorf("got truncated message on tcp") } - if edns { // Truncated even though EDNS is used client.Net = "tcp" } - return lookup(name, queryType, client, servAddr, suffix, !edns) } diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go index ce96aa2e88..f477bbd89d 100644 --- a/retrieval/helpers_test.go +++ b/retrieval/helpers_test.go @@ -17,6 +17,8 @@ import ( "time" clientmodel "github.com/prometheus/client_golang/model" + + "github.com/prometheus/prometheus/config" ) type nopAppender struct{} @@ -38,3 +40,25 @@ type collectResultAppender struct { func (a *collectResultAppender) Append(s *clientmodel.Sample) { a.result = append(a.result, s) } + +// fakeTargetProvider implements a TargetProvider and allows manual injection +// of TargetGroups through the update channel. 
+type fakeTargetProvider struct { + sources []string + update chan *config.TargetGroup +} + +func (tp *fakeTargetProvider) Run(ch chan<- *config.TargetGroup) { + defer close(ch) + for tg := range tp.update { + ch <- tg + } +} + +func (tp *fakeTargetProvider) Stop() { + close(tp.update) +} + +func (tp *fakeTargetProvider) Sources() []string { + return tp.sources +} diff --git a/retrieval/interface_test.go b/retrieval/interface_test.go deleted file mode 100644 index 5377e82359..0000000000 --- a/retrieval/interface_test.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2013 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package retrieval - -import ( - "testing" -) - -func TestInterfaces(t *testing.T) { - var ( - _ Target = &target{} - _ TargetManager = &targetManager{} - ) -} diff --git a/retrieval/target.go b/retrieval/target.go index dba7f0b8e4..a87a5ddb40 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -30,13 +30,12 @@ import ( clientmodel "github.com/prometheus/client_golang/model" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/utility" ) const ( - // InstanceLabel is the label value used for the instance label. - InstanceLabel clientmodel.LabelName = "instance" // ScrapeHealthMetricName is the metric name for the synthetic health // variable. 
scrapeHealthMetricName clientmodel.LabelValue = "up" @@ -54,7 +53,7 @@ const ( var ( errIngestChannelFull = errors.New("ingestion channel full") - localhostRepresentations = []string{"http://127.0.0.1", "http://localhost"} + localhostRepresentations = []string{"127.0.0.1", "localhost"} targetIntervalLength = prometheus.NewSummaryVec( prometheus.SummaryOpts{ @@ -131,23 +130,16 @@ type Target interface { // Return the target's base labels without job and instance label. That's // useful for display purposes. BaseLabelsWithoutJobAndInstance() clientmodel.LabelSet - // SetBaseLabelsFrom sets the target's base labels to the base labels - // of the provided target. - SetBaseLabelsFrom(Target) - // Scrape target at the specified interval. - RunScraper(storage.SampleAppender, time.Duration) + // Start scraping the target in regular intervals. + RunScraper(storage.SampleAppender) // Stop scraping, synchronous. StopScraper() + // Update the target's state. + Update(config.JobConfig, clientmodel.LabelSet) } // target is a Target that refers to a singular HTTP or HTTPS endpoint. type target struct { - // The current health state of the target. - state TargetState - // The last encountered scrape error, if any. - lastError error - // The last time a scrape was attempted. - lastScrape time.Time // Closing scraperStopping signals that scraping should stop. scraperStopping chan struct{} // Closing scraperStopped signals that scraping has been stopped. @@ -155,34 +147,67 @@ type target struct { // Channel to buffer ingested samples. ingestedSamples chan clientmodel.Samples - url string - // What is the deadline for the HTTP or HTTPS against this endpoint. - deadline time.Duration - // Any base labels that are added to this target and its metrics. - baseLabels clientmodel.LabelSet // The HTTP client used to scrape the target's endpoint. httpClient *http.Client - // Mutex protects lastError, lastScrape, state, and baseLabels. + // Mutex protects the members below. 
sync.RWMutex + + url *url.URL + // Any base labels that are added to this target and its metrics. + baseLabels clientmodel.LabelSet + // The current health state of the target. + state TargetState + // The last encountered scrape error, if any. + lastError error + // The last time a scrape was attempted. + lastScrape time.Time + // What is the deadline for the HTTP or HTTPS against this endpoint. + deadline time.Duration + // The time between two scrapes. + scrapeInterval time.Duration } // NewTarget creates a reasonably configured target for querying. -func NewTarget(url string, deadline time.Duration, baseLabels clientmodel.LabelSet) Target { +func NewTarget(address string, cfg config.JobConfig, baseLabels clientmodel.LabelSet) Target { t := &target{ - url: url, - deadline: deadline, - httpClient: utility.NewDeadlineClient(deadline), + url: &url.URL{ + Host: address, + }, scraperStopping: make(chan struct{}), scraperStopped: make(chan struct{}), } - t.baseLabels = clientmodel.LabelSet{InstanceLabel: clientmodel.LabelValue(t.InstanceIdentifier())} - for baseLabel, baseValue := range baseLabels { - t.baseLabels[baseLabel] = baseValue - } + t.Update(cfg, baseLabels) return t } +// Update overwrites settings in the target that are derived from the job config +// it belongs to. +func (t *target) Update(cfg config.JobConfig, baseLabels clientmodel.LabelSet) { + t.Lock() + defer t.Unlock() + + t.url.Scheme = cfg.GetScheme() + t.url.Path = cfg.GetMetricsPath() + + t.scrapeInterval = cfg.ScrapeInterval() + t.deadline = cfg.ScrapeTimeout() + t.httpClient = utility.NewDeadlineClient(cfg.ScrapeTimeout()) + + t.baseLabels = clientmodel.LabelSet{ + clientmodel.InstanceLabel: clientmodel.LabelValue(t.InstanceIdentifier()), + } + for name, val := range baseLabels { + t.baseLabels[name] = val + } +} + +func (t *target) String() string { + t.RLock() + defer t.RUnlock() + return t.url.Host +} + // Ingest implements Target and extraction.Ingester. 
func (t *target) Ingest(s clientmodel.Samples) error { // Since the regular case is that ingestedSamples is ready to receive, @@ -202,10 +227,16 @@ func (t *target) Ingest(s clientmodel.Samples) error { } // RunScraper implements Target. -func (t *target) RunScraper(sampleAppender storage.SampleAppender, interval time.Duration) { +func (t *target) RunScraper(sampleAppender storage.SampleAppender) { defer close(t.scraperStopped) - jitterTimer := time.NewTimer(time.Duration(float64(interval) * rand.Float64())) + t.RLock() + lastScrapeInterval := t.scrapeInterval + t.RUnlock() + + glog.V(1).Infof("Starting scraper for target %v...", t) + + jitterTimer := time.NewTimer(time.Duration(float64(lastScrapeInterval) * rand.Float64())) select { case <-jitterTimer.C: case <-t.scraperStopping: @@ -214,7 +245,7 @@ func (t *target) RunScraper(sampleAppender storage.SampleAppender, interval time } jitterTimer.Stop() - ticker := time.NewTicker(interval) - defer ticker.Stop() + ticker := time.NewTicker(lastScrapeInterval) + defer func() { ticker.Stop() }() // Closure: stop whichever ticker is current at return, not just the first one. t.Lock() // Writing t.lastScrape requires the lock. @@ -238,11 +269,21 @@ func (t *target) RunScraper(sampleAppender storage.SampleAppender, interval time case <-t.scraperStopping: return case <-ticker.C: - t.Lock() // Write t.lastScrape requires locking. + t.Lock() took := time.Since(t.lastScrape) t.lastScrape = time.Now() + + intervalStr := lastScrapeInterval.String() + + // A changed scrape interval becomes effective after the next scrape. + if lastScrapeInterval != t.scrapeInterval { + ticker.Stop() + ticker = time.NewTicker(t.scrapeInterval) + lastScrapeInterval = t.scrapeInterval + } t.Unlock() - targetIntervalLength.WithLabelValues(interval.String()).Observe( + + targetIntervalLength.WithLabelValues(intervalStr).Observe( float64(took) / float64(time.Second), // Sub-second precision. ) t.scrape(sampleAppender) @@ -253,8 +294,12 @@ func (t *target) RunScraper(sampleAppender storage.SampleAppender, interval time // StopScraper implements Target. 
func (t *target) StopScraper() { + glog.V(1).Infof("Stopping scraper for target %v...", t) + close(t.scraperStopping) <-t.scraperStopped + + glog.V(1).Infof("Scraper for target %v stopped.", t) } const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1` @@ -277,7 +322,7 @@ func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) { t.Unlock() }(time.Now()) - req, err := http.NewRequest("GET", t.URL(), nil) + req, err := http.NewRequest("GET", t.url.String(), nil) if err != nil { panic(err) } @@ -339,41 +384,43 @@ func (t *target) LastScrape() time.Time { // URL implements Target. func (t *target) URL() string { - return t.url + t.RLock() + defer t.RUnlock() + return t.url.String() } // InstanceIdentifier implements Target. func (t *target) InstanceIdentifier() string { - u, err := url.Parse(t.url) - if err != nil { - glog.Warningf("Could not parse instance URL when generating identifier, using raw URL: %s", err) - return t.url - } // If we are given a port in the host port, use that. - if strings.Contains(u.Host, ":") { - return u.Host - } - // Otherwise, deduce port based on protocol. - if u.Scheme == "http" { - return fmt.Sprintf("%s:80", u.Host) - } else if u.Scheme == "https" { - return fmt.Sprintf("%s:443", u.Host) + if strings.Contains(t.url.Host, ":") { + return t.url.Host } - glog.Warningf("Unknown scheme %s when generating identifier, using raw URL.", u.Scheme) - return t.url + // No locking here: Update calls this method while holding the write lock + // and sync.RWMutex is not reentrant, so taking the read lock would deadlock + // for hosts without a port. Only url.Scheme is mutable, and only in Update. + // Otherwise, deduce port based on protocol. + if t.url.Scheme == "http" { + return fmt.Sprintf("%s:80", t.url.Host) + } else if t.url.Scheme == "https" { + return fmt.Sprintf("%s:443", t.url.Host) + } + + glog.Warningf("Unknown scheme %s when generating identifier, using host without port number.", t.url.Scheme) + return t.url.Host } // GlobalURL implements Target. 
func (t *target) GlobalURL() string { - url := t.url + url := t.URL() + hostname, err := os.Hostname() if err != nil { glog.Warningf("Couldn't get hostname: %s, returning target.URL()", err) return url } for _, localhostRepresentation := range localhostRepresentations { - url = strings.Replace(url, localhostRepresentation, fmt.Sprintf("http://%s", hostname), -1) + url = strings.Replace(url, "//"+localhostRepresentation, "//"+hostname, 1) } return url } @@ -389,23 +436,13 @@ func (t *target) BaseLabels() clientmodel.LabelSet { func (t *target) BaseLabelsWithoutJobAndInstance() clientmodel.LabelSet { ls := clientmodel.LabelSet{} for ln, lv := range t.BaseLabels() { - if ln != clientmodel.JobLabel && ln != InstanceLabel { + if ln != clientmodel.JobLabel && ln != clientmodel.InstanceLabel { ls[ln] = lv } } return ls } -// SetBaseLabelsFrom implements Target. -func (t *target) SetBaseLabelsFrom(newTarget Target) { - if t.URL() != newTarget.URL() { - panic("targets don't refer to the same endpoint") - } - t.Lock() - defer t.Unlock() - t.baseLabels = newTarget.BaseLabels() -} - func (t *target) recordScrapeHealth(sampleAppender storage.SampleAppender, timestamp clientmodel.Timestamp, healthy bool, scrapeDuration time.Duration) { healthMetric := clientmodel.Metric{} durationMetric := clientmodel.Metric{} diff --git a/retrieval/target_test.go b/retrieval/target_test.go index 427f630367..6ab80050b2 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -18,56 +18,46 @@ import ( "fmt" "net/http" "net/http/httptest" + "net/url" "reflect" + "strings" "testing" "time" clientmodel "github.com/prometheus/client_golang/model" + "github.com/golang/protobuf/proto" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/utility" ) +func TestTargetInterface(t *testing.T) { + var _ Target = &target{} +} + func TestBaseLabels(t *testing.T) { - target := NewTarget("http://example.com/metrics", 0, clientmodel.LabelSet{"job": "some_job", "foo": "bar"}) 
- want := clientmodel.LabelSet{"job": "some_job", "foo": "bar", "instance": "example.com:80"} + target := newTestTarget("example.com", 0, clientmodel.LabelSet{"job": "some_job", "foo": "bar"}) + want := clientmodel.LabelSet{ + clientmodel.JobLabel: "some_job", + clientmodel.InstanceLabel: "example.com:80", + "foo": "bar", + } got := target.BaseLabels() if !reflect.DeepEqual(want, got) { t.Errorf("want base labels %v, got %v", want, got) } - delete(want, "job") - delete(want, "instance") + delete(want, clientmodel.JobLabel) + delete(want, clientmodel.InstanceLabel) + got = target.BaseLabelsWithoutJobAndInstance() if !reflect.DeepEqual(want, got) { t.Errorf("want base labels %v, got %v", want, got) } } -func TestTargetHidesURLAuth(t *testing.T) { - testVectors := []string{"http://secret:data@host.com/query?args#fragment", "https://example.net/foo", "http://foo.com:31337/bar"} - testResults := []string{"host.com:80", "example.net:443", "foo.com:31337"} - if len(testVectors) != len(testResults) { - t.Errorf("Test vector length does not match test result length.") - } - - for i := 0; i < len(testVectors); i++ { - testTarget := target{ - state: Unknown, - url: testVectors[i], - httpClient: utility.NewDeadlineClient(0), - } - u := testTarget.InstanceIdentifier() - if u != testResults[i] { - t.Errorf("Expected InstanceIdentifier to be %v, actual %v", testResults[i], u) - } - } -} - func TestTargetScrapeUpdatesState(t *testing.T) { - testTarget := target{ - state: Unknown, - url: "bad schema", - httpClient: utility.NewDeadlineClient(0), - } + testTarget := newTestTarget("bad schema", 0, nil) + testTarget.scrape(nopAppender{}) if testTarget.state != Unhealthy { t.Errorf("Expected target state %v, actual: %v", Unhealthy, testTarget.state) @@ -89,11 +79,7 @@ func TestTargetScrapeWithFullChannel(t *testing.T) { ) defer server.Close() - testTarget := NewTarget( - server.URL, - 10*time.Millisecond, - clientmodel.LabelSet{"dings": "bums"}, - ).(*target) + testTarget := 
newTestTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{"dings": "bums"}) testTarget.scrape(slowAppender{}) if testTarget.state != Unhealthy { @@ -105,9 +91,10 @@ func TestTargetScrapeWithFullChannel(t *testing.T) { } func TestTargetRecordScrapeHealth(t *testing.T) { - testTarget := NewTarget( - "http://example.url", 0, clientmodel.LabelSet{clientmodel.JobLabel: "testjob"}, - ).(*target) + jcfg := config.JobConfig{} + proto.SetDefaults(&jcfg.JobConfig) + + testTarget := newTestTarget("example.url", 0, clientmodel.LabelSet{clientmodel.JobLabel: "testjob"}) now := clientmodel.Now() appender := &collectResultAppender{} @@ -123,7 +110,7 @@ func TestTargetRecordScrapeHealth(t *testing.T) { expected := &clientmodel.Sample{ Metric: clientmodel.Metric{ clientmodel.MetricNameLabel: scrapeHealthMetricName, - InstanceLabel: "example.url:80", + clientmodel.InstanceLabel: "example.url:80", clientmodel.JobLabel: "testjob", }, Timestamp: now, @@ -138,7 +125,7 @@ func TestTargetRecordScrapeHealth(t *testing.T) { expected = &clientmodel.Sample{ Metric: clientmodel.Metric{ clientmodel.MetricNameLabel: scrapeDurationMetricName, - InstanceLabel: "example.url:80", + clientmodel.InstanceLabel: "example.url:80", clientmodel.JobLabel: "testjob", }, Timestamp: now, @@ -163,7 +150,11 @@ func TestTargetScrapeTimeout(t *testing.T) { ) defer server.Close() - testTarget := NewTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{}) + jcfg := config.JobConfig{} + proto.SetDefaults(&jcfg.JobConfig) + + var testTarget Target = newTestTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{}) + appender := nopAppender{} // scrape once without timeout @@ -205,25 +196,20 @@ func TestTargetScrape404(t *testing.T) { ) defer server.Close() - testTarget := NewTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{}) + testTarget := newTestTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{}) appender := nopAppender{} want := errors.New("server returned HTTP status 
404 Not Found") - got := testTarget.(*target).scrape(appender) + got := testTarget.scrape(appender) if got == nil || want.Error() != got.Error() { t.Fatalf("want err %q, got %q", want, got) } } func TestTargetRunScraperScrapes(t *testing.T) { - testTarget := target{ - state: Unknown, - url: "bad schema", - httpClient: utility.NewDeadlineClient(0), - scraperStopping: make(chan struct{}), - scraperStopped: make(chan struct{}), - } - go testTarget.RunScraper(nopAppender{}, time.Duration(time.Millisecond)) + testTarget := newTestTarget("bad schema", 0, nil) + + go testTarget.RunScraper(nopAppender{}) // Enough time for a scrape to happen. time.Sleep(2 * time.Millisecond) @@ -253,11 +239,7 @@ func BenchmarkScrape(b *testing.B) { ) defer server.Close() - testTarget := NewTarget( - server.URL, - 100*time.Millisecond, - clientmodel.LabelSet{"dings": "bums"}, - ) + var testTarget Target = newTestTarget(server.URL, 100*time.Millisecond, clientmodel.LabelSet{"dings": "bums"}) appender := nopAppender{} b.ResetTimer() @@ -267,3 +249,25 @@ func BenchmarkScrape(b *testing.B) { } } } + +func newTestTarget(targetURL string, deadline time.Duration, baseLabels clientmodel.LabelSet) *target { + t := &target{ + url: &url.URL{ + Scheme: "http", + Host: strings.TrimPrefix(targetURL, "http://"), // TrimPrefix removes only the exact scheme prefix; TrimLeft takes a character set and would also eat leading h/t/p/:// characters of the host. + Path: "/metrics", + }, + deadline: deadline, + scrapeInterval: 1 * time.Millisecond, + httpClient: utility.NewDeadlineClient(deadline), + scraperStopping: make(chan struct{}), + scraperStopped: make(chan struct{}), + } + t.baseLabels = clientmodel.LabelSet{ + clientmodel.InstanceLabel: clientmodel.LabelValue(t.InstanceIdentifier()), + } + for baseLabel, baseValue := range baseLabels { + t.baseLabels[baseLabel] = baseValue + } + return t +} diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 32f64947ac..2a3e3b325c 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -14,6 +14,8 @@ package retrieval import ( + "fmt" + "strings" "sync" 
"github.com/golang/glog" @@ -21,132 +23,385 @@ import ( clientmodel "github.com/prometheus/client_golang/model" "github.com/prometheus/prometheus/config" + pb "github.com/prometheus/prometheus/config/generated" + "github.com/prometheus/prometheus/retrieval/discovery" "github.com/prometheus/prometheus/storage" ) -// TargetManager manages all scrape targets. All methods are goroutine-safe. -type TargetManager interface { - AddTarget(job config.JobConfig, t Target) - ReplaceTargets(job config.JobConfig, newTargets []Target) - Remove(t Target) - AddTargetsFromConfig(config config.Config) +// A TargetProvider provides information about target groups. It maintains a set +// of sources from which TargetGroups can originate. Whenever a target provider +// detects a potential change it sends the TargetGroup through its provided channel. +// +// The TargetProvider does not have to guarantee that an actual change happened. +// It does guarantee that it sends the new TargetGroup whenever a change happens. +// On startup it sends all TargetGroups it can see. +type TargetProvider interface { + // Sources returns the source identifiers the provider is currently aware of. + Sources() []string + // Run hands a channel to the target provider through which it can send + // updated target groups. The channel must be closed by the target provider + // if no more updates will be sent. + Run(chan<- *config.TargetGroup) + // Stop terminates any potential computation of the target provider. The + // channel received on Run must be closed afterwards. Stop() - Pools() map[string]*TargetPool // Returns a copy of the name -> TargetPool mapping. } -type targetManager struct { - sync.Mutex // Protects poolByJob. +// TargetManager maintains a set of targets, starts and stops their scraping and +// creates the new targets based on the target groups it receives from various +// target providers. 
+type TargetManager struct { + m sync.RWMutex globalLabels clientmodel.LabelSet sampleAppender storage.SampleAppender - poolsByJob map[string]*TargetPool + running bool + + // Targets by their source ID. + targets map[string][]Target + // Providers and configs by their job name. + // TODO(fabxc): turn this into map[*ScrapeConfig][]TargetProvider eventually. + providers map[string][]TargetProvider + configs map[string]config.JobConfig } -// NewTargetManager returns a newly initialized TargetManager ready to use. -func NewTargetManager(sampleAppender storage.SampleAppender, globalLabels clientmodel.LabelSet) TargetManager { - return &targetManager{ +// NewTargetManager creates a new TargetManager based on the given config. +func NewTargetManager(cfg config.Config, sampleAppender storage.SampleAppender) (*TargetManager, error) { + tm := &TargetManager{ sampleAppender: sampleAppender, - globalLabels: globalLabels, - poolsByJob: make(map[string]*TargetPool), + targets: make(map[string][]Target), } + if err := tm.applyConfig(cfg); err != nil { + return nil, err + } + return tm, nil } -func (m *targetManager) targetPoolForJob(job config.JobConfig) *TargetPool { - targetPool, ok := m.poolsByJob[job.GetName()] +// Run starts background processing to handle target updates. +func (tm *TargetManager) Run() { + glog.Info("Starting target manager...") - if !ok { - var provider TargetProvider - if job.SdName != nil { - provider = NewSdTargetProvider(job, m.globalLabels) + sources := map[string]struct{}{} + + for name, provs := range tm.providers { + for _, p := range provs { + jcfg := tm.configs[name] + + ch := make(chan *config.TargetGroup) + go tm.handleTargetUpdates(tm.configs[name], ch) + + for _, src := range p.Sources() { + src = fullSource(jcfg, src) + sources[src] = struct{}{} + } + + // Run the target provider after cleanup of the stale targets is done. 
+ defer func(p TargetProvider, c chan *config.TargetGroup) { // p is passed as an argument so each deferred call runs its own provider (loop variables are shared per loop before Go 1.22). + go p.Run(c) + }(p, ch) } - - interval := job.ScrapeInterval() - targetPool = NewTargetPool(provider, m.sampleAppender, interval) - glog.Infof("Pool for job %s does not exist; creating and starting...", job.GetName()) - - m.poolsByJob[job.GetName()] = targetPool - go targetPool.Run() } - return targetPool -} - -func (m *targetManager) AddTarget(job config.JobConfig, t Target) { - m.Lock() - defer m.Unlock() - - targetPool := m.targetPoolForJob(job) - targetPool.AddTarget(t) - m.poolsByJob[job.GetName()] = targetPool -} - -func (m *targetManager) ReplaceTargets(job config.JobConfig, newTargets []Target) { - m.Lock() - defer m.Unlock() - - targetPool := m.targetPoolForJob(job) - targetPool.ReplaceTargets(newTargets) -} - -func (m *targetManager) Remove(t Target) { - panic("not implemented") -} - -func (m *targetManager) AddTargetsFromConfig(config config.Config) { - for _, job := range config.Jobs() { - if job.SdName != nil { - m.Lock() - m.targetPoolForJob(job) - m.Unlock() - continue + tm.removeTargets(func(src string) bool { + if _, ok := sources[src]; ok { + return false } + return true + }) - for _, targetGroup := range job.TargetGroup { - baseLabels := clientmodel.LabelSet{ - clientmodel.JobLabel: clientmodel.LabelValue(job.GetName()), - } - for n, v := range m.globalLabels { - baseLabels[n] = v - } - if targetGroup.Labels != nil { - for _, label := range targetGroup.Labels.Label { - baseLabels[clientmodel.LabelName(label.GetName())] = clientmodel.LabelValue(label.GetValue()) - } - } + tm.running = true +} - for _, endpoint := range targetGroup.Target { - target := NewTarget(endpoint, job.ScrapeTimeout(), baseLabels) - m.AddTarget(job, target) - } +// handleTargetUpdates receives target group updates and handles them in the +// context of the given job config. 
+func (tm *TargetManager) handleTargetUpdates(cfg config.JobConfig, ch <-chan *config.TargetGroup) { + for tg := range ch { + glog.V(1).Infof("Received potential update for target group %q", tg.Source) + + if err := tm.updateTargetGroup(tg, cfg); err != nil { + glog.Errorf("Error updating targets: %s", err) } } } -func (m *targetManager) Stop() { - m.Lock() - defer m.Unlock() +// fullSource prepends the unique job name to the source. +// +// Thus, oscillating label sets for targets with the same source, +// but providers from different configs, are prevented. +func fullSource(cfg config.JobConfig, src string) string { + return cfg.GetName() + ":" + src +} + +// Stop all background processing. +func (tm *TargetManager) Stop() { + tm.stop(true) +} + +// stop background processing of the target manager. If removeTargets is true, +// existing targets will be stopped and removed. +func (tm *TargetManager) stop(removeTargets bool) { + tm.m.Lock() + defer tm.m.Unlock() + + if !tm.running { + return + } glog.Info("Stopping target manager...") - var wg sync.WaitGroup - for j, p := range m.poolsByJob { - wg.Add(1) - go func(j string, p *TargetPool) { - defer wg.Done() - glog.Infof("Stopping target pool %q...", j) + defer glog.Info("Target manager stopped.") + + for _, provs := range tm.providers { + for _, p := range provs { p.Stop() - glog.Infof("Target pool %q stopped.", j) - }(j, p) + } + } + + if removeTargets { + tm.removeTargets(nil) + } + + tm.running = false +} + +// removeTargets stops and removes targets for sources where f(source) is true +// or if f is nil. This method is not thread-safe. 
+func (tm *TargetManager) removeTargets(f func(string) bool) { + if f == nil { + f = func(string) bool { return true } + } + var wg sync.WaitGroup + for src, targets := range tm.targets { + if !f(src) { + continue + } + wg.Add(len(targets)) + for _, target := range targets { + go func(t Target) { + t.StopScraper() + wg.Done() + }(target) + } + delete(tm.targets, src) } wg.Wait() - glog.Info("Target manager stopped.") } -func (m *targetManager) Pools() map[string]*TargetPool { - m.Lock() - defer m.Unlock() - - result := make(map[string]*TargetPool, len(m.poolsByJob)) - for k, v := range m.poolsByJob { - result[k] = v +// updateTargetGroup creates new targets for the group and replaces the old targets +// for the source ID. +func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg config.JobConfig) error { + newTargets, err := tm.targetsFromGroup(tgroup, cfg) + if err != nil { + return err } - return result + src := fullSource(cfg, tgroup.Source) + + tm.m.Lock() + defer tm.m.Unlock() + + oldTargets, ok := tm.targets[src] + if ok { + var wg sync.WaitGroup + // Replace the old targets with the new ones while keeping the state + // of intersecting targets. + for i, tnew := range newTargets { + var match Target + for j, told := range oldTargets { + if told == nil { + continue + } + if tnew.InstanceIdentifier() == told.InstanceIdentifier() { + match = told + oldTargets[j] = nil + break + } + } + // Update the existing target and discard the new equivalent. + // Otherwise start scraping the new target. + if match != nil { + // Updating is blocked during a scrape. We don't want those wait times + // to build up. + wg.Add(1) + go func(t Target) { + match.Update(cfg, t.BaseLabels()) + wg.Done() + }(tnew) + newTargets[i] = match + } else { + go tnew.RunScraper(tm.sampleAppender) + } + } + // Remove all old targets that disappeared. 
+ for _, told := range oldTargets { + if told != nil { + wg.Add(1) + go func(t Target) { + t.StopScraper() + wg.Done() + }(told) + } + } + wg.Wait() + } else { + // The source ID is new, start all target scrapers. + for _, tnew := range newTargets { + go tnew.RunScraper(tm.sampleAppender) + } + } + + if len(newTargets) > 0 { + tm.targets[src] = newTargets + } else { + delete(tm.targets, src) + } + return nil +} + +// Pools returns the targets currently being scraped bucketed by their job name. +func (tm *TargetManager) Pools() map[string][]Target { + tm.m.RLock() + defer tm.m.RUnlock() + + pools := map[string][]Target{} + + for _, ts := range tm.targets { + for _, t := range ts { + job := string(t.BaseLabels()[clientmodel.JobLabel]) + pools[job] = append(pools[job], t) + } + } + return pools +} + +// ApplyConfig resets the manager's target providers and job configurations as defined +// by the new cfg. The state of targets that are valid in the new configuration remains unchanged. +func (tm *TargetManager) ApplyConfig(cfg config.Config) error { + tm.stop(false) + // Even if updating the config failed, we want to continue rather than stop scraping anything. + defer tm.Run() + + if err := tm.applyConfig(cfg); err != nil { + glog.Warningf("Error updating config, changes not applied: %s", err) + return err + } + return nil +} + +func (tm *TargetManager) applyConfig(cfg config.Config) error { + // Only apply changes if everything was successful. + providers := map[string][]TargetProvider{} + configs := map[string]config.JobConfig{} + + for _, jcfg := range cfg.Jobs() { + provs, err := ProvidersFromConfig(jcfg) + if err != nil { + return err + } + configs[jcfg.GetName()] = jcfg + providers[jcfg.GetName()] = provs + } + tm.m.Lock() + defer tm.m.Unlock() + + tm.globalLabels = cfg.GlobalLabels() + tm.providers = providers + tm.configs = configs + return nil +} + +// targetsFromGroup builds targets based on the given TargetGroup and config. 
+func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg config.JobConfig) ([]Target, error) { + tm.m.RLock() + defer tm.m.RUnlock() + + targets := make([]Target, 0, len(tg.Targets)) + for i, labels := range tg.Targets { + for ln, lv := range tg.Labels { + if _, ok := labels[ln]; !ok { + labels[ln] = lv + } + } + for ln, lv := range tm.globalLabels { + if _, ok := labels[ln]; !ok { + labels[ln] = lv + } + } + address, ok := labels[clientmodel.AddressLabel] + if !ok { + return nil, fmt.Errorf("Instance %d in target group %s has no address", i, tg) + } + if _, ok := labels[clientmodel.JobLabel]; !ok { + labels[clientmodel.JobLabel] = clientmodel.LabelValue(cfg.GetName()) + } + + for ln := range labels { + // There are currently no internal labels we want to take over to time series. + if strings.HasPrefix(string(ln), clientmodel.ReservedLabelPrefix) { + delete(labels, ln) + } + } + targets = append(targets, NewTarget(string(address), cfg, labels)) + } + return targets, nil +} + +// ProvidersFromConfig returns all TargetProviders configured in cfg. +func ProvidersFromConfig(cfg config.JobConfig) ([]TargetProvider, error) { + var providers []TargetProvider + + if name := cfg.GetSdName(); name != "" { + dnsSD := discovery.NewDNSDiscovery(name, cfg.SDRefreshInterval()) + providers = append(providers, dnsSD) + } + + if tgs := cfg.GetTargetGroup(); tgs != nil { + static := NewStaticProvider(tgs) + providers = append(providers, static) + } + return providers, nil +} + +// StaticProvider holds a list of target groups that never change. +type StaticProvider struct { + TargetGroups []*config.TargetGroup +} + +// NewStaticProvider returns a StaticProvider configured with the given +// target groups. 
+func NewStaticProvider(groups []*pb.TargetGroup) *StaticProvider { + prov := &StaticProvider{} + + for i, tg := range groups { + g := &config.TargetGroup{ + Source: fmt.Sprintf("static:%d", i), + Labels: clientmodel.LabelSet{}, + } + for _, pair := range tg.GetLabels().GetLabel() { + g.Labels[clientmodel.LabelName(pair.GetName())] = clientmodel.LabelValue(pair.GetValue()) + } + for _, t := range tg.GetTarget() { + g.Targets = append(g.Targets, clientmodel.LabelSet{ + clientmodel.AddressLabel: clientmodel.LabelValue(t), + }) + } + prov.TargetGroups = append(prov.TargetGroups, g) + } + return prov +} + +// Run implements the TargetProvider interface. +func (sd *StaticProvider) Run(ch chan<- *config.TargetGroup) { + for _, tg := range sd.TargetGroups { + ch <- tg + } + close(ch) // This provider never sends any updates. +} + +// Stop implements the TargetProvider interface. +func (sd *StaticProvider) Stop() {} + +// Sources returns the source identifiers of the provider's target groups. +func (sd *StaticProvider) Sources() (srcs []string) { + for _, tg := range sd.TargetGroups { + srcs = append(srcs, tg.Source) + } + return srcs } diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index 981fa3a3b5..ed1b1cd003 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -14,6 +14,7 @@ package retrieval import ( + "reflect" "testing" "time" @@ -21,110 +22,247 @@ import ( clientmodel "github.com/prometheus/client_golang/model" - pb "github.com/prometheus/prometheus/config/generated" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/config" + pb "github.com/prometheus/prometheus/config/generated" ) -type fakeTarget struct { - scrapeCount int - lastScrape time.Time - interval time.Duration -} - -func (t fakeTarget) LastError() error { - return nil -} - -func (t fakeTarget) URL() string { - return "fake" -} - -func (t fakeTarget) InstanceIdentifier() string { - return "fake" -} - -func (t fakeTarget) 
GlobalURL() string { - return t.URL() -} - -func (t fakeTarget) BaseLabels() clientmodel.LabelSet { - return clientmodel.LabelSet{} -} - -func (t fakeTarget) BaseLabelsWithoutJobAndInstance() clientmodel.LabelSet { - return clientmodel.LabelSet{} -} - -func (t fakeTarget) Interval() time.Duration { - return t.interval -} - -func (t fakeTarget) LastScrape() time.Time { - return t.lastScrape -} - -func (t fakeTarget) scrape(storage.SampleAppender) error { - t.scrapeCount++ - - return nil -} - -func (t fakeTarget) RunScraper(storage.SampleAppender, time.Duration) { - return -} - -func (t fakeTarget) StopScraper() { - return -} - -func (t fakeTarget) State() TargetState { - return Healthy -} - -func (t *fakeTarget) SetBaseLabelsFrom(newTarget Target) {} - -func (t *fakeTarget) Ingest(clientmodel.Samples) error { return nil } - -func testTargetManager(t testing.TB) { - targetManager := NewTargetManager(nopAppender{}, nil) - testJob1 := config.JobConfig{ - JobConfig: pb.JobConfig{ - Name: proto.String("test_job1"), - ScrapeInterval: proto.String("1m"), +func TestTargetManagerChan(t *testing.T) { + testJob1 := pb.JobConfig{ + Name: proto.String("test_job1"), + ScrapeInterval: proto.String("1m"), + TargetGroup: []*pb.TargetGroup{ + {Target: []string{"example.org:80", "example.com:80"}}, }, } - testJob2 := config.JobConfig{ - JobConfig: pb.JobConfig{ - Name: proto.String("test_job2"), - ScrapeInterval: proto.String("1m"), + prov1 := &fakeTargetProvider{ + sources: []string{"src1", "src2"}, + update: make(chan *config.TargetGroup), + } + + targetManager := &TargetManager{ + sampleAppender: nopAppender{}, + providers: map[string][]TargetProvider{ + *testJob1.Name: []TargetProvider{prov1}, + }, + configs: map[string]config.JobConfig{ + *testJob1.Name: config.JobConfig{testJob1}, + }, + targets: make(map[string][]Target), + } + go targetManager.Run() + defer targetManager.Stop() + + sequence := []struct { + tgroup *config.TargetGroup + expected map[string][]clientmodel.LabelSet 
+ }{ + { + tgroup: &config.TargetGroup{ + Source: "src1", + Targets: []clientmodel.LabelSet{ + {clientmodel.AddressLabel: "test-1:1234"}, + {clientmodel.AddressLabel: "test-2:1234", "label": "set"}, + {clientmodel.AddressLabel: "test-3:1234"}, + }, + }, + expected: map[string][]clientmodel.LabelSet{ + "test_job1:src1": { + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234"}, + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1234", "label": "set"}, + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"}, + }, + }, + }, { + tgroup: &config.TargetGroup{ + Source: "src2", + Targets: []clientmodel.LabelSet{ + {clientmodel.AddressLabel: "test-1:1235"}, + {clientmodel.AddressLabel: "test-2:1235"}, + {clientmodel.AddressLabel: "test-3:1235"}, + }, + Labels: clientmodel.LabelSet{"group": "label"}, + }, + expected: map[string][]clientmodel.LabelSet{ + "test_job1:src1": { + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234"}, + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1234", "label": "set"}, + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"}, + }, + "test_job1:src2": { + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1235", "group": "label"}, + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1235", "group": "label"}, + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1235", "group": "label"}, + }, + }, + }, { + tgroup: &config.TargetGroup{ + Source: "src2", + Targets: []clientmodel.LabelSet{}, + }, + expected: map[string][]clientmodel.LabelSet{ + "test_job1:src1": { + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234"}, + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1234", "label": "set"}, + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"}, + }, + }, + }, { + tgroup: 
&config.TargetGroup{ + Source: "src1", + Targets: []clientmodel.LabelSet{ + {clientmodel.AddressLabel: "test-1:1234", "added": "label"}, + {clientmodel.AddressLabel: "test-3:1234"}, + {clientmodel.AddressLabel: "test-4:1234", "fancy": "label"}, + }, + }, + expected: map[string][]clientmodel.LabelSet{ + "test_job1:src1": { + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234", "added": "label"}, + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"}, + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-4:1234", "fancy": "label"}, + }, + }, }, } - target1GroupA := &fakeTarget{ - interval: time.Minute, - } - target2GroupA := &fakeTarget{ - interval: time.Minute, - } + for i, step := range sequence { + prov1.update <- step.tgroup - targetManager.AddTarget(testJob1, target1GroupA) - targetManager.AddTarget(testJob1, target2GroupA) + <-time.After(1 * time.Millisecond) - target1GroupB := &fakeTarget{ - interval: time.Minute * 2, - } + if len(targetManager.targets) != len(step.expected) { + t.Fatalf("step %d: sources mismatch %v, %v", i, targetManager.targets, step.expected) + } - targetManager.AddTarget(testJob2, target1GroupB) -} - -func TestTargetManager(t *testing.T) { - testTargetManager(t) -} - -func BenchmarkTargetManager(b *testing.B) { - for i := 0; i < b.N; i++ { - testTargetManager(b) + for source, actTargets := range targetManager.targets { + expTargets, ok := step.expected[source] + if !ok { + t.Fatalf("step %d: unexpected source %q: %v", i, source, actTargets) + } + for _, expt := range expTargets { + found := false + for _, actt := range actTargets { + if reflect.DeepEqual(expt, actt.BaseLabels()) { + found = true + break + } + } + if !found { + t.Errorf("step %d: expected target %v not found in actual targets", i, expt) + } + } + } + } +} + +func TestTargetManagerConfigUpdate(t *testing.T) { + testJob1 := &pb.JobConfig{ + Name: proto.String("test_job1"), + ScrapeInterval: proto.String("1m"), + 
TargetGroup: []*pb.TargetGroup{ + {Target: []string{"example.org:80", "example.com:80"}}, + }, + } + testJob2 := &pb.JobConfig{ + Name: proto.String("test_job2"), + ScrapeInterval: proto.String("1m"), + TargetGroup: []*pb.TargetGroup{ + {Target: []string{"example.org:8080", "example.com:8081"}}, + {Target: []string{"test.com:1234"}}, + }, + } + + sequence := []struct { + jobConfigs []*pb.JobConfig + expected map[string][]clientmodel.LabelSet + }{ + { + jobConfigs: []*pb.JobConfig{testJob1}, + expected: map[string][]clientmodel.LabelSet{ + "test_job1:static:0": { + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.org:80"}, + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.com:80"}, + }, + }, + }, { + jobConfigs: []*pb.JobConfig{testJob1}, + expected: map[string][]clientmodel.LabelSet{ + "test_job1:static:0": { + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.org:80"}, + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.com:80"}, + }, + }, + }, { + jobConfigs: []*pb.JobConfig{testJob1, testJob2}, + expected: map[string][]clientmodel.LabelSet{ + "test_job1:static:0": { + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.org:80"}, + {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.com:80"}, + }, + "test_job2:static:0": { + {clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "example.org:8080"}, + {clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "example.com:8081"}, + }, + "test_job2:static:1": { + {clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "test.com:1234"}, + }, + }, + }, { + jobConfigs: []*pb.JobConfig{}, + expected: map[string][]clientmodel.LabelSet{}, + }, { + jobConfigs: []*pb.JobConfig{testJob2}, + expected: map[string][]clientmodel.LabelSet{ + "test_job2:static:0": { + {clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "example.org:8080"}, + {clientmodel.JobLabel: 
"test_job2", clientmodel.InstanceLabel: "example.com:8081"}, + }, + "test_job2:static:1": { + {clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "test.com:1234"}, + }, + }, + }, + } + + targetManager, err := NewTargetManager(config.Config{}, nopAppender{}) + if err != nil { + t.Fatal(err) + } + targetManager.Run() + defer targetManager.Stop() + + for i, step := range sequence { + cfg := pb.PrometheusConfig{ + Job: step.jobConfigs, + } + err := targetManager.ApplyConfig(config.Config{cfg}) + if err != nil { + t.Fatal(err) + } + + <-time.After(1 * time.Millisecond) + + if len(targetManager.targets) != len(step.expected) { + t.Fatalf("step %d: sources mismatch %v, %v", i, targetManager.targets, step.expected) + } + + for source, actTargets := range targetManager.targets { + expTargets, ok := step.expected[source] + if !ok { + t.Fatalf("step %d: unexpected source %q: %v", i, source, actTargets) + } + for _, expt := range expTargets { + found := false + for _, actt := range actTargets { + if reflect.DeepEqual(expt, actt.BaseLabels()) { + found = true + break + } + } + if !found { + t.Errorf("step %d: expected target %v for %q not found in actual targets", i, expt, source) + } + } + } } } diff --git a/retrieval/targetpool.go b/retrieval/targetpool.go deleted file mode 100644 index 9b1b005b65..0000000000 --- a/retrieval/targetpool.go +++ /dev/null @@ -1,164 +0,0 @@ -// Copyright 2013 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package retrieval - -import ( - "sort" - "sync" - "time" - - "github.com/golang/glog" - - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/utility" -) - -const ( - targetAddQueueSize = 100 - targetReplaceQueueSize = 1 -) - -// TargetPool is a pool of targets for the same job. -type TargetPool struct { - sync.RWMutex - - manager TargetManager - targetsByURL map[string]Target - interval time.Duration - sampleAppender storage.SampleAppender - addTargetQueue chan Target - - targetProvider TargetProvider - - stopping, stopped chan struct{} -} - -// NewTargetPool creates a TargetPool, ready to be started by calling Run. -func NewTargetPool(p TargetProvider, app storage.SampleAppender, i time.Duration) *TargetPool { - return &TargetPool{ - interval: i, - sampleAppender: app, - targetsByURL: make(map[string]Target), - addTargetQueue: make(chan Target, targetAddQueueSize), - targetProvider: p, - stopping: make(chan struct{}), - stopped: make(chan struct{}), - } -} - -// Run starts the target pool. It returns when the target pool has stopped -// (after calling Stop). Run is usually called as a goroutine. -func (p *TargetPool) Run() { - ticker := time.NewTicker(p.interval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if p.targetProvider != nil { - targets, err := p.targetProvider.Targets() - if err != nil { - glog.Warningf("Error looking up targets, keeping old list: %s", err) - } else { - p.ReplaceTargets(targets) - } - } - case newTarget := <-p.addTargetQueue: - p.addTarget(newTarget) - case <-p.stopping: - p.ReplaceTargets([]Target{}) - close(p.stopped) - return - } - } -} - -// Stop stops the target pool and returns once the shutdown is complete. -func (p *TargetPool) Stop() { - close(p.stopping) - <-p.stopped -} - -// AddTarget adds a target by queuing it in the target queue. 
-func (p *TargetPool) AddTarget(target Target) { - p.addTargetQueue <- target -} - -func (p *TargetPool) addTarget(target Target) { - p.Lock() - defer p.Unlock() - - p.targetsByURL[target.URL()] = target - go target.RunScraper(p.sampleAppender, p.interval) -} - -// ReplaceTargets replaces the old targets by the provided new ones but reuses -// old targets that are also present in newTargets to preserve scheduling and -// health state. Targets no longer present are stopped. -func (p *TargetPool) ReplaceTargets(newTargets []Target) { - p.Lock() - defer p.Unlock() - - newTargetURLs := make(utility.Set) - for _, newTarget := range newTargets { - newTargetURLs.Add(newTarget.URL()) - oldTarget, ok := p.targetsByURL[newTarget.URL()] - if ok { - oldTarget.SetBaseLabelsFrom(newTarget) - } else { - p.targetsByURL[newTarget.URL()] = newTarget - go newTarget.RunScraper(p.sampleAppender, p.interval) - } - } - - var wg sync.WaitGroup - for k, oldTarget := range p.targetsByURL { - if !newTargetURLs.Has(k) { - wg.Add(1) - go func(k string, oldTarget Target) { - defer wg.Done() - glog.V(1).Infof("Stopping scraper for target %s...", k) - oldTarget.StopScraper() - glog.V(1).Infof("Scraper for target %s stopped.", k) - }(k, oldTarget) - delete(p.targetsByURL, k) - } - } - wg.Wait() -} - -type targetsByURL []Target - -func (s targetsByURL) Len() int { - return len(s) -} -func (s targetsByURL) Swap(i, j int) { - s[i], s[j] = s[j], s[i] -} -func (s targetsByURL) Less(i, j int) bool { - return s[i].URL() < s[j].URL() -} - -// Targets returns a sorted copy of the current target list. 
-func (p *TargetPool) Targets() []Target { - p.RLock() - defer p.RUnlock() - - targets := make(targetsByURL, 0, len(p.targetsByURL)) - for _, v := range p.targetsByURL { - targets = append(targets, v) - } - sort.Sort(targets) - return targets -} diff --git a/retrieval/targetpool_test.go b/retrieval/targetpool_test.go deleted file mode 100644 index 56e9ea09bf..0000000000 --- a/retrieval/targetpool_test.go +++ /dev/null @@ -1,164 +0,0 @@ -// Copyright 2013 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package retrieval - -import ( - "net/http" - "testing" - "time" -) - -func testTargetPool(t testing.TB) { - type expectation struct { - size int - } - - type input struct { - url string - scheduledFor time.Time - } - - type output struct { - url string - } - - var scenarios = []struct { - name string - inputs []input - outputs []output - }{ - { - name: "empty", - inputs: []input{}, - outputs: []output{}, - }, - { - name: "single element", - inputs: []input{ - { - url: "single1", - }, - }, - outputs: []output{ - { - url: "single1", - }, - }, - }, - { - name: "plural schedules", - inputs: []input{ - { - url: "plural1", - }, - { - url: "plural2", - }, - }, - outputs: []output{ - { - url: "plural1", - }, - { - url: "plural2", - }, - }, - }, - } - - for i, scenario := range scenarios { - pool := NewTargetPool(nil, nopAppender{}, time.Duration(1)) - - for _, input := range scenario.inputs { - target := target{ - url: input.url, - httpClient: &http.Client{}, - } - pool.addTarget(&target) - } - - if len(pool.targetsByURL) != len(scenario.outputs) { - t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), len(pool.targetsByURL)) - } else { - for j, output := range scenario.outputs { - if target, ok := pool.targetsByURL[output.url]; !ok { - t.Errorf("%s %d.%d. expected Target url to be %s but was %s", scenario.name, i, j, output.url, target.URL()) - } - } - - if len(pool.targetsByURL) != len(scenario.outputs) { - t.Errorf("%s %d. 
expected to repopulated with %d elements, got %d", scenario.name, i, len(scenario.outputs), len(pool.targetsByURL)) - } - } - } -} - -func TestTargetPool(t *testing.T) { - testTargetPool(t) -} - -func TestTargetPoolReplaceTargets(t *testing.T) { - pool := NewTargetPool(nil, nopAppender{}, time.Duration(1)) - oldTarget1 := &target{ - url: "example1", - state: Unhealthy, - scraperStopping: make(chan struct{}), - scraperStopped: make(chan struct{}), - httpClient: &http.Client{}, - } - oldTarget2 := &target{ - url: "example2", - state: Unhealthy, - scraperStopping: make(chan struct{}), - scraperStopped: make(chan struct{}), - httpClient: &http.Client{}, - } - newTarget1 := &target{ - url: "example1", - state: Healthy, - scraperStopping: make(chan struct{}), - scraperStopped: make(chan struct{}), - httpClient: &http.Client{}, - } - newTarget2 := &target{ - url: "example3", - state: Healthy, - scraperStopping: make(chan struct{}), - scraperStopped: make(chan struct{}), - httpClient: &http.Client{}, - } - - pool.addTarget(oldTarget1) - pool.addTarget(oldTarget2) - - pool.ReplaceTargets([]Target{newTarget1, newTarget2}) - - if len(pool.targetsByURL) != 2 { - t.Errorf("Expected 2 elements in pool, had %d", len(pool.targetsByURL)) - } - - if pool.targetsByURL["example1"].State() != oldTarget1.State() { - t.Errorf("target1 channel has changed") - } - if pool.targetsByURL["example3"].State() == oldTarget2.State() { - t.Errorf("newTarget2 channel same as oldTarget2's") - } - -} - -func BenchmarkTargetPool(b *testing.B) { - for i := 0; i < b.N; i++ { - testTargetPool(b) - } -} diff --git a/web/status.go b/web/status.go index c641c9aacb..9de2fab987 100644 --- a/web/status.go +++ b/web/status.go @@ -30,7 +30,7 @@ type PrometheusStatusHandler struct { Config string Flags map[string]string RuleManager manager.RuleManager - TargetPools map[string]*retrieval.TargetPool + TargetPools func() map[string][]retrieval.Target Birth time.Time PathPrefix string diff --git 
a/web/templates/status.html b/web/templates/status.html index 91332c6016..c6e2de4726 100644 --- a/web/templates/status.html +++ b/web/templates/status.html @@ -33,7 +33,7 @@
{{$job}} | ||||
---|---|---|---|---|
{{.URL}} |