prometheus/retrieval/targetpool.go
Matt T. Proud 8f4c7ece92 Destroy naked returns in half of corpus.
The use of naked return values is frowned upon.  This is the first
of two bulk updates to remove them.
2013-05-16 10:53:25 +03:00

158 lines
3.8 KiB
Go

// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"github.com/prometheus/prometheus/retrieval/format"
"log"
"sort"
"time"
)
const (
intervalKey = "interval"
)
type TargetPool struct {
done chan bool
manager TargetManager
targets []Target
addTargetQueue chan Target
replaceTargetsQueue chan []Target
}
func NewTargetPool(m TargetManager) *TargetPool {
return &TargetPool{
manager: m,
addTargetQueue: make(chan Target),
replaceTargetsQueue: make(chan []Target),
}
}
func (p TargetPool) Len() int {
return len(p.targets)
}
func (p TargetPool) Less(i, j int) bool {
return p.targets[i].scheduledFor().Before(p.targets[j].scheduledFor())
}
func (p TargetPool) Swap(i, j int) {
p.targets[i], p.targets[j] = p.targets[j], p.targets[i]
}
func (p *TargetPool) Run(results chan format.Result, interval time.Duration) {
ticker := time.Tick(interval)
for {
select {
case <-ticker:
p.runIteration(results, interval)
case newTarget := <-p.addTargetQueue:
p.addTarget(newTarget)
case newTargets := <-p.replaceTargetsQueue:
p.replaceTargets(newTargets)
case <-p.done:
log.Printf("TargetPool exiting...")
break
}
}
}
func (p TargetPool) Stop() {
p.done <- true
}
func (p *TargetPool) AddTarget(target Target) {
p.addTargetQueue <- target
}
func (p *TargetPool) addTarget(target Target) {
p.targets = append(p.targets, target)
}
func (p *TargetPool) ReplaceTargets(newTargets []Target) {
p.replaceTargetsQueue <- newTargets
}
func (p *TargetPool) replaceTargets(newTargets []Target) {
// Replace old target list by new one, but reuse those targets from the old
// list of targets which are also in the new list (to preserve scheduling and
// health state).
for j, newTarget := range newTargets {
for _, oldTarget := range p.targets {
if oldTarget.Address() == newTarget.Address() {
oldTarget.Merge(newTargets[j])
newTargets[j] = oldTarget
}
}
}
p.targets = newTargets
}
func (p *TargetPool) runSingle(earliest time.Time, results chan format.Result, t Target) {
p.manager.acquire()
defer p.manager.release()
t.Scrape(earliest, results)
}
func (p *TargetPool) runIteration(results chan format.Result, interval time.Duration) {
begin := time.Now()
targetCount := p.Len()
finished := make(chan bool, targetCount)
// Sort p.targets by next scheduling time so we can process the earliest
// targets first.
sort.Sort(p)
for _, target := range p.targets {
now := time.Now()
if target.scheduledFor().After(now) {
// None of the remaining targets are ready to be scheduled. Signal that
// we're done processing them in this scrape iteration.
finished <- true
continue
}
go func(t Target) {
p.runSingle(now, results, t)
finished <- true
}(target)
}
for i := 0; i < targetCount; {
select {
case <-finished:
i++
case newTarget := <-p.addTargetQueue:
p.addTarget(newTarget)
case newTargets := <-p.replaceTargetsQueue:
p.replaceTargets(newTargets)
}
}
close(finished)
duration := float64(time.Since(begin) / time.Millisecond)
retrievalDurations.Add(map[string]string{intervalKey: interval.String()}, duration)
}
// BUG(all): Not really thread-safe. Only used in /status page for now.
func (p *TargetPool) Targets() []Target {
return p.targets
}