replace state machine with mutex

This commit is contained in:
Krasi Georgiev 2018-01-17 11:46:17 +00:00
parent a3de70ed19
commit af58c1b452

View file

@ -15,6 +15,7 @@ package retrieval
import ( import (
"fmt" "fmt"
"sync"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level" "github.com/go-kit/kit/log/level"
@ -35,7 +36,6 @@ func NewScrapeManager(logger log.Logger, app Appendable) *ScrapeManager {
return &ScrapeManager{ return &ScrapeManager{
append: app, append: app,
logger: logger, logger: logger,
actionCh: make(chan func()),
scrapeConfigs: make(map[string]*config.ScrapeConfig), scrapeConfigs: make(map[string]*config.ScrapeConfig),
scrapePools: make(map[string]*scrapePool), scrapePools: make(map[string]*scrapePool),
graceShut: make(chan struct{}), graceShut: make(chan struct{}),
@ -49,7 +49,7 @@ type ScrapeManager struct {
append Appendable append Appendable
scrapeConfigs map[string]*config.ScrapeConfig scrapeConfigs map[string]*config.ScrapeConfig
scrapePools map[string]*scrapePool scrapePools map[string]*scrapePool
actionCh chan func() mtx sync.RWMutex
graceShut chan struct{} graceShut chan struct{}
} }
@ -59,8 +59,6 @@ func (m *ScrapeManager) Run(tsets <-chan map[string][]*targetgroup.Group) error
for { for {
select { select {
case f := <-m.actionCh:
f()
case ts := <-tsets: case ts := <-tsets:
m.reload(ts) m.reload(ts)
case <-m.graceShut: case <-m.graceShut:
@ -79,23 +77,21 @@ func (m *ScrapeManager) Stop() {
// ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg. // ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error { func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error {
done := make(chan struct{}) m.mtx.Lock()
m.actionCh <- func() { defer m.mtx.Unlock()
c := make(map[string]*config.ScrapeConfig) c := make(map[string]*config.ScrapeConfig)
for _, scfg := range cfg.ScrapeConfigs { for _, scfg := range cfg.ScrapeConfigs {
c[scfg.JobName] = scfg c[scfg.JobName] = scfg
} }
m.scrapeConfigs = c m.scrapeConfigs = c
close(done)
}
<-done
return nil return nil
} }
// TargetMap returns map of active and dropped targets and their corresponding scrape config job name. // TargetMap returns map of active and dropped targets and their corresponding scrape config job name.
func (m *ScrapeManager) TargetMap() map[string][]*Target { func (m *ScrapeManager) TargetMap() map[string][]*Target {
targetsMap := make(chan map[string][]*Target) m.mtx.Lock()
m.actionCh <- func() { defer m.mtx.Unlock()
targets := make(map[string][]*Target) targets := make(map[string][]*Target)
for jobName, sp := range m.scrapePools { for jobName, sp := range m.scrapePools {
sp.mtx.RLock() sp.mtx.RLock()
@ -105,26 +101,25 @@ func (m *ScrapeManager) TargetMap() map[string][]*Target {
targets[jobName] = append(targets[jobName], sp.droppedTargets...) targets[jobName] = append(targets[jobName], sp.droppedTargets...)
sp.mtx.RUnlock() sp.mtx.RUnlock()
} }
targetsMap <- targets
} return targets
return <-targetsMap
} }
// Targets returns the targets currently being scraped. // Targets returns the targets currently being scraped.
func (m *ScrapeManager) Targets() []*Target { func (m *ScrapeManager) Targets() []*Target {
targets := make(chan []*Target) m.mtx.Lock()
m.actionCh <- func() { defer m.mtx.Unlock()
var t []*Target
var targets []*Target
for _, p := range m.scrapePools { for _, p := range m.scrapePools {
p.mtx.RLock() p.mtx.RLock()
for _, tt := range p.targets { for _, tt := range p.targets {
t = append(t, tt) targets = append(targets, tt)
} }
p.mtx.RUnlock() p.mtx.RUnlock()
} }
targets <- t
} return targets
return <-targets
} }
func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) { func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) {