prometheus/retrieval/targetpool.go
Matt T. Proud e01b6cdb44 Duration statistics for each target pool.
We have an open question of how long does it take for each target
pool to have the state retrieved from all participating elements.
This commit starts by providing insight into this.
2013-01-28 16:36:28 +01:00

113 lines
2.1 KiB
Go

package retrieval
import (
"container/heap"
"github.com/prometheus/prometheus/retrieval/format"
"log"
"time"
)
const (
intervalKey = "interval"
)
type TargetPool struct {
done chan bool
manager TargetManager
targets []Target
}
func NewTargetPool(m TargetManager) (p *TargetPool) {
return &TargetPool{
manager: m,
}
}
func (p TargetPool) Len() int {
return len(p.targets)
}
func (p TargetPool) Less(i, j int) bool {
return p.targets[i].scheduledFor().Before(p.targets[j].scheduledFor())
}
func (p *TargetPool) Pop() interface{} {
oldPool := p.targets
futureLength := p.Len() - 1
element := oldPool[futureLength]
futurePool := oldPool[0:futureLength]
p.targets = futurePool
return element
}
func (p *TargetPool) Push(element interface{}) {
p.targets = append(p.targets, element.(Target))
}
func (p TargetPool) Swap(i, j int) {
p.targets[i], p.targets[j] = p.targets[j], p.targets[i]
}
func (p *TargetPool) Run(results chan format.Result, interval time.Duration) {
ticker := time.Tick(interval)
for {
select {
case <-ticker:
p.runIteration(results, interval)
case <-p.done:
log.Printf("TargetPool exiting...")
break
}
}
}
func (p TargetPool) Stop() {
p.done <- true
}
func (p *TargetPool) runSingle(earliest time.Time, results chan format.Result, t Target) {
p.manager.acquire()
defer p.manager.release()
t.Scrape(earliest, results)
}
func (p *TargetPool) runIteration(results chan format.Result, interval time.Duration) {
begin := time.Now()
targetCount := p.Len()
finished := make(chan bool, targetCount)
for i := 0; i < targetCount; i++ {
target := heap.Pop(p).(Target)
if target == nil {
break
}
now := time.Now()
if target.scheduledFor().After(now) {
heap.Push(p, target)
break
}
go func() {
p.runSingle(now, results, target)
heap.Push(p, target)
finished <- true
}()
}
for i := 0; i < targetCount; i++ {
<-finished
}
close(finished)
duration := float64(time.Now().Sub(begin) / time.Millisecond)
retrievalDurations.Add(map[string]string{intervalKey: interval.String()}, duration)
}