// Copyright 2013 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package retrieval import ( "fmt" "strings" "sync" "github.com/prometheus/common/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/retrieval/discovery" "github.com/prometheus/prometheus/storage" ) // A TargetProvider provides information about target groups. It maintains a set // of sources from which TargetGroups can originate. Whenever a target provider // detects a potential change, it sends the TargetGroup through its provided channel. // // The TargetProvider does not have to guarantee that an actual change happened. // It does guarantee that it sends the new TargetGroup whenever a change happens. // // Sources() is guaranteed to be called exactly once before each call to Run(). // On a call to Run() implementing types must send a valid target group for each of // the sources they declared in the last call to Sources(). type TargetProvider interface { // Sources returns the source identifiers the provider is currently aware of. Sources() []string // Run hands a channel to the target provider through which it can send // updated target groups. The channel must be closed by the target provider // if no more updates will be sent. // On receiving from done Run must return. Run(up chan<- config.TargetGroup, done <-chan struct{}) } // TargetManager maintains a set of targets, starts and stops their scraping and // creates the new targets based on the target groups it receives from various // target providers. type TargetManager struct { mtx sync.RWMutex sampleAppender storage.SampleAppender running bool done chan struct{} // Targets by their source ID. targets map[string][]*Target // Providers by the scrape configs they are derived from. providers map[*config.ScrapeConfig][]TargetProvider } // NewTargetManager creates a new TargetManager. func NewTargetManager(sampleAppender storage.SampleAppender) *TargetManager { tm := &TargetManager{ sampleAppender: sampleAppender, targets: map[string][]*Target{}, } return tm } // merge multiple target group channels into a single output channel. func merge(done <-chan struct{}, cs ...<-chan targetGroupUpdate) <-chan targetGroupUpdate { var wg sync.WaitGroup out := make(chan targetGroupUpdate) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c or done is closed, then calls // wg.Done. redir := func(c <-chan targetGroupUpdate) { defer wg.Done() for n := range c { select { case out <- n: case <-done: return } } } wg.Add(len(cs)) for _, c := range cs { go redir(c) } // Close the out channel if all inbound channels are closed. go func() { wg.Wait() close(out) }() return out } // targetGroupUpdate is a potentially changed/new target group // for the given scrape configuration. type targetGroupUpdate struct { tg config.TargetGroup scfg *config.ScrapeConfig } // Run starts background processing to handle target updates. func (tm *TargetManager) Run() { log.Info("Starting target manager...") tm.done = make(chan struct{}) sources := map[string]struct{}{} updates := []<-chan targetGroupUpdate{} for scfg, provs := range tm.providers { for _, prov := range provs { // Get an initial set of available sources so we don't remove // target groups from the last run that are still available. for _, src := range prov.Sources() { sources[src] = struct{}{} } tgc := make(chan config.TargetGroup) // Run the target provider after cleanup of the stale targets is done. defer func(prov TargetProvider, tgc chan<- config.TargetGroup, done <-chan struct{}) { go prov.Run(tgc, done) }(prov, tgc, tm.done) tgupc := make(chan targetGroupUpdate) updates = append(updates, tgupc) go func(scfg *config.ScrapeConfig, done <-chan struct{}) { defer close(tgupc) for { select { case tg := <-tgc: tgupc <- targetGroupUpdate{tg: tg, scfg: scfg} case <-done: return } } }(scfg, tm.done) } } // Merge all channels of incoming target group updates into a single // one and keep applying the updates. go tm.handleUpdates(merge(tm.done, updates...), tm.done) tm.mtx.Lock() defer tm.mtx.Unlock() // Remove old target groups that are no longer in the set of sources. tm.removeTargets(func(src string) bool { if _, ok := sources[src]; ok { return false } return true }) tm.running = true } // handleUpdates receives target group updates and handles them in the // context of the given job config. func (tm *TargetManager) handleUpdates(ch <-chan targetGroupUpdate, done <-chan struct{}) { for { select { case update, ok := <-ch: if !ok { return } log.Debugf("Received potential update for target group %q", update.tg.Source) if err := tm.updateTargetGroup(&update.tg, update.scfg); err != nil { log.Errorf("Error updating targets: %s", err) } case <-done: return } } } // Stop all background processing. func (tm *TargetManager) Stop() { tm.mtx.RLock() if tm.running { defer tm.stop(true) } // Return the lock before calling tm.stop(). defer tm.mtx.RUnlock() } // stop background processing of the target manager. If removeTargets is true, // existing targets will be stopped and removed. func (tm *TargetManager) stop(removeTargets bool) { log.Info("Stopping target manager...") defer log.Info("Target manager stopped.") close(tm.done) tm.mtx.Lock() defer tm.mtx.Unlock() if removeTargets { tm.removeTargets(nil) } tm.running = false } // removeTargets stops and removes targets for sources where f(source) is true // or if f is nil. This method is not thread-safe. func (tm *TargetManager) removeTargets(f func(string) bool) { if f == nil { f = func(string) bool { return true } } var wg sync.WaitGroup for src, targets := range tm.targets { if !f(src) { continue } wg.Add(len(targets)) for _, target := range targets { go func(t *Target) { t.StopScraper() wg.Done() }(target) } delete(tm.targets, src) } wg.Wait() } // updateTargetGroup creates new targets for the group and replaces the old targets // for the source ID. func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *config.ScrapeConfig) error { newTargets, err := tm.targetsFromGroup(tgroup, cfg) if err != nil { return err } tm.mtx.Lock() defer tm.mtx.Unlock() if !tm.running { return nil } oldTargets, ok := tm.targets[tgroup.Source] if ok { var wg sync.WaitGroup // Replace the old targets with the new ones while keeping the state // of intersecting targets. for i, tnew := range newTargets { var match *Target for j, told := range oldTargets { if told == nil { continue } if tnew.InstanceIdentifier() == told.InstanceIdentifier() { match = told oldTargets[j] = nil break } } // Update the existing target and discard the new equivalent. // Otherwise start scraping the new target. if match != nil { // Updating is blocked during a scrape. We don't want those wait times // to build up. wg.Add(1) go func(t *Target) { match.Update(cfg, t.fullLabels(), t.metaLabels) wg.Done() }(tnew) newTargets[i] = match } else { go tnew.RunScraper(tm.sampleAppender) } } // Remove all old targets that disappeared. for _, told := range oldTargets { if told != nil { wg.Add(1) go func(t *Target) { t.StopScraper() wg.Done() }(told) } } wg.Wait() } else { // The source ID is new, start all target scrapers. for _, tnew := range newTargets { go tnew.RunScraper(tm.sampleAppender) } } if len(newTargets) > 0 { tm.targets[tgroup.Source] = newTargets } else { delete(tm.targets, tgroup.Source) } return nil } // Pools returns the targets currently being scraped bucketed by their job name. func (tm *TargetManager) Pools() map[string][]*Target { tm.mtx.RLock() defer tm.mtx.RUnlock() pools := map[string][]*Target{} for _, ts := range tm.targets { for _, t := range ts { job := string(t.BaseLabels()[model.JobLabel]) pools[job] = append(pools[job], t) } } return pools } // ApplyConfig resets the manager's target providers and job configurations as defined // by the new cfg. The state of targets that are valid in the new configuration remains unchanged. // Returns true on success. func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool { tm.mtx.RLock() running := tm.running tm.mtx.RUnlock() if running { tm.stop(false) // Even if updating the config failed, we want to continue rather than stop scraping anything. defer tm.Run() } providers := map[*config.ScrapeConfig][]TargetProvider{} for _, scfg := range cfg.ScrapeConfigs { providers[scfg] = providersFromConfig(scfg) } tm.mtx.Lock() defer tm.mtx.Unlock() tm.providers = providers return true } // prefixedTargetProvider wraps TargetProvider and prefixes source strings // to make the sources unique across a configuration. type prefixedTargetProvider struct { TargetProvider job string mechanism string idx int } func (tp *prefixedTargetProvider) prefix(src string) string { return fmt.Sprintf("%s:%s:%d:%s", tp.job, tp.mechanism, tp.idx, src) } func (tp *prefixedTargetProvider) Sources() []string { srcs := tp.TargetProvider.Sources() for i, src := range srcs { srcs[i] = tp.prefix(src) } return srcs } func (tp *prefixedTargetProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) ch2 := make(chan config.TargetGroup) go tp.TargetProvider.Run(ch2, done) for { select { case <-done: return case tg := <-ch2: tg.Source = tp.prefix(tg.Source) ch <- tg } } } // providersFromConfig returns all TargetProviders configured in cfg. func providersFromConfig(cfg *config.ScrapeConfig) []TargetProvider { var providers []TargetProvider app := func(mech string, i int, tp TargetProvider) { providers = append(providers, &prefixedTargetProvider{ job: cfg.JobName, mechanism: mech, idx: i, TargetProvider: tp, }) } for i, c := range cfg.DNSSDConfigs { app("dns", i, discovery.NewDNSDiscovery(c)) } for i, c := range cfg.FileSDConfigs { app("file", i, discovery.NewFileDiscovery(c)) } for i, c := range cfg.ConsulSDConfigs { app("consul", i, discovery.NewConsulDiscovery(c)) } for i, c := range cfg.MarathonSDConfigs { app("marathon", i, discovery.NewMarathonDiscovery(c)) } for i, c := range cfg.KubernetesSDConfigs { k, err := discovery.NewKubernetesDiscovery(c) if err != nil { log.Errorf("Cannot create Kubernetes discovery: %s", err) continue } app("kubernetes", i, k) } for i, c := range cfg.ServersetSDConfigs { app("serverset", i, discovery.NewServersetDiscovery(c)) } for i, c := range cfg.EC2SDConfigs { app("ec2", i, discovery.NewEC2Discovery(c)) } if len(cfg.TargetGroups) > 0 { app("static", 0, NewStaticProvider(cfg.TargetGroups)) } return providers } // targetsFromGroup builds targets based on the given TargetGroup and config. func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) { tm.mtx.RLock() defer tm.mtx.RUnlock() targets := make([]*Target, 0, len(tg.Targets)) for i, labels := range tg.Targets { addr := string(labels[model.AddressLabel]) // If no port was provided, infer it based on the used scheme. if !strings.Contains(addr, ":") { switch cfg.Scheme { case "http": addr = fmt.Sprintf("%s:80", addr) case "https": addr = fmt.Sprintf("%s:443", addr) default: panic(fmt.Errorf("targetsFromGroup: invalid scheme %q", cfg.Scheme)) } labels[model.AddressLabel] = model.LabelValue(addr) } for k, v := range cfg.Params { if len(v) > 0 { labels[model.LabelName(model.ParamLabelPrefix+k)] = model.LabelValue(v[0]) } } // Copy labels into the labelset for the target if they are not // set already. Apply the labelsets in order of decreasing precedence. labelsets := []model.LabelSet{ tg.Labels, { model.SchemeLabel: model.LabelValue(cfg.Scheme), model.MetricsPathLabel: model.LabelValue(cfg.MetricsPath), model.JobLabel: model.LabelValue(cfg.JobName), }, } for _, lset := range labelsets { for ln, lv := range lset { if _, ok := labels[ln]; !ok { labels[ln] = lv } } } if _, ok := labels[model.AddressLabel]; !ok { return nil, fmt.Errorf("instance %d in target group %s has no address", i, tg) } preRelabelLabels := labels labels, err := Relabel(labels, cfg.RelabelConfigs...) if err != nil { return nil, fmt.Errorf("error while relabeling instance %d in target group %s: %s", i, tg, err) } // Check if the target was dropped. if labels == nil { continue } if err = config.CheckTargetAddress(labels[model.AddressLabel]); err != nil { return nil, err } for ln := range labels { // Meta labels are deleted after relabelling. Other internal labels propagate to // the target which decides whether they will be part of their label set. if strings.HasPrefix(string(ln), model.MetaLabelPrefix) { delete(labels, ln) } } tr := NewTarget(cfg, labels, preRelabelLabels) targets = append(targets, tr) } return targets, nil } // StaticProvider holds a list of target groups that never change. type StaticProvider struct { TargetGroups []*config.TargetGroup } // NewStaticProvider returns a StaticProvider configured with the given // target groups. func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider { for i, tg := range groups { tg.Source = fmt.Sprintf("%d", i) } return &StaticProvider{ TargetGroups: groups, } } // Run implements the TargetProvider interface. func (sd *StaticProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) for _, tg := range sd.TargetGroups { select { case <-done: return case ch <- *tg: } } <-done } // Sources returns the provider's sources. func (sd *StaticProvider) Sources() (srcs []string) { for _, tg := range sd.TargetGroups { srcs = append(srcs, tg.Source) } return srcs }