mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-10 07:34:04 -08:00
Put RuleManager concurrency in hands of caller, fix races.
This commit is contained in:
parent
6c36beb764
commit
adb87816f4
1
main.go
1
main.go
|
@ -325,6 +325,7 @@ func main() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Error loading rule files: %v", err)
|
log.Fatalf("Error loading rule files: %v", err)
|
||||||
}
|
}
|
||||||
|
go ruleManager.Run()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
err := webService.ServeForever()
|
err := webService.ServeForever()
|
||||||
|
|
|
@ -29,10 +29,14 @@ type Result struct {
|
||||||
|
|
||||||
type RuleManager interface {
|
type RuleManager interface {
|
||||||
AddRulesFromConfig(config config.Config) error
|
AddRulesFromConfig(config config.Config) error
|
||||||
|
Run()
|
||||||
}
|
}
|
||||||
|
|
||||||
type ruleManager struct {
|
type ruleManager struct {
|
||||||
|
// Protects the rules list.
|
||||||
|
sync.Mutex
|
||||||
rules []Rule
|
rules []Rule
|
||||||
|
|
||||||
results chan *Result
|
results chan *Result
|
||||||
done chan bool
|
done chan bool
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
|
@ -47,12 +51,10 @@ func NewRuleManager(results chan *Result, interval time.Duration, storage *metri
|
||||||
interval: interval,
|
interval: interval,
|
||||||
storage: storage,
|
storage: storage,
|
||||||
}
|
}
|
||||||
// BUG(julius): Extract this so that the caller manages concurrency.
|
|
||||||
go manager.run(results)
|
|
||||||
return manager
|
return manager
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *ruleManager) run(results chan *Result) {
|
func (m *ruleManager) Run() {
|
||||||
ticker := time.NewTicker(m.interval)
|
ticker := time.NewTicker(m.interval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
@ -60,7 +62,7 @@ func (m *ruleManager) run(results chan *Result) {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
m.runIteration(results)
|
m.runIteration(m.results)
|
||||||
evalDurations.Add(map[string]string{intervalKey: m.interval.String()}, float64(time.Since(start)/time.Millisecond))
|
evalDurations.Add(map[string]string{intervalKey: m.interval.String()}, float64(time.Since(start)/time.Millisecond))
|
||||||
case <-m.done:
|
case <-m.done:
|
||||||
log.Printf("RuleManager exiting...")
|
log.Printf("RuleManager exiting...")
|
||||||
|
@ -80,7 +82,12 @@ func (m *ruleManager) runIteration(results chan *Result) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
|
|
||||||
for _, rule := range m.rules {
|
m.Lock()
|
||||||
|
rules := make([]Rule, len(m.rules))
|
||||||
|
copy(rules, m.rules)
|
||||||
|
m.Unlock()
|
||||||
|
|
||||||
|
for _, rule := range rules {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
// BUG(julius): Look at fixing thundering herd.
|
// BUG(julius): Look at fixing thundering herd.
|
||||||
go func(rule Rule) {
|
go func(rule Rule) {
|
||||||
|
@ -104,7 +111,9 @@ func (m *ruleManager) AddRulesFromConfig(config config.Config) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
m.Lock()
|
||||||
m.rules = append(m.rules, newRules...)
|
m.rules = append(m.rules, newRules...)
|
||||||
|
m.Unlock()
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue