diff --git a/retrieval/scrape.go b/retrieval/scrape.go
new file mode 100644
index 0000000000..40d398dedd
--- /dev/null
+++ b/retrieval/scrape.go
@@ -0,0 +1,77 @@
+// Copyright 2016 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package retrieval
+
+// import (
+// 	"sync"
+// 	"time"
+
+// 	"github.com/prometheus/common/log"
+// 	"github.com/prometheus/common/model"
+// 	"golang.org/x/net/context"
+
+// 	"github.com/prometheus/prometheus/config"
+// 	"github.com/prometheus/prometheus/storage"
+// )
+
+// type scraper interface {
+// 	scrape(context.Context) error
+// 	report(start time.Time, dur time.Duration, err error) error
+// }
+
+// type scrapePool struct {
+// 	mtx     sync.RWMutex
+// 	targets map[model.Fingerprint]*Target
+// 	loops   map[model.Fingerprint]loop
+
+// 	config *config.ScrapeConfig
+
+// 	newLoop func(context.Context)
+// }
+
+// func newScrapePool(c *config.ScrapeConfig) *scrapePool {
+// 	return &scrapePool{config: c}
+// }
+
+// func (sp *scrapePool) sync(targets []*Target) {
+// 	sp.mtx.Lock()
+// 	defer sp.mtx.Unlock()
+
+// 	uniqueTargets := make(map[model.Fingerprint]*Target, len(targets))
+
+// 	for _, t := range targets {
+// 		uniqueTargets[t.fingerprint()] = t
+// 	}
+
+// 	sp.targets = uniqueTargets
+// }
+
+// type scrapeLoop struct {
+// 	scraper scraper
+// 	mtx     sync.RWMutex
+// }
+
+// func newScrapeLoop(ctx context.Context)
+
+// func (sl *scrapeLoop) update() {}
+
+// func (sl *scrapeLoop) run(ctx context.Context) {
+// 	var wg sync.WaitGroup
+
+// 	wg.Wait()
+// }
+
+// func (sl *scrapeLoop) stop() {
+
+// }
diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go
index 6f06438b49..e223cc332e 100644
--- a/retrieval/targetmanager.go
+++ b/retrieval/targetmanager.go
@@ -20,6 +20,7 @@ import (
 
 	"github.com/prometheus/common/log"
 	"github.com/prometheus/common/model"
+	"golang.org/x/net/context"
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/retrieval/discovery"
@@ -33,12 +34,8 @@ import (
 // The TargetProvider does not have to guarantee that an actual change happened.
 // It does guarantee that it sends the new TargetGroup whenever a change happens.
 //
-// Sources() is guaranteed to be called exactly once before each call to Run().
-// On a call to Run() implementing types must send a valid target group for each of
-// the sources they declared in the last call to Sources().
+// Providers must initially send all known target groups as soon as they can.
 type TargetProvider interface {
-	// Sources returns the source identifiers the provider is currently aware of.
-	Sources() []string
 	// Run hands a channel to the target provider through which it can send
 	// updated target groups. The channel must be closed by the target provider
 	// if no more updates will be sent.
@@ -50,268 +47,57 @@ type TargetProvider interface {
 // creates the new targets based on the target groups it receives from various
 // target providers.
 type TargetManager struct {
-	mtx            sync.RWMutex
-	sampleAppender storage.SampleAppender
-	running        bool
-	done           chan struct{}
+	appender storage.SampleAppender
+
+	mtx    sync.RWMutex
+	ctx    context.Context
+	cancel func()
+	wg     sync.WaitGroup
 
-	// Targets by their source ID.
-	targets map[string][]*Target
 	// Providers by the scrape configs they are derived from.
-	providers map[*config.ScrapeConfig][]TargetProvider
+	scrapeSets []*scrapeSet
 }
 
 // NewTargetManager creates a new TargetManager.
-func NewTargetManager(sampleAppender storage.SampleAppender) *TargetManager {
-	tm := &TargetManager{
-		sampleAppender: sampleAppender,
-		targets:        map[string][]*Target{},
+func NewTargetManager(app storage.SampleAppender) *TargetManager {
+	return &TargetManager{
+		appender: app,
 	}
-	return tm
-}
-
-// merge multiple target group channels into a single output channel.
-func merge(done <-chan struct{}, cs ...<-chan targetGroupUpdate) <-chan targetGroupUpdate {
-	var wg sync.WaitGroup
-	out := make(chan targetGroupUpdate)
-
-	// Start an output goroutine for each input channel in cs. output
-	// copies values from c to out until c or done is closed, then calls
-	// wg.Done.
-	redir := func(c <-chan targetGroupUpdate) {
-		defer wg.Done()
-		for n := range c {
-			select {
-			case out <- n:
-			case <-done:
-				return
-			}
-		}
-	}
-
-	wg.Add(len(cs))
-	for _, c := range cs {
-		go redir(c)
-	}
-
-	// Close the out channel if all inbound channels are closed.
-	go func() {
-		wg.Wait()
-		close(out)
-	}()
-	return out
-}
-
-// targetGroupUpdate is a potentially changed/new target group
-// for the given scrape configuration.
-type targetGroupUpdate struct {
-	tg   config.TargetGroup
-	scfg *config.ScrapeConfig
 }
 
 // Run starts background processing to handle target updates.
 func (tm *TargetManager) Run() {
 	log.Info("Starting target manager...")
 
-	tm.done = make(chan struct{})
-
-	sources := map[string]struct{}{}
-	updates := []<-chan targetGroupUpdate{}
-
-	for scfg, provs := range tm.providers {
-		for _, prov := range provs {
-			// Get an initial set of available sources so we don't remove
-			// target groups from the last run that are still available.
-			for _, src := range prov.Sources() {
-				sources[src] = struct{}{}
-			}
-
-			tgc := make(chan config.TargetGroup)
-			// Run the target provider after cleanup of the stale targets is done.
-			defer func(prov TargetProvider, tgc chan<- config.TargetGroup, done <-chan struct{}) {
-				go prov.Run(tgc, done)
-			}(prov, tgc, tm.done)
-
-			tgupc := make(chan targetGroupUpdate)
-			updates = append(updates, tgupc)
-
-			go func(scfg *config.ScrapeConfig, done <-chan struct{}) {
-				defer close(tgupc)
-				for {
-					select {
-					case tg := <-tgc:
-						tgupc <- targetGroupUpdate{tg: tg, scfg: scfg}
-					case <-done:
-						return
-					}
-				}
-			}(scfg, tm.done)
-		}
-	}
-
-	// Merge all channels of incoming target group updates into a single
-	// one and keep applying the updates.
-	go tm.handleUpdates(merge(tm.done, updates...), tm.done)
-
 	tm.mtx.Lock()
-	defer tm.mtx.Unlock()
+
+	tm.ctx, tm.cancel = context.WithCancel(context.Background())
 
-	// Remove old target groups that are no longer in the set of sources.
-	tm.removeTargets(func(src string) bool {
-		if _, ok := sources[src]; ok {
-			return false
-		}
-		return true
-	})
+	for _, ss := range tm.scrapeSets {
+		tm.wg.Add(1)
 
-	tm.running = true
-	log.Info("Target manager started.")
-}
-
-// handleUpdates receives target group updates and handles them in the
-// context of the given job config.
-func (tm *TargetManager) handleUpdates(ch <-chan targetGroupUpdate, done <-chan struct{}) {
-	for {
-		select {
-		case update, ok := <-ch:
-			if !ok {
-				return
-			}
-			log.Debugf("Received potential update for target group %q", update.tg.Source)
-
-			if err := tm.updateTargetGroup(&update.tg, update.scfg); err != nil {
-				log.Errorf("Error updating targets: %s", err)
-			}
-		case <-done:
-			return
-		}
+		go func(ss *scrapeSet) {
+			ss.run(tm.ctx)
+			tm.wg.Done()
+		}(ss)
 	}
+
+	tm.mtx.Unlock()
+	tm.wg.Wait()
 }
 
 // Stop all background processing.
 func (tm *TargetManager) Stop() {
-	tm.mtx.RLock()
-	if tm.running {
-		defer tm.stop(true)
-	}
-	// Return the lock before calling tm.stop().
-	defer tm.mtx.RUnlock()
-}
-
-// stop background processing of the target manager. If removeTargets is true,
-// existing targets will be stopped and removed.
-func (tm *TargetManager) stop(removeTargets bool) {
-	log.Info("Stopping target manager...")
-	defer log.Info("Target manager stopped.")
-
-	close(tm.done)
+	log.Infoln("Stopping target manager...")
 
 	tm.mtx.Lock()
-	defer tm.mtx.Unlock()
 
-	if removeTargets {
-		tm.removeTargets(nil)
-	}
+	// Cancel the base context; this causes all in-flight scrapes to abort immediately.
+	// Started inserts will be finished before terminating.
+	tm.cancel()
+
+	tm.mtx.Unlock()
 
-	tm.running = false
-}
-
-// removeTargets stops and removes targets for sources where f(source) is true
-// or if f is nil. This method is not thread-safe.
-func (tm *TargetManager) removeTargets(f func(string) bool) {
-	if f == nil {
-		f = func(string) bool { return true }
-	}
-	var wg sync.WaitGroup
-	for src, targets := range tm.targets {
-		if !f(src) {
-			continue
-		}
-		wg.Add(len(targets))
-		for _, target := range targets {
-			go func(t *Target) {
-				t.StopScraper()
-				wg.Done()
-			}(target)
-		}
-		delete(tm.targets, src)
-	}
-	wg.Wait()
-}
-
-// updateTargetGroup creates new targets for the group and replaces the old targets
-// for the source ID.
-func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *config.ScrapeConfig) error {
-	newTargets, err := tm.targetsFromGroup(tgroup, cfg)
-	if err != nil {
-		return err
-	}
-
-	tm.mtx.Lock()
-	defer tm.mtx.Unlock()
-
-	if !tm.running {
-		return nil
-	}
-
-	oldTargets, ok := tm.targets[tgroup.Source]
-	if ok {
-		var wg sync.WaitGroup
-		// Replace the old targets with the new ones while keeping the state
-		// of intersecting targets.
-		for i, tnew := range newTargets {
-			var match *Target
-			for j, told := range oldTargets {
-				if told == nil {
-					continue
-				}
-				if tnew.fingerprint() == told.fingerprint() {
-					match = told
-					oldTargets[j] = nil
-					break
-				}
-			}
-			// Update the existing target and discard the new equivalent.
-			// Otherwise start scraping the new target.
-			if match != nil {
-				// Updating is blocked during a scrape. We don't want those wait times
-				// to build up.
-				wg.Add(1)
-				go func(t *Target) {
-					if err := match.Update(cfg, t.labels, t.metaLabels); err != nil {
-						log.Errorf("Error updating target %v: %v", t, err)
-					}
-					wg.Done()
-				}(tnew)
-				newTargets[i] = match
-			} else {
-				go tnew.RunScraper(tm.sampleAppender)
-			}
-		}
-		// Remove all old targets that disappeared.
-		for _, told := range oldTargets {
-			if told != nil {
-				wg.Add(1)
-				go func(t *Target) {
-					t.StopScraper()
-					wg.Done()
-				}(told)
-			}
-		}
-		wg.Wait()
-	} else {
-		// The source ID is new, start all target scrapers.
-		for _, tnew := range newTargets {
-			go tnew.RunScraper(tm.sampleAppender)
-		}
-	}
-
-	if len(newTargets) > 0 {
-		tm.targets[tgroup.Source] = newTargets
-	} else {
-		delete(tm.targets, tgroup.Source)
-	}
-	return nil
+
+	// Wait for all scrape sets to terminate.
+	tm.wg.Wait()
 }
 
 // Pools returns the targets currently being scraped bucketed by their job name.
@@ -321,10 +107,13 @@ func (tm *TargetManager) Pools() map[string][]*Target {
 
 	pools := map[string][]*Target{}
 
-	for _, ts := range tm.targets {
-		for _, t := range ts {
-			job := string(t.Labels()[model.JobLabel])
-			pools[job] = append(pools[job], t)
+	// TODO(fabxc): this is just a hack to maintain compatibility for now.
+	for _, ps := range tm.scrapeSets {
+		for _, ts := range ps.tgroups {
+			for _, t := range ts {
+				job := string(t.Labels()[model.JobLabel])
+				pools[job] = append(pools[job], t)
+			}
 		}
 	}
 	return pools
@@ -335,78 +124,173 @@ func (tm *TargetManager) Pools() map[string][]*Target {
 // Returns true on success.
 func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool {
 	tm.mtx.RLock()
-	running := tm.running
+	running := tm.ctx != nil
 	tm.mtx.RUnlock()
 
 	if running {
-		tm.stop(false)
-		// Even if updating the config failed, we want to continue rather than stop scraping anything.
-		defer tm.Run()
-	}
-	providers := map[*config.ScrapeConfig][]TargetProvider{}
-
-	for _, scfg := range cfg.ScrapeConfigs {
-		providers[scfg] = providersFromConfig(scfg)
+		tm.Stop()
+		defer func() {
+			go tm.Run()
+		}()
 	}
 
 	tm.mtx.Lock()
-	defer tm.mtx.Unlock()
-	tm.providers = providers
+
+	tm.scrapeSets = tm.scrapeSets[:0]
+
+	for _, scfg := range cfg.ScrapeConfigs {
+		tm.scrapeSets = append(tm.scrapeSets, newScrapeSet(tm.appender, scfg))
+	}
+
+	tm.mtx.Unlock()
+
 	return true
 }
 
-// prefixedTargetProvider wraps TargetProvider and prefixes source strings
-// to make the sources unique across a configuration.
-type prefixedTargetProvider struct {
-	TargetProvider
+// scrapeSet holds several TargetProviders for which the same scrape configuration
+// is used. It runs the target providers and starts and stops scrapers as it
+// receives target updates.
+type scrapeSet struct {
+	appender storage.SampleAppender
 
-	job       string
-	mechanism string
-	idx       int
+	config  *config.ScrapeConfig
+	tgroups map[string]map[model.Fingerprint]*Target
+
+	mtx sync.RWMutex
 }
 
-func (tp *prefixedTargetProvider) prefix(src string) string {
-	return fmt.Sprintf("%s:%s:%d:%s", tp.job, tp.mechanism, tp.idx, src)
+func newScrapeSet(app storage.SampleAppender, cfg *config.ScrapeConfig) *scrapeSet {
+	return &scrapeSet{
+		appender: app,
+		config:   cfg,
+		tgroups:  map[string]map[model.Fingerprint]*Target{},
+	}
 }
 
-func (tp *prefixedTargetProvider) Sources() []string {
-	srcs := tp.TargetProvider.Sources()
-	for i, src := range srcs {
-		srcs[i] = tp.prefix(src)
+// run starts the target providers with the given context and consumes
+// and handles their updates. Once the context is done, it blocks until all
+// target scrapers have terminated.
+func (ss *scrapeSet) run(ctx context.Context) {
+	var (
+		providers = providersFromConfig(ss.config)
+		wg        sync.WaitGroup
+	)
+
+	for name, prov := range providers {
+		updates := make(chan config.TargetGroup)
+
+		wg.Add(1)
+		// The update and stopping operations for the target provider handling are blocking.
+		// Thus the run method only returns once all background processing is complete.
+		go func(name string, prov TargetProvider) {
+			defer wg.Done()
+
+			for {
+				select {
+				case <-ctx.Done():
+					ss.stopScrapers(name)
+					return
+				case update := <-updates:
+					if err := ss.update(name, &update); err != nil {
+						log.With("target_group", update).Errorf("Target update failed: %s", err)
+					}
+				}
+			}
+		}(name, prov)
+
+		done := make(chan struct{})
+
+		// TODO(fabxc): Adjust the TargetProvider interface so we can remove this
+		// redirection of the termination signal.
+		go func() {
+			<-ctx.Done()
+			close(done)
+		}()
+		go prov.Run(updates, done)
 	}
-	return srcs
+
+	wg.Wait()
 }
 
-func (tp *prefixedTargetProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
-	defer close(ch)
+// stopScrapers shuts down all active scrapers for a provider.
+func (ss *scrapeSet) stopScrapers(name string) {
+	var wg sync.WaitGroup
 
-	ch2 := make(chan config.TargetGroup)
-	go tp.TargetProvider.Run(ch2, done)
+	ss.mtx.RLock()
+	// TODO(fabxc): the prefixing is slightly hacky but this will be gone with subsequent changes.
+	// Match on the full "name/" prefix so that, e.g., "dns/1" does not also match "dns/10".
+	for source, tgroup := range ss.tgroups {
+		if !strings.HasPrefix(source, name+"/") {
+			continue
+		}
+		for _, t := range tgroup {
+			wg.Add(1)
 
-	for {
-		select {
-		case <-done:
-			return
-		case tg := <-ch2:
-			tg.Source = tp.prefix(tg.Source)
-			ch <- tg
+			go func(t *Target) {
+				t.StopScraper()
+				wg.Done()
+			}(t)
 		}
 	}
+	ss.mtx.RUnlock()
+
+	wg.Wait()
 }
 
+// update handles a target group update from the target provider identified by name.
+func (ss *scrapeSet) update(name string, tgroup *config.TargetGroup) error {
+	source := name + "/" + tgroup.Source
+
+	targets, err := targetsFromGroup(tgroup, ss.config)
+	if err != nil {
+		return err
+	}
+
+	ss.mtx.Lock()
+
+	// Read the previous targets under the lock as other provider goroutines
+	// may be modifying the map concurrently.
+	prevTargets := ss.tgroups[source]
+	ss.tgroups[source] = targets
+
+	for fp, tnew := range targets {
+		// If the same target existed before, we let it run and replace
+		// the new one with it.
+		if told, ok := prevTargets[fp]; ok {
+			targets[fp] = told
+		} else {
+			go tnew.RunScraper(ss.appender)
+		}
+	}
+	ss.mtx.Unlock()
+
+	var wg sync.WaitGroup
+	for fp, told := range prevTargets {
+		// A previous target is no longer in the group.
+		if _, ok := targets[fp]; !ok {
+			wg.Add(1)
+
+			go func(told *Target) {
+				told.StopScraper()
+				wg.Done()
+			}(told)
+		}
+	}
+	// Wait for all potentially stopped scrapers to terminate.
+	// This covers the case of flapping targets: under high load, a new scraper
+	// may already be active and trying to insert while the old scraper for the
+	// same target has not terminated yet and could still be inserting a
+	// previous sample set.
+	wg.Wait()
+
+	return nil
+}
 
 // providersFromConfig returns all TargetProviders configured in cfg.
-func providersFromConfig(cfg *config.ScrapeConfig) []TargetProvider {
-	var providers []TargetProvider
+func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider {
+	providers := map[string]TargetProvider{}
 
 	app := func(mech string, i int, tp TargetProvider) {
-		providers = append(providers, &prefixedTargetProvider{
-			job:            cfg.JobName,
-			mechanism:      mech,
-			idx:            i,
-			TargetProvider: tp,
-		})
+		providers[fmt.Sprintf("%s/%d", mech, i)] = tp
 	}
 
 	for i, c := range cfg.DNSSDConfigs {
@@ -451,11 +335,8 @@ func providersFromConfig(cfg *config.ScrapeConfig) []TargetProvider {
 }
 
 // targetsFromGroup builds targets based on the given TargetGroup and config.
-func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) {
-	tm.mtx.RLock()
-	defer tm.mtx.RUnlock()
-
-	targets := make([]*Target, 0, len(tg.Targets))
+func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) (map[model.Fingerprint]*Target, error) {
+	targets := make(map[model.Fingerprint]*Target, len(tg.Targets))
 	for i, labels := range tg.Targets {
 		for k, v := range cfg.Params {
 			if len(v) > 0 {
@@ -522,7 +403,8 @@ func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.Sc
 		if err != nil {
 			return nil, fmt.Errorf("error while creating instance %d in target group %s: %s", i, tg, err)
 		}
-		targets = append(targets, tr)
+
+		targets[tr.fingerprint()] = tr
 	}
 
 	return targets, nil
@@ -557,11 +439,3 @@ func (sd *StaticProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}
 	}
 	<-done
 }
-
-// Sources returns the provider's sources.
-func (sd *StaticProvider) Sources() (srcs []string) {
-	for _, tg := range sd.TargetGroups {
-		srcs = append(srcs, tg.Source)
-	}
-	return srcs
-}
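Note for reviewers: with Sources() gone, a provider's whole contract lives in Run. Below is a minimal sketch of an implementation against the slimmed-down interface; the exampleProvider type is illustrative only and not part of this change, mirroring the StaticProvider retained above.

// exampleProvider is a hypothetical static TargetProvider. Under the new
// contract its obligations are: send all known target groups as soon as it
// can, return when done is closed, and close ch when no more updates follow.
type exampleProvider struct {
	groups []*config.TargetGroup
}

// Run implements the slimmed-down TargetProvider interface.
func (p *exampleProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
	defer close(ch)

	// Initially send all known target groups.
	for _, tg := range p.groups {
		select {
		case ch <- *tg:
		case <-done:
			return
		}
	}
	// A static provider has nothing further to send; wait for termination.
	<-done
}

Group identity is now carried entirely by TargetGroup.Source: providersFromConfig names each provider "<mechanism>/<index>" and scrapeSet.update prefixes that name to the group source, which replaces the old prefixedTargetProvider wrapper and its "<job>:<mechanism>:<index>:<source>" scheme.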
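The calling sequence implied by the new signatures, as a rough sketch (assuming an existing storage.SampleAppender app and a parsed *config.Config cfg; error handling elided):

tm := retrieval.NewTargetManager(app)
tm.ApplyConfig(cfg)

// Run now blocks until all scrape sets have terminated,
// so the caller starts it in its own goroutine.
go tm.Run()

// On shutdown or before a config reload: cancels the base
// context and waits for all scrape sets to terminate.
tm.Stop()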