Simplify and fix TargetManager reloading

This commit is contained in:
Fabian Reinartz 2016-02-22 12:42:11 +01:00
parent da99366f85
commit cebba3efbb
2 changed files with 65 additions and 65 deletions

View file

@@ -167,7 +167,6 @@ type Target struct {
 	scraperStopping chan struct{}
 	// Closing scraperStopped signals that scraping has been stopped.
 	scraperStopped chan struct{}
-	running         bool
 
 	// Mutex protects the members below.
 	sync.RWMutex
@@ -388,11 +387,9 @@ func (t *Target) InstanceIdentifier() string {
 // RunScraper implements Target.
 func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {
-	defer close(t.scraperStopped)
-	t.Lock()
-	t.running = true
-	t.Unlock()
+	log.Debugf("Running scraper for %v", t)
+	defer close(t.scraperStopped)
 
 	lastScrapeInterval := t.interval()
@@ -452,14 +449,6 @@ func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {
 // StopScraper implements Target.
 func (t *Target) StopScraper() {
-	t.Lock()
-	if !t.running {
-		t.Unlock()
-		return
-	}
-	t.running = false
-	t.Unlock()
-
 	log.Debugf("Stopping scraper for target %v...", t)
 	close(t.scraperStopping)

View file

@@ -73,38 +73,9 @@ func (tm *TargetManager) Run() {
 	log.Info("Starting target manager...")
 
 	tm.mtx.Lock()
 
 	tm.ctx, tm.cancel = context.WithCancel(context.Background())
+	tm.reload()
-	jobs := map[string]struct{}{}
-
-	// Start new target sets and update existing ones.
-	for _, scfg := range tm.scrapeConfigs {
-		jobs[scfg.JobName] = struct{}{}
-
-		ts, ok := tm.targetSets[scfg.JobName]
-		if !ok {
-			ts = newTargetSet(scfg, tm.appender)
-			tm.targetSets[scfg.JobName] = ts
-		}
-		ts.runProviders(tm.ctx, providersFromConfig(scfg))
-	}
-
-	// Stop old target sets.
-	for name := range tm.targetSets {
-		if _, ok := jobs[name]; !ok {
-			delete(tm.targetSets, name)
-		}
-	}
-
-	// Run target sets.
-	for _, ts := range tm.targetSets {
-		tm.wg.Add(1)
-
-		go func(ts *targetSet) {
-			ts.run(tm.ctx)
-			tm.wg.Done()
-		}(ts)
-	}
 
 	tm.mtx.Unlock()
@@ -128,6 +99,38 @@ func (tm *TargetManager) Stop() {
 	log.Debugln("Target manager stopped")
 }
 
+func (tm *TargetManager) reload() {
+	jobs := map[string]struct{}{}
+
+	// Start new target sets and update existing ones.
+	for _, scfg := range tm.scrapeConfigs {
+		jobs[scfg.JobName] = struct{}{}
+
+		ts, ok := tm.targetSets[scfg.JobName]
+		if !ok {
+			ts = newTargetSet(scfg, tm.appender)
+			tm.targetSets[scfg.JobName] = ts
+
+			tm.wg.Add(1)
+
+			go func(ts *targetSet) {
+				ts.runScraping(tm.ctx)
+				tm.wg.Done()
+			}(ts)
+		}
+		ts.runProviders(tm.ctx, providersFromConfig(scfg))
+	}
+
+	// Remove old target sets. Waiting for stopping is already guaranteed
+	// by the goroutine that started the target set.
+	for name, ts := range tm.targetSets {
+		if _, ok := jobs[name]; !ok {
+			ts.cancel()
+			delete(tm.targetSets, name)
+		}
+	}
+}
+
 // Pools returns the targets currently being scraped bucketed by their job name.
 func (tm *TargetManager) Pools() map[string][]*Target {
 	tm.mtx.RLock()
@@ -151,21 +154,14 @@ func (tm *TargetManager) Pools() map[string][]*Target {
 // by the new cfg. The state of targets that are valid in the new configuration remains unchanged.
 // Returns true on success.
 func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool {
-	tm.mtx.RLock()
-	running := tm.ctx != nil
-	tm.mtx.RUnlock()
-
-	if running {
-		tm.Stop()
-		defer func() {
-			go tm.Run()
-		}()
-	}
 	tm.mtx.Lock()
+	defer tm.mtx.Unlock()
+
 	tm.scrapeConfigs = cfg.ScrapeConfigs
-	tm.mtx.Unlock()
+
+	if tm.ctx != nil {
+		tm.reload()
+	}
 	return true
 }
@@ -180,8 +176,9 @@ type targetSet struct {
 	scrapePool *scrapePool
 	config     *config.ScrapeConfig
 
-	stopProviders func()
 	syncCh          chan struct{}
+	cancelScraping  func()
+	cancelProviders func()
 }
func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet { func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet {
@ -194,7 +191,21 @@ func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetS
return ts return ts
} }
func (ts *targetSet) run(ctx context.Context) { func (ts *targetSet) cancel() {
ts.mtx.RLock()
defer ts.mtx.RUnlock()
if ts.cancelScraping != nil {
ts.cancelScraping()
}
if ts.cancelProviders != nil {
ts.cancelProviders()
}
}
func (ts *targetSet) runScraping(ctx context.Context) {
ctx, ts.cancelScraping = context.WithCancel(ctx)
ts.scrapePool.ctx = ctx ts.scrapePool.ctx = ctx
Loop: Loop:
@@ -234,10 +245,10 @@ func (ts *targetSet) runProviders(ctx context.Context, providers map[string]Targ
 	var wg sync.WaitGroup
 
-	if ts.stopProviders != nil {
-		ts.stopProviders()
+	if ts.cancelProviders != nil {
+		ts.cancelProviders()
 	}
-	ctx, ts.stopProviders = context.WithCancel(ctx)
+	ctx, ts.cancelProviders = context.WithCancel(ctx)
 
 	for name, prov := range providers {
 		wg.Add(1)
@@ -373,7 +384,7 @@ func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) {
 			go tnew.RunScraper(sp.appender)
 		}
 	}
-	for fp, told := range targets {
+	for fp, told := range prevTargets {
 		// A previous target is no longer in the group.
 		if _, ok := targets[fp]; !ok {
 			wg.Add(1)
@@ -388,7 +399,7 @@ func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) {
 	// Stop scrapers for target groups that disappeared completely.
 	for source, targets := range sp.tgroups {
-		if _, ok := tgroups[source]; !ok {
+		if _, ok := tgroups[source]; ok {
 			continue
 		}
 		for _, told := range targets {