From 76a8c6160d4659f32d42a923d7f556ee69352f5a Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 23 Feb 2016 14:37:25 +0100 Subject: [PATCH] Deduplicate targets in scrape pool. With this commit the scrape pool deduplicates incoming targets before scraping them. This way multiple target providers can produce the same target but it will be scraped only once. --- retrieval/scrape.go | 130 ++++++++++++++++++------------------- retrieval/targetmanager.go | 35 ++++++---- 2 files changed, 85 insertions(+), 80 deletions(-) diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 5586671e1..d23d38ff2 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -72,64 +72,72 @@ type scrapePool struct { appender storage.SampleAppender config *config.ScrapeConfig - ctx context.Context - mtx sync.RWMutex - tgroups map[string]map[model.Fingerprint]*Target + ctx context.Context - targets map[model.Fingerprint]loop + mtx sync.RWMutex + targets map[model.Fingerprint]*Target + loops map[model.Fingerprint]loop } func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { return &scrapePool{ appender: app, config: cfg, - tgroups: map[string]map[model.Fingerprint]*Target{}, + targets: map[model.Fingerprint]*Target{}, + loops: map[model.Fingerprint]loop{}, } } +// stop terminates all scrape loops and returns after they all terminated. +// A stopped scrape pool must not be used again. func (sp *scrapePool) stop() { var wg sync.WaitGroup sp.mtx.RLock() - for _, tgroup := range sp.tgroups { - for _, t := range tgroup { - wg.Add(1) + for _, l := range sp.loops { + wg.Add(1) - go func(t *Target) { - t.scrapeLoop.stop() - wg.Done() - }(t) - } + go func(l loop) { + l.stop() + wg.Done() + }(l) } sp.mtx.RUnlock() wg.Wait() } +// reload the scrape pool with the given scrape configuration. The target state is preserved +// but all scrape loops are restarted with the new scrape configuration. +// This method returns after all scrape loops that were stopped have fully terminated. func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { - log.Debugln("reload scrapepool") - defer log.Debugln("reload done") - sp.mtx.Lock() defer sp.mtx.Unlock() sp.config = cfg - var wg sync.WaitGroup + var ( + wg sync.WaitGroup + interval = time.Duration(sp.config.ScrapeInterval) + timeout = time.Duration(sp.config.ScrapeTimeout) + ) - for _, tgroup := range sp.tgroups { - for _, t := range tgroup { - wg.Add(1) + for fp, oldLoop := range sp.loops { + var ( + t = sp.targets[fp] + newLoop = newScrapeLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t)) + ) + wg.Add(1) - go func(t *Target) { - t.scrapeLoop.stop() + go func(oldLoop, newLoop loop) { + oldLoop.stop() + wg.Done() - t.scrapeLoop = newScrapeLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t)) - go t.scrapeLoop.run(time.Duration(cfg.ScrapeInterval), time.Duration(cfg.ScrapeTimeout), nil) - wg.Done() - }(t) - } + go newLoop.run(interval, timeout, nil) + }(oldLoop, newLoop) + + sp.loops[fp] = newLoop } wg.Wait() @@ -169,64 +177,49 @@ func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender { } } -func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) { +// sync takes a list of potentially duplicated targets, deduplicates them, starts +// scrape loops for new targets, and stops scrape loops for disappeared targets. +// It returns after all stopped scrape loops terminated. +func (sp *scrapePool) sync(targets []*Target) { sp.mtx.Lock() defer sp.mtx.Unlock() var ( - wg sync.WaitGroup - newTgroups = map[string]map[model.Fingerprint]*Target{} + fingerprints = map[model.Fingerprint]struct{}{} + interval = time.Duration(sp.config.ScrapeInterval) + timeout = time.Duration(sp.config.ScrapeTimeout) ) - for source, targets := range tgroups { - var ( - prevTargets = sp.tgroups[source] - newTargets = map[model.Fingerprint]*Target{} - ) - newTgroups[source] = newTargets + for _, t := range targets { + fp := t.fingerprint() + fingerprints[fp] = struct{}{} - for fp, tnew := range targets { - // If the same target existed before, we let it run and replace - // the new one with it. - if told, ok := prevTargets[fp]; ok { - newTargets[fp] = told - } else { - newTargets[fp] = tnew + if _, ok := sp.targets[fp]; !ok { + l := newScrapeLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t)) - tnew.scrapeLoop = newScrapeLoop(sp.ctx, tnew, sp.sampleAppender(tnew), sp.reportAppender(tnew)) - go tnew.scrapeLoop.run(time.Duration(sp.config.ScrapeInterval), time.Duration(sp.config.ScrapeTimeout), nil) - } - } - for fp, told := range prevTargets { - // A previous target is no longer in the group. - if _, ok := targets[fp]; !ok { - wg.Add(1) + sp.targets[fp] = t + sp.loops[fp] = l - go func(told *Target) { - told.scrapeLoop.stop() - wg.Done() - }(told) - } + go l.run(interval, timeout, nil) } } - // Stop scrapers for target groups that disappeared completely. - for source, targets := range sp.tgroups { - if _, ok := tgroups[source]; ok { - continue - } - for _, told := range targets { + var wg sync.WaitGroup + + // Stop and remove old targets and scraper loops. + for fp := range sp.targets { + if _, ok := fingerprints[fp]; !ok { wg.Add(1) - - go func(told *Target) { - told.scrapeLoop.stop() + go func(l loop) { + l.stop() wg.Done() - }(told) + }(sp.loops[fp]) + + delete(sp.loops, fp) + delete(sp.targets, fp) } } - sp.tgroups = newTgroups - // Wait for all potentially stopped scrapers to terminate. // This covers the case of flapping targets. If the server is under high load, a new scraper // may be active and tries to insert. The old scraper that didn't terminate yet could still @@ -241,6 +234,7 @@ type scraper interface { offset(interval time.Duration) time.Duration } +// A loop can run and be stopped again. It must be reused after it was stopped. type loop interface { run(interval, timeout time.Duration, errc chan<- error) stop() diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 474dbaf86..d0f0960a1 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -142,12 +142,14 @@ func (tm *TargetManager) Pools() map[string][]*Target { // TODO(fabxc): this is just a hack to maintain compatibility for now. for _, ps := range tm.targetSets { - for _, ts := range ps.scrapePool.tgroups { - for _, t := range ts { - job := string(t.Labels()[model.JobLabel]) - pools[job] = append(pools[job], t) - } + ps.scrapePool.mtx.RLock() + + for _, t := range ps.scrapePool.targets { + job := string(t.Labels()[model.JobLabel]) + pools[job] = append(pools[job], t) } + + ps.scrapePool.mtx.RUnlock() } return pools } @@ -168,10 +170,12 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool { } // targetSet holds several TargetProviders for which the same scrape configuration -// is used. It runs the target providers and starts and stops scrapers as it -// receives target updates. +// is used. It maintains target groups from all given providers and sync them +// to a scrape pool. type targetSet struct { - mtx sync.RWMutex + mtx sync.RWMutex + + // Sets of targets by a source string that is unique across target providers. tgroups map[string]map[model.Fingerprint]*Target providers map[string]TargetProvider @@ -231,7 +235,9 @@ Loop: case <-ctx.Done(): break Loop case <-ts.syncCh: + ts.mtx.RLock() ts.sync() + ts.mtx.RUnlock() } } @@ -241,9 +247,13 @@ Loop: } func (ts *targetSet) sync() { - // TODO(fabxc): temporary simple version. For a deduplicating scrape pool we will - // submit a list of all targets. - ts.scrapePool.sync(ts.tgroups) + targets := []*Target{} + for _, tgroup := range ts.tgroups { + for _, t := range tgroup { + targets = append(targets, t) + } + } + ts.scrapePool.sync(targets) } func (ts *targetSet) runProviders(ctx context.Context, providers map[string]TargetProvider) { @@ -308,8 +318,9 @@ func (ts *targetSet) runProviders(ctx context.Context, providers map[string]Targ go prov.Run(ctx, updates) } + // We wait for a full initial set of target groups before releasing the mutex + // to ensure the initial sync is complete and there are no races with subsequent updates. wg.Wait() - ts.sync() }