`TargetManager and TargetPool` ass pointers.

This commit is contained in:
Matt T. Proud 2013-01-15 17:06:17 +01:00
parent a7ed7cae91
commit 190e4e3fa3
4 changed files with 105 additions and 17 deletions

View file

@ -20,6 +20,7 @@ import (
func TestInterfaces(t *testing.T) { func TestInterfaces(t *testing.T) {
var ( var (
_ Target = &target{} _ Target = &target{}
_ TargetManager = &targetManager{}
_ healthReporter = target{} _ healthReporter = target{}
_ scheduler = &healthScheduler{} _ scheduler = &healthScheduler{}
) )

View file

@ -31,36 +31,36 @@ type TargetManager interface {
type targetManager struct { type targetManager struct {
requestAllowance chan bool requestAllowance chan bool
pools map[time.Duration]TargetPool pools map[time.Duration]*TargetPool
results chan Result results chan Result
} }
func NewTargetManager(results chan Result, requestAllowance int) TargetManager { func NewTargetManager(results chan Result, requestAllowance int) TargetManager {
return targetManager{ return &targetManager{
requestAllowance: make(chan bool, requestAllowance), requestAllowance: make(chan bool, requestAllowance),
results: results, results: results,
pools: make(map[time.Duration]TargetPool), pools: make(map[time.Duration]*TargetPool),
} }
} }
func (m targetManager) acquire() { func (m *targetManager) acquire() {
m.requestAllowance <- true m.requestAllowance <- true
} }
func (m targetManager) release() { func (m *targetManager) release() {
<-m.requestAllowance <-m.requestAllowance
} }
func (m targetManager) Add(t Target) { func (m *targetManager) Add(t Target) {
targetPool, ok := m.pools[t.Interval()] targetPool, ok := m.pools[t.Interval()]
if !ok { if !ok {
targetPool.manager = m targetPool = NewTargetPool(m)
log.Printf("Pool %s does not exist; creating and starting...", t.Interval()) log.Printf("Pool %s does not exist; creating and starting...", t.Interval())
go targetPool.Run(m.results, t.Interval()) go targetPool.Run(m.results, t.Interval())
} }
heap.Push(&targetPool, t) heap.Push(targetPool, t)
m.pools[t.Interval()] = targetPool m.pools[t.Interval()] = targetPool
} }
@ -68,7 +68,7 @@ func (m targetManager) Remove(t Target) {
panic("not implemented") panic("not implemented")
} }
func (m targetManager) AddTargetsFromConfig(config *config.Config) { func (m *targetManager) AddTargetsFromConfig(config *config.Config) {
for _, job := range config.Jobs { for _, job := range config.Jobs {
for _, configTargets := range job.Targets { for _, configTargets := range job.Targets {
baseLabels := model.LabelSet{ baseLabels := model.LabelSet{

View file

@ -0,0 +1,87 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"github.com/matttproud/prometheus/utility/test"
"testing"
"time"
)
type fakeTarget struct {
scrapeCount int
schedules []time.Time
interval time.Duration
scheduleIndex int
}
func (t fakeTarget) Address() string {
return "fake"
}
func (t fakeTarget) Interval() time.Duration {
return t.interval
}
func (t *fakeTarget) Scrape(e time.Time, r chan Result) error {
t.scrapeCount++
return nil
}
func (t fakeTarget) State() TargetState {
return ALIVE
}
func (t *fakeTarget) scheduledFor() (time time.Time) {
time = t.schedules[t.scheduleIndex]
t.scheduleIndex++
return
}
func testTargetManager(t test.Tester) {
results := make(chan Result, 5)
targetManager := NewTargetManager(results, 3)
target1GroupA := &fakeTarget{
schedules: []time.Time{time.Now()},
interval: time.Minute,
}
target2GroupA := &fakeTarget{
schedules: []time.Time{time.Now()},
interval: time.Minute,
}
targetManager.Add(target1GroupA)
targetManager.Add(target2GroupA)
target1GroupB := &fakeTarget{
schedules: []time.Time{time.Now()},
interval: time.Minute * 2,
}
targetManager.Add(target1GroupB)
}
func TestTargetManager(t *testing.T) {
testTargetManager(t)
}
func BenchmarkTargetManager(b *testing.B) {
for i := 0; i < b.N; i++ {
testTargetManager(b)
}
}

View file

@ -8,14 +8,14 @@ import (
type TargetPool struct { type TargetPool struct {
done chan bool done chan bool
targets []*target targets []Target
manager TargetManager manager TargetManager
} }
func NewTargetPool(m TargetManager) (p TargetPool) { func NewTargetPool(m TargetManager) (p *TargetPool) {
p.manager = m return &TargetPool{
manager: m,
return }
} }
func (p TargetPool) Len() int { func (p TargetPool) Len() int {
@ -37,7 +37,7 @@ func (p *TargetPool) Pop() interface{} {
} }
func (p *TargetPool) Push(element interface{}) { func (p *TargetPool) Push(element interface{}) {
p.targets = append(p.targets, element.(*target)) p.targets = append(p.targets, element.(Target))
} }
func (p TargetPool) Swap(i, j int) { func (p TargetPool) Swap(i, j int) {
@ -62,7 +62,7 @@ func (p TargetPool) Stop() {
p.done <- true p.done <- true
} }
func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t *target) { func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t Target) {
p.manager.acquire() p.manager.acquire()
defer p.manager.release() defer p.manager.release()
@ -71,7 +71,7 @@ func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t *targe
func (p *TargetPool) runIteration(results chan Result) { func (p *TargetPool) runIteration(results chan Result) {
for i := 0; i < p.Len(); i++ { for i := 0; i < p.Len(); i++ {
target := heap.Pop(p).(*target) target := heap.Pop(p).(Target)
if target == nil { if target == nil {
break break
} }