Merge pull request #282 from prometheus/fix/race/targetpool

Fix race conditions in TargetPool.
This commit is contained in:
Matt T. Proud 2013-06-05 07:35:16 -07:00
commit c594d18f54
3 changed files with 74 additions and 51 deletions

View file

@ -262,3 +262,17 @@ func (t *target) Merge(newTarget Target) {
} }
t.baseLabels = newTarget.BaseLabels() t.baseLabels = newTarget.BaseLabels()
} }
type targets []Target
func (t targets) Len() int {
return len(t)
}
func (t targets) Less(i, j int) bool {
return t[i].scheduledFor().Before(t[j].scheduledFor())
}
func (t targets) Swap(i, j int) {
t[i], t[j] = t[j], t[i]
}

View file

@ -14,50 +14,46 @@
package retrieval package retrieval
import ( import (
"github.com/prometheus/prometheus/retrieval/format"
"log" "log"
"sort" "sort"
"sync"
"time" "time"
"github.com/prometheus/prometheus/retrieval/format"
) )
const ( const (
intervalKey = "interval" intervalKey = "interval"
targetAddQueueSize = 100
targetReplaceQueueSize = 1
) )
type TargetPool struct { type TargetPool struct {
sync.RWMutex
done chan bool done chan bool
manager TargetManager manager TargetManager
targets []Target targets targets
addTargetQueue chan Target addTargetQueue chan Target
replaceTargetsQueue chan []Target replaceTargetsQueue chan targets
} }
func NewTargetPool(m TargetManager) *TargetPool { func NewTargetPool(m TargetManager) *TargetPool {
return &TargetPool{ return &TargetPool{
manager: m, manager: m,
addTargetQueue: make(chan Target), addTargetQueue: make(chan Target, targetAddQueueSize),
replaceTargetsQueue: make(chan []Target), replaceTargetsQueue: make(chan targets, targetReplaceQueueSize),
} }
} }
func (p TargetPool) Len() int {
return len(p.targets)
}
func (p TargetPool) Less(i, j int) bool {
return p.targets[i].scheduledFor().Before(p.targets[j].scheduledFor())
}
func (p TargetPool) Swap(i, j int) {
p.targets[i], p.targets[j] = p.targets[j], p.targets[i]
}
func (p *TargetPool) Run(results chan format.Result, interval time.Duration) { func (p *TargetPool) Run(results chan format.Result, interval time.Duration) {
ticker := time.Tick(interval) ticker := time.NewTicker(interval)
defer ticker.Stop()
for { for {
select { select {
case <-ticker: case <-ticker.C:
p.runIteration(results, interval) p.runIteration(results, interval)
case newTarget := <-p.addTargetQueue: case newTarget := <-p.addTargetQueue:
p.addTarget(newTarget) p.addTarget(newTarget)
@ -65,7 +61,7 @@ func (p *TargetPool) Run(results chan format.Result, interval time.Duration) {
p.replaceTargets(newTargets) p.replaceTargets(newTargets)
case <-p.done: case <-p.done:
log.Printf("TargetPool exiting...") log.Printf("TargetPool exiting...")
break return
} }
} }
} }
@ -79,14 +75,29 @@ func (p *TargetPool) AddTarget(target Target) {
} }
func (p *TargetPool) addTarget(target Target) { func (p *TargetPool) addTarget(target Target) {
p.Lock()
defer p.Unlock()
p.targets = append(p.targets, target) p.targets = append(p.targets, target)
} }
func (p *TargetPool) ReplaceTargets(newTargets []Target) { func (p *TargetPool) ReplaceTargets(newTargets []Target) {
p.Lock()
defer p.Unlock()
// If there is anything remaining in the queue for effectuation, clear it out,
// because the last mutation should win.
select {
case <-p.replaceTargetsQueue:
default:
p.replaceTargetsQueue <- newTargets p.replaceTargetsQueue <- newTargets
}
} }
func (p *TargetPool) replaceTargets(newTargets []Target) { func (p *TargetPool) replaceTargets(newTargets []Target) {
p.Lock()
defer p.Unlock()
// Replace old target list by new one, but reuse those targets from the old // Replace old target list by new one, but reuse those targets from the old
// list of targets which are also in the new list (to preserve scheduling and // list of targets which are also in the new list (to preserve scheduling and
// health state). // health state).
@ -98,6 +109,7 @@ func (p *TargetPool) replaceTargets(newTargets []Target) {
} }
} }
} }
p.targets = newTargets p.targets = newTargets
} }
@ -109,14 +121,15 @@ func (p *TargetPool) runSingle(earliest time.Time, results chan format.Result, t
} }
func (p *TargetPool) runIteration(results chan format.Result, interval time.Duration) { func (p *TargetPool) runIteration(results chan format.Result, interval time.Duration) {
begin := time.Now() p.RLock()
defer p.RUnlock()
targetCount := p.Len() begin := time.Now()
finished := make(chan bool, targetCount) wait := sync.WaitGroup{}
// Sort p.targets by next scheduling time so we can process the earliest // Sort p.targets by next scheduling time so we can process the earliest
// targets first. // targets first.
sort.Sort(p) sort.Sort(p.targets)
for _, target := range p.targets { for _, target := range p.targets {
now := time.Now() now := time.Now()
@ -124,34 +137,29 @@ func (p *TargetPool) runIteration(results chan format.Result, interval time.Dura
if target.scheduledFor().After(now) { if target.scheduledFor().After(now) {
// None of the remaining targets are ready to be scheduled. Signal that // None of the remaining targets are ready to be scheduled. Signal that
// we're done processing them in this scrape iteration. // we're done processing them in this scrape iteration.
finished <- true
continue continue
} }
wait.Add(1)
go func(t Target) { go func(t Target) {
p.runSingle(now, results, t) p.runSingle(now, results, t)
finished <- true wait.Done()
}(target) }(target)
} }
for i := 0; i < targetCount; { wait.Wait()
select {
case <-finished:
i++
case newTarget := <-p.addTargetQueue:
p.addTarget(newTarget)
case newTargets := <-p.replaceTargetsQueue:
p.replaceTargets(newTargets)
}
}
close(finished)
duration := float64(time.Since(begin) / time.Millisecond) duration := float64(time.Since(begin) / time.Millisecond)
retrievalDurations.Add(map[string]string{intervalKey: interval.String()}, duration) retrievalDurations.Add(map[string]string{intervalKey: interval.String()}, duration)
} }
// BUG(all): Not really thread-safe. Only used in /status page for now.
func (p *TargetPool) Targets() []Target { func (p *TargetPool) Targets() []Target {
return p.targets p.RLock()
defer p.RUnlock()
targets := make([]Target, len(p.targets))
copy(targets, p.targets)
return targets
} }

View file

@ -14,11 +14,12 @@
package retrieval package retrieval
import ( import (
"github.com/prometheus/prometheus/retrieval/format"
"github.com/prometheus/prometheus/utility/test"
"sort" "sort"
"testing" "testing"
"time" "time"
"github.com/prometheus/prometheus/retrieval/format"
"github.com/prometheus/prometheus/utility/test"
) )
func testTargetPool(t test.Tester) { func testTargetPool(t test.Tester) {
@ -113,10 +114,10 @@ func testTargetPool(t test.Tester) {
pool.addTarget(&target) pool.addTarget(&target)
} }
sort.Sort(pool) sort.Sort(pool.targets)
if pool.Len() != len(scenario.outputs) { if pool.targets.Len() != len(scenario.outputs) {
t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.Len()) t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.targets.Len())
} else { } else {
for j, output := range scenario.outputs { for j, output := range scenario.outputs {
target := pool.targets[j] target := pool.targets[j]
@ -127,8 +128,8 @@ func testTargetPool(t test.Tester) {
} }
} }
if pool.Len() != len(scenario.outputs) { if pool.targets.Len() != len(scenario.outputs) {
t.Errorf("%s %d. expected to repopulated with %d elements, got %d", scenario.name, i, len(scenario.outputs), pool.Len()) t.Errorf("%s %d. expected to repopulated with %d elements, got %d", scenario.name, i, len(scenario.outputs), pool.targets.Len())
} }
} }
} }
@ -187,10 +188,10 @@ func TestTargetPoolReplaceTargets(t *testing.T) {
pool.addTarget(oldTarget2) pool.addTarget(oldTarget2)
pool.replaceTargets([]Target{newTarget1, newTarget2}) pool.replaceTargets([]Target{newTarget1, newTarget2})
sort.Sort(pool) sort.Sort(pool.targets)
if pool.Len() != 2 { if pool.targets.Len() != 2 {
t.Errorf("Expected 2 elements in pool, had %d", pool.Len()) t.Errorf("Expected 2 elements in pool, had %d", pool.targets.Len())
} }
target1 := pool.targets[0].(*target) target1 := pool.targets[0].(*target)