From d15adfc917b3d417980c6259c0c8f072f1f82082 Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Fri, 19 Feb 2016 22:54:51 +0100
Subject: [PATCH] Preserve target state across reloads.

This commit moves scraper handling into a separate scrapePool type.
TargetSets only manage TargetProvider lifecycles and sync the retrieved
updates to the scrapePool.

TargetProviders are now expected to send a full initial target set
within 5 seconds. The scrapePools preserve target state across reloads
and only drop targets after the initial set has been synced.
---
 retrieval/discovery/dns.go |   2 +
 retrieval/target.go        |  15 +-
 retrieval/targetmanager.go | 297 +++++++++++++++++++++++++++----------
 3 files changed, 231 insertions(+), 83 deletions(-)

diff --git a/retrieval/discovery/dns.go b/retrieval/discovery/dns.go
index 3c75f07fc..7c744f8c0 100644
--- a/retrieval/discovery/dns.go
+++ b/retrieval/discovery/dns.go
@@ -113,6 +113,7 @@ func (dd *DNSDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup
 
 func (dd *DNSDiscovery) refreshAll(ch chan<- []*config.TargetGroup) {
 	var wg sync.WaitGroup
+	wg.Add(len(dd.names))
 	for _, name := range dd.names {
 		go func(n string) {
@@ -122,6 +123,7 @@ func (dd *DNSDiscovery) refreshAll(ch chan<- []*config.TargetGroup) {
 			wg.Done()
 		}(name)
 	}
+	wg.Wait()
 }

diff --git a/retrieval/target.go b/retrieval/target.go
index bf00f7ca7..999b4a5dc 100644
--- a/retrieval/target.go
+++ b/retrieval/target.go
@@ -167,6 +167,7 @@ type Target struct {
 	scraperStopping chan struct{}
 	// Closing scraperStopped signals that scraping has been stopped.
 	scraperStopped chan struct{}
+	running        bool
 
 	// Mutex protects the members below.
 	sync.RWMutex
@@ -411,9 +412,11 @@ func (t *Target) InstanceIdentifier() string {
 func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {
 	defer close(t.scraperStopped)
 
-	lastScrapeInterval := t.interval()
+	t.Lock()
+	t.running = true
+	t.Unlock()
 
-	log.Debugf("Starting scraper for target %v...", t)
+	lastScrapeInterval := t.interval()
 
 	select {
 	case <-time.After(t.offset(lastScrapeInterval)):
@@ -471,6 +474,14 @@ func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {
 
 // StopScraper implements Target.
 func (t *Target) StopScraper() {
+	t.Lock()
+	if !t.running {
+		t.Unlock()
+		return
+	}
+	t.running = false
+	t.Unlock()
+
 	log.Debugf("Stopping scraper for target %v...", t)
 
 	close(t.scraperStopping)

diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go
index 7328c9e03..ceb880b0d 100644
--- a/retrieval/targetmanager.go
+++ b/retrieval/targetmanager.go
@@ -17,6 +17,7 @@ import (
 	"fmt"
 	"strings"
 	"sync"
+	"time"
 
 	"github.com/prometheus/common/log"
 	"github.com/prometheus/common/model"
@@ -47,21 +48,23 @@ type TargetProvider interface {
 // creates the new targets based on the target groups it receives from various
 // target providers.
 type TargetManager struct {
-	appender storage.SampleAppender
+	appender      storage.SampleAppender
+	scrapeConfigs []*config.ScrapeConfig
 
 	mtx    sync.RWMutex
 	ctx    context.Context
 	cancel func()
 	wg     sync.WaitGroup
 
-	// Providers by the scrape configs they are derived from.
-	scrapeSets []*scrapeSet
+	// Set of unique targets by scrape configuration.
+	targetSets map[string]*targetSet
 }
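Worth noting on the target.go hunks above: the new running flag makes StopScraper a no-op for targets that never started and idempotent when a target is stopped twice, which the scrape pool below relies on when it reconciles targets repeatedly. A minimal sketch of the guarded-close pattern under those assumptions (stopper and its methods are hypothetical stand-ins, not the actual Target API):

package sketch

import "sync"

// stopper closes done at most once, and only after start has run --
// mirroring the running flag added to Target above.
type stopper struct {
	mtx     sync.Mutex
	running bool
	done    chan struct{}
}

func newStopper() *stopper {
	return &stopper{done: make(chan struct{})}
}

func (s *stopper) start() {
	s.mtx.Lock()
	s.running = true
	s.mtx.Unlock()
}

// stop is a no-op if start never ran, and idempotent afterwards, so
// callers may stop the same unit repeatedly without panicking on a
// double close of the channel.
func (s *stopper) stop() {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	if !s.running {
		return
	}
	s.running = false
	close(s.done)
}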
 // NewTargetManager creates a new TargetManager.
 func NewTargetManager(app storage.SampleAppender) *TargetManager {
 	return &TargetManager{
-		appender: app,
+		appender:   app,
+		targetSets: map[string]*targetSet{},
 	}
 }
@@ -72,16 +75,39 @@ func (tm *TargetManager) Run() {
 	tm.mtx.Lock()
 	tm.ctx, tm.cancel = context.WithCancel(context.Background())
 
-	for _, ss := range tm.scrapeSets {
+	jobs := map[string]struct{}{}
+
+	// Start new target sets and update existing ones.
+	for _, scfg := range tm.scrapeConfigs {
+		jobs[scfg.JobName] = struct{}{}
+
+		ts, ok := tm.targetSets[scfg.JobName]
+		if !ok {
+			ts = newTargetSet(scfg, tm.appender)
+			tm.targetSets[scfg.JobName] = ts
+		}
+		ts.runProviders(tm.ctx, providersFromConfig(scfg))
+	}
+
+	// Stop old target sets.
+	for name := range tm.targetSets {
+		if _, ok := jobs[name]; !ok {
+			delete(tm.targetSets, name)
+		}
+	}
+
+	// Run target sets.
+	for _, ts := range tm.targetSets {
 		tm.wg.Add(1)
 
-		go func(ss *scrapeSet) {
-			ss.run(tm.ctx)
+		go func(ts *targetSet) {
+			ts.run(tm.ctx)
 			tm.wg.Done()
-		}(ss)
+		}(ts)
 	}
 
 	tm.mtx.Unlock()
+
 	tm.wg.Wait()
 }
@@ -90,14 +116,16 @@ func (tm *TargetManager) Stop() {
 	log.Infoln("Stopping target manager...")
 
 	tm.mtx.Lock()
-
-	// Cancel the base context, this will cause all in-flight scrapes to abort immmediately.
+	// Cancel the base context; this will cause all target providers to shut down
+	// and all in-flight scrapes to abort immediately.
 	// Started inserts will be finished before terminating.
 	tm.cancel()
 	tm.mtx.Unlock()
 
-	// Wait for all provider sets to terminate.
+	// Wait for all scrape inserts to complete.
 	tm.wg.Wait()
+
+	log.Debugln("Target manager stopped")
 }
 
 // Pools returns the targets currently being scraped bucketed by their job name.
@@ -108,8 +136,8 @@ func (tm *TargetManager) Pools() map[string][]*Target {
 	pools := map[string][]*Target{}
 
 	// TODO(fabxc): this is just a hack to maintain compatibility for now.
-	for _, ps := range tm.scrapeSets {
-		for _, ts := range ps.tgroups {
+	for _, ps := range tm.targetSets {
+		for _, ts := range ps.scrapePool.tgroups {
 			for _, t := range ts {
 				job := string(t.Labels()[model.JobLabel])
 				pools[job] = append(pools[job], t)
@@ -135,66 +163,120 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool {
 	}
 
 	tm.mtx.Lock()
-
-	tm.scrapeSets = tm.scrapeSets[:0]
-
-	for _, scfg := range cfg.ScrapeConfigs {
-		tm.scrapeSets = append(tm.scrapeSets, newScrapeSet(tm.appender, scfg))
-	}
-
+	tm.scrapeConfigs = cfg.ScrapeConfigs
 	tm.mtx.Unlock()
 
 	return true
 }
 
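ApplyConfig now only records the new scrape configs; the diffing against running state happens in Run above, which reuses target sets for jobs that survived the reload and drops the rest. The same reuse-create-drop pattern as a standalone sketch (jobState and reconcileJobs are hypothetical stand-ins, not Prometheus types):

package sketch

// jobState stands in for a long-lived per-job resource such as a targetSet.
type jobState struct{ name string }

// reconcileJobs mirrors the reload logic above: jobs still present in the
// new config keep their existing state, new jobs get fresh state, and jobs
// that vanished from the config are dropped.
func reconcileJobs(current map[string]*jobState, jobs []string) {
	seen := map[string]struct{}{}
	for _, job := range jobs {
		seen[job] = struct{}{}
		if _, ok := current[job]; !ok {
			current[job] = &jobState{name: job}
		}
	}
	for job := range current {
		if _, ok := seen[job]; !ok {
			delete(current, job)
		}
	}
}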
-// scrapeSet holds several TargetProviders for which the same scrape configuration
+// targetSet holds several TargetProviders for which the same scrape configuration
 // is used. It runs the target providers and starts and stops scrapers as it
 // receives target updates.
-type scrapeSet struct {
-	appender storage.SampleAppender
+type targetSet struct {
+	mtx sync.RWMutex
+
+	tgroups   map[string]map[model.Fingerprint]*Target
+	providers map[string]TargetProvider
 
-	config  *config.ScrapeConfig
-	tgroups map[string]map[model.Fingerprint]*Target
+	scrapePool *scrapePool
+	config     *config.ScrapeConfig
 
-	mtx sync.RWMutex
+	stopProviders func()
+	syncCh        chan struct{}
 }
 
-func newScrapeSet(app storage.SampleAppender, cfg *config.ScrapeConfig) *scrapeSet {
-	return &scrapeSet{
-		appender: app,
-		config:   cfg,
-		tgroups:  map[string]map[model.Fingerprint]*Target{},
+func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet {
+	ts := &targetSet{
+		tgroups:    map[string]map[model.Fingerprint]*Target{},
+		scrapePool: newScrapePool(app),
+		syncCh:     make(chan struct{}, 1),
+		config:     cfg,
 	}
+	return ts
 }
 
-// run starts the target providers with the given context and consumes
-// and handles their updates. If the context is done, it blocks until the
-// target scrapers have terminated.
-func (ss *scrapeSet) run(ctx context.Context) {
-	var (
-		providers = providersFromConfig(ss.config)
-		wg        sync.WaitGroup
-	)
+func (ts *targetSet) run(ctx context.Context) {
+	ts.scrapePool.ctx = ctx
+
+Loop:
+	for {
+		// Throttle syncing to once per five seconds.
+		select {
+		case <-ctx.Done():
+			break Loop
+		case <-time.After(5 * time.Second):
+		}
+
+		select {
+		case <-ctx.Done():
+			break Loop
+		case <-ts.syncCh:
+			ts.sync()
+		}
+	}
+
+	// Wait for all pending target scrapes to complete to ensure there will
+	// be no more storage writes after this point.
+	ts.scrapePool.stop()
+}
+
+func (ts *targetSet) sync() {
+	// TODO(fabxc): temporary simple version. For a deduplicating scrape pool we will
+	// submit a list of all targets.
+	ts.scrapePool.sync(ts.tgroups)
+}
+
+func (ts *targetSet) runProviders(ctx context.Context, providers map[string]TargetProvider) {
+	// Lock for the entire time. This may mean up to 5 seconds until the full initial set
+	// is retrieved and applied.
+	// We could release earlier with some tweaks, but this is easier to reason about.
+	ts.mtx.Lock()
+	defer ts.mtx.Unlock()
+
+	var wg sync.WaitGroup
+
+	if ts.stopProviders != nil {
+		ts.stopProviders()
+	}
+	ctx, ts.stopProviders = context.WithCancel(ctx)
 
 	for name, prov := range providers {
-		var (
-			updates = make(chan []*config.TargetGroup)
-		)
 		wg.Add(1)
 
-		// The update and stopping operations for the target provider handling are blocking.
-		// Thus the run method only returns if all background processing is complete.
-		go func(name string, prov TargetProvider) {
-			defer wg.Done()
+		updates := make(chan []*config.TargetGroup)
+
+		go func(name string, prov TargetProvider) {
+			var initial []*config.TargetGroup
+
+			select {
+			case <-ctx.Done():
+				wg.Done()
+				return
+			case initial = <-updates:
+				// First set of all targets the provider knows.
+			case <-time.After(5 * time.Second):
+				// Initial set didn't arrive. Act as if it was empty
+				// and wait for updates later on.
+			}
+
+			for _, tgroup := range initial {
+				targets, err := targetsFromGroup(tgroup, ts.config)
+				if err != nil {
+					log.With("target_group", tgroup).Errorf("Target update failed: %s", err)
+					continue
+				}
+				ts.tgroups[name+"/"+tgroup.Source] = targets
+			}
+
+			wg.Done()
+
+			// Start listening for further updates.
 			for {
 				select {
 				case <-ctx.Done():
-					ss.stopScrapers(name)
 					return
 				case tgs := <-updates:
 					for _, tg := range tgs {
-						if err := ss.update(name, tg); err != nil {
+						if err := ts.update(name, tg); err != nil {
 							log.With("target_group", tg).Errorf("Target update failed: %s", err)
 						}
 					}
@@ -206,18 +288,52 @@
 	}
 
 	wg.Wait()
+
+	ts.sync()
 }
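The provider goroutines above wait at most five seconds for a provider's initial target set and otherwise proceed with an empty one, so a stuck provider cannot block runProviders forever. The bounded wait in isolation (awaitInitial is a hypothetical helper; []string stands in for []*config.TargetGroup):

package sketch

import (
	"context"
	"time"
)

// awaitInitial returns a provider's first batch of target groups, or nil if
// nothing arrives within the grace period; late updates are then picked up
// by the regular update loop instead.
func awaitInitial(ctx context.Context, updates <-chan []string, grace time.Duration) []string {
	select {
	case <-ctx.Done():
		return nil
	case initial := <-updates:
		return initial
	case <-time.After(grace):
		// Slow provider: treat the initial set as empty.
		return nil
	}
}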
 
-// stopScrapers shuts down all active scrapers for a provider.
-func (ss *scrapeSet) stopScrapers(name string) {
+// update handles a target group update from a target provider identified by the name.
+func (ts *targetSet) update(name string, tgroup *config.TargetGroup) error {
+	targets, err := targetsFromGroup(tgroup, ts.config)
+	if err != nil {
+		return err
+	}
+
+	ts.mtx.Lock()
+	defer ts.mtx.Unlock()
+
+	ts.tgroups[name+"/"+tgroup.Source] = targets
+
+	select {
+	case ts.syncCh <- struct{}{}:
+	default:
+	}
+
+	return nil
+}
+
+// scrapePool manages scrapes for sets of targets.
+type scrapePool struct {
+	appender storage.SampleAppender
+
+	ctx context.Context
+
+	mtx     sync.RWMutex
+	tgroups map[string]map[model.Fingerprint]*Target
+}
+
+func newScrapePool(app storage.SampleAppender) *scrapePool {
+	return &scrapePool{
+		appender: app,
+		tgroups:  map[string]map[model.Fingerprint]*Target{},
+	}
+}
+
+func (sp *scrapePool) stop() {
 	var wg sync.WaitGroup
 
-	ss.mtx.RLock()
-	// TODO(fabxc): the prefixing is slightly hacky but this will be gone with subsequent changes.
-	for source, tgroup := range ss.tgroups {
-		if !strings.HasPrefix(source, name) {
-			continue
-		}
+	sp.mtx.RLock()
+
+	for _, tgroup := range sp.tgroups {
 		for _, t := range tgroup {
 			wg.Add(1)
 
@@ -227,41 +343,55 @@
 			}(t)
 		}
 	}
-	ss.mtx.RUnlock()
+	sp.mtx.RUnlock()
 
 	wg.Wait()
 }
 
-// update handles a target group update from a target provider identified by the name.
-func (ss *scrapeSet) update(name string, tgroup *config.TargetGroup) error {
+func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) {
+	sp.mtx.Lock()
+
 	var (
-		source      = name + "/" + tgroup.Source
-		prevTargets = ss.tgroups[source]
+		wg         sync.WaitGroup
+		newTgroups = map[string]map[model.Fingerprint]*Target{}
 	)
 
-	targets, err := targetsFromGroup(tgroup, ss.config)
-	if err != nil {
-		return err
-	}
+	for source, targets := range tgroups {
+		var (
+			prevTargets = sp.tgroups[source]
+			newTargets  = map[model.Fingerprint]*Target{}
+		)
+		newTgroups[source] = newTargets
 
-	ss.mtx.Lock()
-	ss.tgroups[source] = targets
+		for fp, tnew := range targets {
+			// If the same target existed before, we let it run and replace
+			// the new one with it.
+			if told, ok := prevTargets[fp]; ok {
+				newTargets[fp] = told
+			} else {
+				newTargets[fp] = tnew
+				go tnew.RunScraper(sp.appender)
+			}
+		}
+		for fp, told := range prevTargets {
+			// A previous target is no longer in the group.
+			if _, ok := targets[fp]; !ok {
+				wg.Add(1)
 
-	for fp, tnew := range targets {
-		// If the same target existed before, we let it run and replace
-		// the new one with it.
-		if told, ok := prevTargets[fp]; ok {
-			targets[fp] = told
-		} else {
-			go tnew.RunScraper(ss.appender)
+				go func(told *Target) {
+					told.StopScraper()
+					wg.Done()
+				}(told)
+			}
 		}
 	}
-	ss.mtx.Unlock()
 
-	var wg sync.WaitGroup
-	for fp, told := range prevTargets {
-		// A previous target is no longer in the group.
-		if _, ok := targets[fp]; !ok {
+	// Stop scrapers for target groups that disappeared completely.
+	for source, targets := range sp.tgroups {
+		if _, ok := tgroups[source]; ok {
+			continue
+		}
+		for _, told := range targets {
 			wg.Add(1)
 
 			go func(told *Target) {
@@ -270,13 +400,17 @@ func (ss *scrapeSet) update(name string, tgroup *config.TargetGroup) error {
 			}(told)
 		}
 	}
+
+	sp.tgroups = newTgroups
+
 	// Wait for all potentially stopped scrapers to terminate.
 	// This covers the case of flapping targets. If the server is under high load, a new scraper
 	// may be active and tries to insert. The old scraper that didn't terminate yet could still
 	// be inserting a previous sample set.
 	wg.Wait()
 
-	return nil
+	// TODO(fabxc): maybe this can be released earlier with subsequent refactoring.
+	sp.mtx.Unlock()
 }
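sync is where target state actually survives a reload: targets are matched by fingerprint, unchanged targets keep their already-running scraper, and only the difference is started or stopped. The core reconciliation reduced to a standalone sketch (target, start, and stop are stand-ins for Target, RunScraper, and StopScraper; uint64 stands in for model.Fingerprint):

package sketch

// target is a stub for the real retrieval.Target.
type target struct{}

// reconcile returns the next target map while preserving state: a target
// whose fingerprint already exists keeps its running instance, genuinely
// new targets are started, and vanished ones are stopped.
func reconcile(prev, next map[uint64]*target, start, stop func(*target)) map[uint64]*target {
	out := make(map[uint64]*target, len(next))
	for fp, tnew := range next {
		if told, ok := prev[fp]; ok {
			out[fp] = told // unchanged target: keep the running scraper
		} else {
			out[fp] = tnew
			start(tnew)
		}
	}
	for fp, told := range prev {
		if _, ok := next[fp]; !ok {
			stop(told) // target no longer present in the new set
		}
	}
	return out
}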
 
 // providersFromConfig returns all TargetProviders configured in cfg.
@@ -331,6 +465,7 @@ func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider {
 // targetsFromGroup builds targets based on the given TargetGroup and config.
 func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) (map[model.Fingerprint]*Target, error) {
 	targets := make(map[model.Fingerprint]*Target, len(tg.Targets))
+
 	for i, labels := range tg.Targets {
 		for k, v := range cfg.Params {
 			if len(v) > 0 {
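One last detail worth calling out: targetSet.update never blocks when signalling run, because syncCh has capacity one and the send falls through to default when a signal is already pending, so any burst of updates between two sync rounds coalesces into a single sync. The pattern in isolation (newNotifier is a hypothetical helper):

package sketch

// newNotifier returns a signal channel of capacity one plus a non-blocking
// notify function; any number of notify calls between two reads coalesce
// into a single pending signal, which is the shape of syncCh above.
func newNotifier() (<-chan struct{}, func()) {
	ch := make(chan struct{}, 1)
	notify := func() {
		select {
		case ch <- struct{}{}:
		default: // a sync is already pending; drop the duplicate signal
		}
	}
	return ch, notify
}

The receiver drains the channel at its own pace (here, the loop in run throttled to one sync per five seconds) and always observes at most one pending signal.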