diff --git a/retrieval/manager.go b/retrieval/manager.go
index 4f2f77c820..86522fd10f 100644
--- a/retrieval/manager.go
+++ b/retrieval/manager.go
@@ -15,6 +15,7 @@ package retrieval
 
 import (
 	"fmt"
+	"sync"
 
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
@@ -35,7 +36,6 @@ func NewScrapeManager(logger log.Logger, app Appendable) *ScrapeManager {
 	return &ScrapeManager{
 		append:        app,
 		logger:        logger,
-		actionCh:      make(chan func()),
 		scrapeConfigs: make(map[string]*config.ScrapeConfig),
 		scrapePools:   make(map[string]*scrapePool),
 		graceShut:     make(chan struct{}),
@@ -49,7 +49,7 @@ type ScrapeManager struct {
 	append        Appendable
 	scrapeConfigs map[string]*config.ScrapeConfig
 	scrapePools   map[string]*scrapePool
-	actionCh      chan func()
+	mtx           sync.RWMutex
 	graceShut     chan struct{}
 }
 
@@ -59,8 +59,6 @@ func (m *ScrapeManager) Run(tsets <-chan map[string][]*targetgroup.Group) error
 
 	for {
 		select {
-		case f := <-m.actionCh:
-			f()
 		case ts := <-tsets:
 			m.reload(ts)
 		case <-m.graceShut:
@@ -79,52 +77,49 @@ func (m *ScrapeManager) Stop() {
 
 // ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
 func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error {
-	done := make(chan struct{})
-	m.actionCh <- func() {
-		c := make(map[string]*config.ScrapeConfig)
-		for _, scfg := range cfg.ScrapeConfigs {
-			c[scfg.JobName] = scfg
-		}
-		m.scrapeConfigs = c
-		close(done)
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	c := make(map[string]*config.ScrapeConfig)
+	for _, scfg := range cfg.ScrapeConfigs {
+		c[scfg.JobName] = scfg
 	}
-	<-done
+	m.scrapeConfigs = c
 	return nil
 }
 
 // TargetMap returns map of active and dropped targets and their corresponding scrape config job name.
 func (m *ScrapeManager) TargetMap() map[string][]*Target {
-	targetsMap := make(chan map[string][]*Target)
-	m.actionCh <- func() {
-		targets := make(map[string][]*Target)
-		for jobName, sp := range m.scrapePools {
-			sp.mtx.RLock()
-			for _, t := range sp.targets {
-				targets[jobName] = append(targets[jobName], t)
-			}
-			targets[jobName] = append(targets[jobName], sp.droppedTargets...)
-			sp.mtx.RUnlock()
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	targets := make(map[string][]*Target)
+	for jobName, sp := range m.scrapePools {
+		sp.mtx.RLock()
+		for _, t := range sp.targets {
+			targets[jobName] = append(targets[jobName], t)
 		}
-		targetsMap <- targets
+		targets[jobName] = append(targets[jobName], sp.droppedTargets...)
+		sp.mtx.RUnlock()
 	}
-	return <-targetsMap
+
+	return targets
 }
 
 // Targets returns the targets currently being scraped.
 func (m *ScrapeManager) Targets() []*Target {
-	targets := make(chan []*Target)
-	m.actionCh <- func() {
-		var t []*Target
-		for _, p := range m.scrapePools {
-			p.mtx.RLock()
-			for _, tt := range p.targets {
-				t = append(t, tt)
-			}
-			p.mtx.RUnlock()
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	var targets []*Target
+	for _, p := range m.scrapePools {
+		p.mtx.RLock()
+		for _, tt := range p.targets {
+			targets = append(targets, tt)
 		}
-		targets <- t
+		p.mtx.RUnlock()
 	}
-	return <-targets
+
+	return targets
 }
 
 func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) {
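
The gist of the change: ApplyConfig, TargetMap, and Targets no longer block on Run draining actionCh; they synchronize directly on the new mtx field, so they work even before the run loop starts. Below is a minimal, self-contained sketch of that pattern, not part of the patch; the manager type, its configs field, and the apply/snapshot methods are invented for illustration.

package main

import (
	"fmt"
	"sync"
)

// manager is a stripped-down illustration of the pattern in the patch:
// shared state guarded by a sync.RWMutex instead of being serialized
// through a func() channel drained by a run loop.
type manager struct {
	mtx     sync.RWMutex
	configs map[string]string
}

// apply swaps in a freshly built config map under the write lock,
// the same shape as ScrapeManager.ApplyConfig above.
func (m *manager) apply(cfgs map[string]string) {
	c := make(map[string]string, len(cfgs))
	for k, v := range cfgs {
		c[k] = v
	}
	m.mtx.Lock()
	defer m.mtx.Unlock()
	m.configs = c
}

// snapshot copies the current state out under a read lock. (The patch's
// TargetMap/Targets take the full write lock instead; RLock is the
// read-side option the RWMutex makes available.)
func (m *manager) snapshot() map[string]string {
	m.mtx.RLock()
	defer m.mtx.RUnlock()
	out := make(map[string]string, len(m.configs))
	for k, v := range m.configs {
		out[k] = v
	}
	return out
}

func main() {
	m := &manager{}
	var wg sync.WaitGroup
	// Writers and readers proceed concurrently; nothing has to pump an
	// action channel for these calls to return.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			m.apply(map[string]string{"job": fmt.Sprintf("cfg-%d", i)})
			_ = m.snapshot()
		}(i)
	}
	wg.Wait()
	fmt.Println(len(m.snapshot()))
}

One difference from the patch worth noting: the sketch's read path uses RLock, whereas the accessors in the diff currently take the full Lock even for reads.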