Deduplicate targets in scrape pool.

With this commit, the scrape pool deduplicates incoming targets before
scraping them. This way, multiple target providers can produce the same
target, but it will be scraped only once.
Fabian Reinartz 2016-02-23 14:37:25 +01:00
parent 84f74b9a84
commit 76a8c6160d
2 changed files with 85 additions and 80 deletions
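
The mechanism behind the deduplication: a target's identity is the fingerprint of its label set, so the same target announced by several providers collapses onto a single map key. Below is a minimal, self-contained sketch of that keying idea; the types are simplified stand-ins for illustration (the real code hashes a model.LabelSet into a model.Fingerprint rather than building a string key):

```go
package main

import (
	"fmt"
	"sort"
)

// target is a simplified stand-in for retrieval.Target: its identity derives
// solely from its labels, not from which provider produced it.
type target struct {
	labels map[string]string
}

// fingerprint is a stand-in for the hashing behind model.Fingerprint: here it
// is just a canonical serialization of the sorted label pairs.
func (t *target) fingerprint() string {
	keys := make([]string, 0, len(t.labels))
	for k := range t.labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	fp := ""
	for _, k := range keys {
		fp += k + "=" + t.labels[k] + ";"
	}
	return fp
}

func main() {
	// Two providers (say, static config and DNS) announce overlapping targets.
	fromProviderA := []*target{
		{labels: map[string]string{"job": "node", "instance": "10.0.0.1:9100"}},
		{labels: map[string]string{"job": "node", "instance": "10.0.0.2:9100"}},
	}
	fromProviderB := []*target{
		{labels: map[string]string{"job": "node", "instance": "10.0.0.2:9100"}}, // duplicate
	}

	// Keying by fingerprint is all the deduplication the pool needs.
	unique := map[string]*target{}
	for _, t := range append(fromProviderA, fromProviderB...) {
		unique[t.fingerprint()] = t
	}
	fmt.Println(len(unique), "targets to scrape") // prints: 2 targets to scrape
}
```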

--- a/retrieval/scrape.go
+++ b/retrieval/scrape.go

@@ -72,64 +72,72 @@ type scrapePool struct {
 	appender storage.SampleAppender
 
 	config *config.ScrapeConfig
 
 	ctx context.Context
 
-	mtx     sync.RWMutex
-	tgroups map[string]map[model.Fingerprint]*Target
+	mtx     sync.RWMutex
+	targets map[model.Fingerprint]*Target
+	loops   map[model.Fingerprint]loop
 }
 
 func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
 	return &scrapePool{
 		appender: app,
 		config:   cfg,
-		tgroups:  map[string]map[model.Fingerprint]*Target{},
+		targets:  map[model.Fingerprint]*Target{},
+		loops:    map[model.Fingerprint]loop{},
 	}
 }
 
+// stop terminates all scrape loops and returns after they all terminated.
+// A stopped scrape pool must not be used again.
 func (sp *scrapePool) stop() {
 	var wg sync.WaitGroup
 
 	sp.mtx.RLock()
 
-	for _, tgroup := range sp.tgroups {
-		for _, t := range tgroup {
-			wg.Add(1)
-
-			go func(t *Target) {
-				t.scrapeLoop.stop()
-				wg.Done()
-			}(t)
-		}
+	for _, l := range sp.loops {
+		wg.Add(1)
+		go func(l loop) {
+			l.stop()
+			wg.Done()
+		}(l)
 	}
 	sp.mtx.RUnlock()
 
 	wg.Wait()
 }
 
+// reload the scrape pool with the given scrape configuration. The target state is preserved
+// but all scrape loops are restarted with the new scrape configuration.
+// This method returns after all scrape loops that were stopped have fully terminated.
 func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
-	log.Debugln("reload scrapepool")
-	defer log.Debugln("reload done")
-
 	sp.mtx.Lock()
 	defer sp.mtx.Unlock()
 
 	sp.config = cfg
 
-	var wg sync.WaitGroup
+	var (
+		wg       sync.WaitGroup
+		interval = time.Duration(sp.config.ScrapeInterval)
+		timeout  = time.Duration(sp.config.ScrapeTimeout)
+	)
 
-	for _, tgroup := range sp.tgroups {
-		for _, t := range tgroup {
-			wg.Add(1)
-
-			go func(t *Target) {
-				t.scrapeLoop.stop()
-
-				t.scrapeLoop = newScrapeLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t))
-				go t.scrapeLoop.run(time.Duration(cfg.ScrapeInterval), time.Duration(cfg.ScrapeTimeout), nil)
-				wg.Done()
-			}(t)
-		}
+	for fp, oldLoop := range sp.loops {
+		var (
+			t       = sp.targets[fp]
+			newLoop = newScrapeLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t))
+		)
+		wg.Add(1)
+
+		go func(oldLoop, newLoop loop) {
+			oldLoop.stop()
+			wg.Done()
+
+			go newLoop.run(interval, timeout, nil)
+		}(oldLoop, newLoop)
+
+		sp.loops[fp] = newLoop
 	}
 
 	wg.Wait()
@@ -169,64 +177,49 @@ func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender {
 	}
 }
 
-func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) {
+// sync takes a list of potentially duplicated targets, deduplicates them, starts
+// scrape loops for new targets, and stops scrape loops for disappeared targets.
+// It returns after all stopped scrape loops terminated.
+func (sp *scrapePool) sync(targets []*Target) {
 	sp.mtx.Lock()
 	defer sp.mtx.Unlock()
 
 	var (
-		wg         sync.WaitGroup
-		newTgroups = map[string]map[model.Fingerprint]*Target{}
+		fingerprints = map[model.Fingerprint]struct{}{}
+		interval     = time.Duration(sp.config.ScrapeInterval)
+		timeout      = time.Duration(sp.config.ScrapeTimeout)
 	)
 
-	for source, targets := range tgroups {
-		var (
-			prevTargets = sp.tgroups[source]
-			newTargets  = map[model.Fingerprint]*Target{}
-		)
-		newTgroups[source] = newTargets
-
-		for fp, tnew := range targets {
-			// If the same target existed before, we let it run and replace
-			// the new one with it.
-			if told, ok := prevTargets[fp]; ok {
-				newTargets[fp] = told
-			} else {
-				newTargets[fp] = tnew
-
-				tnew.scrapeLoop = newScrapeLoop(sp.ctx, tnew, sp.sampleAppender(tnew), sp.reportAppender(tnew))
-				go tnew.scrapeLoop.run(time.Duration(sp.config.ScrapeInterval), time.Duration(sp.config.ScrapeTimeout), nil)
-			}
-		}
-		for fp, told := range prevTargets {
-			// A previous target is no longer in the group.
-			if _, ok := targets[fp]; !ok {
-				wg.Add(1)
-
-				go func(told *Target) {
-					told.scrapeLoop.stop()
-					wg.Done()
-				}(told)
-			}
-		}
-	}
-
-	// Stop scrapers for target groups that disappeared completely.
-	for source, targets := range sp.tgroups {
-		if _, ok := tgroups[source]; ok {
-			continue
-		}
-		for _, told := range targets {
-			wg.Add(1)
-
-			go func(told *Target) {
-				told.scrapeLoop.stop()
-				wg.Done()
-			}(told)
-		}
-	}
-
-	sp.tgroups = newTgroups
+	for _, t := range targets {
+		fp := t.fingerprint()
+		fingerprints[fp] = struct{}{}
+
+		if _, ok := sp.targets[fp]; !ok {
+			l := newScrapeLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t))
+
+			sp.targets[fp] = t
+			sp.loops[fp] = l
+
+			go l.run(interval, timeout, nil)
+		}
+	}
+
+	var wg sync.WaitGroup
+
+	// Stop and remove old targets and scraper loops.
+	for fp := range sp.targets {
+		if _, ok := fingerprints[fp]; !ok {
+			wg.Add(1)
+			go func(l loop) {
+				l.stop()
+				wg.Done()
+			}(sp.loops[fp])
+
+			delete(sp.loops, fp)
+			delete(sp.targets, fp)
+		}
+	}
 
 	// Wait for all potentially stopped scrapers to terminate.
 	// This covers the case of flapping targets. If the server is under high load, a new scraper
 	// may be active and tries to insert. The old scraper that didn't terminate yet could still
@@ -241,6 +234,7 @@ type scraper interface {
 	offset(interval time.Duration) time.Duration
 }
 
+// A loop can run and be stopped again. It must not be reused after it was stopped.
 type loop interface {
 	run(interval, timeout time.Duration, errc chan<- error)
 	stop()

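stop, reload, and sync above all follow the same reconcile pattern: mutate the fingerprint-keyed maps under the lock, stop obsolete loops concurrently, and block on a WaitGroup until every stopped loop has terminated. The following is a condensed sketch of that pattern, not the actual Prometheus code; fakeLoop and the string fingerprints are hypothetical stand-ins:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// loop mirrors the interface from the diff: run once, stop once, never reuse.
type loop interface {
	run(interval time.Duration)
	stop()
}

// fakeLoop is a hypothetical loop implementation for this sketch.
type fakeLoop struct {
	done chan struct{}
}

func (l *fakeLoop) run(interval time.Duration) {
	// A real loop would scrape on a ticker; here we just block until stopped.
	<-l.done
}

func (l *fakeLoop) stop() { close(l.done) }

// pool keeps one running loop per target fingerprint.
type pool struct {
	mtx   sync.Mutex
	loops map[string]loop
}

// sync reconciles the pool against the desired set of fingerprints: new
// entries get a started loop, vanished entries are stopped and removed.
func (p *pool) sync(want []string) {
	p.mtx.Lock()
	defer p.mtx.Unlock()

	seen := map[string]struct{}{}
	for _, fp := range want {
		seen[fp] = struct{}{}
		if _, ok := p.loops[fp]; !ok {
			l := &fakeLoop{done: make(chan struct{})}
			p.loops[fp] = l
			go l.run(10 * time.Second)
		}
	}

	// Stop removed loops concurrently, then wait for all of them.
	var wg sync.WaitGroup
	for fp, l := range p.loops {
		if _, ok := seen[fp]; !ok {
			wg.Add(1)
			go func(l loop) {
				l.stop()
				wg.Done()
			}(l)
			delete(p.loops, fp)
		}
	}
	wg.Wait()
}

func main() {
	p := &pool{loops: map[string]loop{}}
	p.sync([]string{"a", "b"})
	p.sync([]string{"b", "c"}) // stops "a", starts "c", keeps "b" untouched
	fmt.Println(len(p.loops), "loops running") // prints: 2 loops running
}
```

Blocking until the stopped loops terminate is what protects against the flapping-target case described in the diff: a replacement loop may already be running, and the old one must not still be appending samples once sync returns.
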
--- a/retrieval/targetmanager.go
+++ b/retrieval/targetmanager.go

@@ -142,12 +142,14 @@ func (tm *TargetManager) Pools() map[string][]*Target {
 	// TODO(fabxc): this is just a hack to maintain compatibility for now.
 	for _, ps := range tm.targetSets {
-		for _, ts := range ps.scrapePool.tgroups {
-			for _, t := range ts {
-				job := string(t.Labels()[model.JobLabel])
-				pools[job] = append(pools[job], t)
-			}
+		ps.scrapePool.mtx.RLock()
+
+		for _, t := range ps.scrapePool.targets {
+			job := string(t.Labels()[model.JobLabel])
+			pools[job] = append(pools[job], t)
 		}
+
+		ps.scrapePool.mtx.RUnlock()
 	}
 	return pools
 }
@@ -168,10 +170,12 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool {
 }
 
 // targetSet holds several TargetProviders for which the same scrape configuration
-// is used. It runs the target providers and starts and stops scrapers as it
-// receives target updates.
+// is used. It maintains target groups from all given providers and syncs them
+// to a scrape pool.
 type targetSet struct {
 	mtx sync.RWMutex
+
+	// Sets of targets by a source string that is unique across target providers.
 	tgroups map[string]map[model.Fingerprint]*Target
 
 	providers map[string]TargetProvider
@@ -231,7 +235,9 @@ Loop:
 		case <-ctx.Done():
 			break Loop
 		case <-ts.syncCh:
+			ts.mtx.RLock()
 			ts.sync()
+			ts.mtx.RUnlock()
 		}
 	}
@@ -241,9 +247,13 @@ Loop:
 }
 
 func (ts *targetSet) sync() {
-	// TODO(fabxc): temporary simple version. For a deduplicating scrape pool we will
-	// submit a list of all targets.
-	ts.scrapePool.sync(ts.tgroups)
+	targets := []*Target{}
+	for _, tgroup := range ts.tgroups {
+		for _, t := range tgroup {
+			targets = append(targets, t)
+		}
+	}
+	ts.scrapePool.sync(targets)
 }
 
 func (ts *targetSet) runProviders(ctx context.Context, providers map[string]TargetProvider) {
@@ -308,8 +318,9 @@ func (ts *targetSet) runProviders(ctx context.Context, providers map[string]TargetProvider) {
 		go prov.Run(ctx, updates)
 	}
 
+	// We wait for a full initial set of target groups before releasing the mutex
+	// to ensure the initial sync is complete and there are no races with subsequent updates.
 	wg.Wait()
-
 	ts.sync()
 }