Merge pull request #35 from matttproud/fix/targetpool-and-manager-queueing

``TargetManager`` and ``TargetPool`` as pointers.
This commit is contained in:
juliusv 2013-01-15 08:27:52 -08:00
commit 09d543de97
4 changed files with 105 additions and 17 deletions

View file

@ -20,6 +20,7 @@ import (
func TestInterfaces(t *testing.T) {
var (
_ Target = &target{}
_ TargetManager = &targetManager{}
_ healthReporter = target{}
_ scheduler = &healthScheduler{}
)

View file

@ -31,36 +31,36 @@ type TargetManager interface {
type targetManager struct {
requestAllowance chan bool
pools map[time.Duration]TargetPool
pools map[time.Duration]*TargetPool
results chan Result
}
func NewTargetManager(results chan Result, requestAllowance int) TargetManager {
return targetManager{
return &targetManager{
requestAllowance: make(chan bool, requestAllowance),
results: results,
pools: make(map[time.Duration]TargetPool),
pools: make(map[time.Duration]*TargetPool),
}
}
func (m targetManager) acquire() {
func (m *targetManager) acquire() {
m.requestAllowance <- true
}
func (m targetManager) release() {
func (m *targetManager) release() {
<-m.requestAllowance
}
func (m targetManager) Add(t Target) {
func (m *targetManager) Add(t Target) {
targetPool, ok := m.pools[t.Interval()]
if !ok {
targetPool.manager = m
targetPool = NewTargetPool(m)
log.Printf("Pool %s does not exist; creating and starting...", t.Interval())
go targetPool.Run(m.results, t.Interval())
}
heap.Push(&targetPool, t)
heap.Push(targetPool, t)
m.pools[t.Interval()] = targetPool
}
@ -68,7 +68,7 @@ func (m targetManager) Remove(t Target) {
panic("not implemented")
}
func (m targetManager) AddTargetsFromConfig(config *config.Config) {
func (m *targetManager) AddTargetsFromConfig(config *config.Config) {
for _, job := range config.Jobs {
for _, configTargets := range job.Targets {
baseLabels := model.LabelSet{

View file

@ -0,0 +1,87 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"github.com/matttproud/prometheus/utility/test"
"testing"
"time"
)
type fakeTarget struct {
scrapeCount int
schedules []time.Time
interval time.Duration
scheduleIndex int
}
func (t fakeTarget) Address() string {
return "fake"
}
func (t fakeTarget) Interval() time.Duration {
return t.interval
}
func (t *fakeTarget) Scrape(e time.Time, r chan Result) error {
t.scrapeCount++
return nil
}
func (t fakeTarget) State() TargetState {
return ALIVE
}
func (t *fakeTarget) scheduledFor() (time time.Time) {
time = t.schedules[t.scheduleIndex]
t.scheduleIndex++
return
}
func testTargetManager(t test.Tester) {
results := make(chan Result, 5)
targetManager := NewTargetManager(results, 3)
target1GroupA := &fakeTarget{
schedules: []time.Time{time.Now()},
interval: time.Minute,
}
target2GroupA := &fakeTarget{
schedules: []time.Time{time.Now()},
interval: time.Minute,
}
targetManager.Add(target1GroupA)
targetManager.Add(target2GroupA)
target1GroupB := &fakeTarget{
schedules: []time.Time{time.Now()},
interval: time.Minute * 2,
}
targetManager.Add(target1GroupB)
}
func TestTargetManager(t *testing.T) {
testTargetManager(t)
}
func BenchmarkTargetManager(b *testing.B) {
for i := 0; i < b.N; i++ {
testTargetManager(b)
}
}

View file

@ -8,14 +8,14 @@ import (
type TargetPool struct {
done chan bool
targets []*target
targets []Target
manager TargetManager
}
func NewTargetPool(m TargetManager) (p TargetPool) {
p.manager = m
return
func NewTargetPool(m TargetManager) (p *TargetPool) {
return &TargetPool{
manager: m,
}
}
func (p TargetPool) Len() int {
@ -37,7 +37,7 @@ func (p *TargetPool) Pop() interface{} {
}
func (p *TargetPool) Push(element interface{}) {
p.targets = append(p.targets, element.(*target))
p.targets = append(p.targets, element.(Target))
}
func (p TargetPool) Swap(i, j int) {
@ -62,7 +62,7 @@ func (p TargetPool) Stop() {
p.done <- true
}
func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t *target) {
func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t Target) {
p.manager.acquire()
defer p.manager.release()
@ -71,7 +71,7 @@ func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t *targe
func (p *TargetPool) runIteration(results chan Result) {
for i := 0; i < p.Len(); i++ {
target := heap.Pop(p).(*target)
target := heap.Pop(p).(Target)
if target == nil {
break
}