diff --git a/retrieval/target.go b/retrieval/target.go index bc52e41e7..b9604c2e9 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -262,3 +262,17 @@ func (t *target) Merge(newTarget Target) { } t.baseLabels = newTarget.BaseLabels() } + +type targets []Target + +func (t targets) Len() int { + return len(t) +} + +func (t targets) Less(i, j int) bool { + return t[i].scheduledFor().Before(t[j].scheduledFor()) +} + +func (t targets) Swap(i, j int) { + t[i], t[j] = t[j], t[i] +} diff --git a/retrieval/targetpool.go b/retrieval/targetpool.go index 32e0cf606..6a3b39d4d 100644 --- a/retrieval/targetpool.go +++ b/retrieval/targetpool.go @@ -14,50 +14,46 @@ package retrieval import ( - "github.com/prometheus/prometheus/retrieval/format" "log" "sort" + "sync" "time" + + "github.com/prometheus/prometheus/retrieval/format" ) const ( intervalKey = "interval" + + targetAddQueueSize = 100 + targetReplaceQueueSize = 1 ) type TargetPool struct { + sync.RWMutex + done chan bool manager TargetManager - targets []Target + targets targets addTargetQueue chan Target - replaceTargetsQueue chan []Target + replaceTargetsQueue chan targets } func NewTargetPool(m TargetManager) *TargetPool { return &TargetPool{ manager: m, - addTargetQueue: make(chan Target), - replaceTargetsQueue: make(chan []Target), + addTargetQueue: make(chan Target, targetAddQueueSize), + replaceTargetsQueue: make(chan targets, targetReplaceQueueSize), } } -func (p TargetPool) Len() int { - return len(p.targets) -} - -func (p TargetPool) Less(i, j int) bool { - return p.targets[i].scheduledFor().Before(p.targets[j].scheduledFor()) -} - -func (p TargetPool) Swap(i, j int) { - p.targets[i], p.targets[j] = p.targets[j], p.targets[i] -} - func (p *TargetPool) Run(results chan format.Result, interval time.Duration) { - ticker := time.Tick(interval) + ticker := time.NewTicker(interval) + defer ticker.Stop() for { select { - case <-ticker: + case <-ticker.C: p.runIteration(results, interval) case newTarget := <-p.addTargetQueue: p.addTarget(newTarget) @@ -65,7 +61,7 @@ func (p *TargetPool) Run(results chan format.Result, interval time.Duration) { p.replaceTargets(newTargets) case <-p.done: log.Printf("TargetPool exiting...") - break + return } } } @@ -79,14 +75,29 @@ func (p *TargetPool) AddTarget(target Target) { } func (p *TargetPool) addTarget(target Target) { + p.Lock() + defer p.Unlock() + p.targets = append(p.targets, target) } func (p *TargetPool) ReplaceTargets(newTargets []Target) { - p.replaceTargetsQueue <- newTargets + p.Lock() + defer p.Unlock() + + // If there is anything remaining in the queue for effectuation, clear it out, + // because the last mutation should win. + select { + case <-p.replaceTargetsQueue: + default: + p.replaceTargetsQueue <- newTargets + } } func (p *TargetPool) replaceTargets(newTargets []Target) { + p.Lock() + defer p.Unlock() + // Replace old target list by new one, but reuse those targets from the old // list of targets which are also in the new list (to preserve scheduling and // health state). @@ -98,6 +109,7 @@ func (p *TargetPool) replaceTargets(newTargets []Target) { } } } + p.targets = newTargets } @@ -109,14 +121,15 @@ func (p *TargetPool) runSingle(earliest time.Time, results chan format.Result, t } func (p *TargetPool) runIteration(results chan format.Result, interval time.Duration) { - begin := time.Now() + p.RLock() + defer p.RUnlock() - targetCount := p.Len() - finished := make(chan bool, targetCount) + begin := time.Now() + wait := sync.WaitGroup{} // Sort p.targets by next scheduling time so we can process the earliest // targets first. - sort.Sort(p) + sort.Sort(p.targets) for _, target := range p.targets { now := time.Now() @@ -124,34 +137,29 @@ func (p *TargetPool) runIteration(results chan format.Result, interval time.Dura if target.scheduledFor().After(now) { // None of the remaining targets are ready to be scheduled. Signal that // we're done processing them in this scrape iteration. - finished <- true continue } + wait.Add(1) + go func(t Target) { p.runSingle(now, results, t) - finished <- true + wait.Done() }(target) } - for i := 0; i < targetCount; { - select { - case <-finished: - i++ - case newTarget := <-p.addTargetQueue: - p.addTarget(newTarget) - case newTargets := <-p.replaceTargetsQueue: - p.replaceTargets(newTargets) - } - } - - close(finished) + wait.Wait() duration := float64(time.Since(begin) / time.Millisecond) retrievalDurations.Add(map[string]string{intervalKey: interval.String()}, duration) } -// BUG(all): Not really thread-safe. Only used in /status page for now. func (p *TargetPool) Targets() []Target { - return p.targets + p.RLock() + defer p.RUnlock() + + targets := make([]Target, len(p.targets)) + copy(targets, p.targets) + + return targets } diff --git a/retrieval/targetpool_test.go b/retrieval/targetpool_test.go index 871421799..dbfb1ef45 100644 --- a/retrieval/targetpool_test.go +++ b/retrieval/targetpool_test.go @@ -14,11 +14,12 @@ package retrieval import ( - "github.com/prometheus/prometheus/retrieval/format" - "github.com/prometheus/prometheus/utility/test" "sort" "testing" "time" + + "github.com/prometheus/prometheus/retrieval/format" + "github.com/prometheus/prometheus/utility/test" ) func testTargetPool(t test.Tester) { @@ -113,10 +114,10 @@ func testTargetPool(t test.Tester) { pool.addTarget(&target) } - sort.Sort(pool) + sort.Sort(pool.targets) - if pool.Len() != len(scenario.outputs) { - t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.Len()) + if pool.targets.Len() != len(scenario.outputs) { + t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.targets.Len()) } else { for j, output := range scenario.outputs { target := pool.targets[j] @@ -127,8 +128,8 @@ func testTargetPool(t test.Tester) { } } - if pool.Len() != len(scenario.outputs) { - t.Errorf("%s %d. expected to repopulated with %d elements, got %d", scenario.name, i, len(scenario.outputs), pool.Len()) + if pool.targets.Len() != len(scenario.outputs) { + t.Errorf("%s %d. expected to repopulated with %d elements, got %d", scenario.name, i, len(scenario.outputs), pool.targets.Len()) } } } @@ -187,10 +188,10 @@ func TestTargetPoolReplaceTargets(t *testing.T) { pool.addTarget(oldTarget2) pool.replaceTargets([]Target{newTarget1, newTarget2}) - sort.Sort(pool) + sort.Sort(pool.targets) - if pool.Len() != 2 { - t.Errorf("Expected 2 elements in pool, had %d", pool.Len()) + if pool.targets.Len() != 2 { + t.Errorf("Expected 2 elements in pool, had %d", pool.targets.Len()) } target1 := pool.targets[0].(*target)