From d772c6c2b54fb0b5643a0c07c6354eb5e533479b Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Sat, 12 Jan 2013 21:22:59 +0100 Subject: [PATCH 1/3] Refactor target scheduling to separate facility. ``Target`` will be refactored down the road to support various nuanced endpoint types. Thusly incorporating the scheduling behavior within it will be problematic. To that end, the scheduling behavior has been moved into a separate assistance type to improve conciseness and testability. ``make format`` was also run. --- retrieval/targetpool_test.go | 188 +++++++++++++++++------------------ 1 file changed, 94 insertions(+), 94 deletions(-) diff --git a/retrieval/targetpool_test.go b/retrieval/targetpool_test.go index 2fb9308d91..10ec4a4173 100644 --- a/retrieval/targetpool_test.go +++ b/retrieval/targetpool_test.go @@ -13,11 +13,11 @@ package retrieval -import ( - "container/heap" - "testing" - "time" -) +// import ( +// "container/heap" +// "testing" +// "time" +// ) type literalScheduler time.Time @@ -33,84 +33,84 @@ func TestTargetPool(t *testing.T) { size int } - type input struct { - address string - scheduledFor time.Time - } +// type input struct { +// address string +// scheduledFor time.Time +// } - type output struct { - address string - } +// type output struct { +// address string +// } - var scenarios = []struct { - name string - outputs []output - inputs []input - }{ - { - name: "empty", - inputs: []input{}, - outputs: []output{}, - }, - { - name: "single element", - inputs: []input{ - { - address: "http://single.com", - }, - }, - outputs: []output{ - { - address: "http://single.com", - }, - }, - }, - { - name: "plural descending schedules", - inputs: []input{ - { - address: "http://plural-descending.com", - scheduledFor: time.Date(2013, 1, 4, 12, 0, 0, 0, time.UTC), - }, - { - address: "http://plural-descending.net", - scheduledFor: time.Date(2013, 1, 4, 11, 0, 0, 0, time.UTC), - }, - }, - outputs: []output{ - { - address: "http://plural-descending.net", - }, - { - address: "http://plural-descending.com", - }, - }, - }, - { - name: "plural ascending schedules", - inputs: []input{ - { - address: "http://plural-ascending.net", - scheduledFor: time.Date(2013, 1, 4, 11, 0, 0, 0, time.UTC), - }, - { - address: "http://plural-ascending.com", - scheduledFor: time.Date(2013, 1, 4, 12, 0, 0, 0, time.UTC), - }, - }, - outputs: []output{ - { - address: "http://plural-ascending.net", - }, - { - address: "http://plural-ascending.com", - }, - }, - }, - } +// var scenarios = []struct { +// name string +// outputs []output +// inputs []input +// }{ +// { +// name: "empty", +// inputs: []input{}, +// outputs: []output{}, +// }, +// { +// name: "single element", +// inputs: []input{ +// { +// address: "http://single.com", +// }, +// }, +// outputs: []output{ +// { +// address: "http://single.com", +// }, +// }, +// }, +// { +// name: "plural descending schedules", +// inputs: []input{ +// { +// address: "http://plural-descending.com", +// scheduledFor: time.Date(2013, 1, 4, 12, 0, 0, 0, time.UTC), +// }, +// { +// address: "http://plural-descending.net", +// scheduledFor: time.Date(2013, 1, 4, 11, 0, 0, 0, time.UTC), +// }, +// }, +// outputs: []output{ +// { +// address: "http://plural-descending.net", +// }, +// { +// address: "http://plural-descending.com", +// }, +// }, +// }, +// { +// name: "plural ascending schedules", +// inputs: []input{ +// { +// address: "http://plural-ascending.net", +// scheduledFor: time.Date(2013, 1, 4, 11, 0, 0, 0, time.UTC), +// }, +// { +// address: "http://plural-ascending.com", +// scheduledFor: time.Date(2013, 1, 4, 12, 0, 0, 0, time.UTC), +// }, +// }, +// outputs: []output{ +// { +// address: "http://plural-ascending.net", +// }, +// { +// address: "http://plural-ascending.com", +// }, +// }, +// }, +// } - for i, scenario := range scenarios { - pool := TargetPool{} +// for i, scenario := range scenarios { +// pool := TargetPool{} for _, input := range scenario.inputs { target := Target{ @@ -118,20 +118,20 @@ func TestTargetPool(t *testing.T) { scheduler: literalScheduler(input.scheduledFor), } - heap.Push(&pool, &target) - } +// heap.Push(&pool, &target) +// } - if pool.Len() != len(scenario.outputs) { - t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.Len()) - } else { - for j, output := range scenario.outputs { - target := heap.Pop(&pool).(*Target) +// if pool.Len() != len(scenario.outputs) { +// t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.Len()) +// } else { +// for j, output := range scenario.outputs { +// target := heap.Pop(&pool).(*Target) - if target.Address != output.address { - t.Errorf("%s %d.%d. expected Target address to be %s but was %s", scenario.name, i, j, output.address, target.Address) +// if target.Address != output.address { +// t.Errorf("%s %d.%d. expected Target address to be %s but was %s", scenario.name, i, j, output.address, target.Address) - } - } - } - } -} +// } +// } +// } +// } +// } From 9af0faaefbacc2cca277cf40292056852684be99 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Sat, 12 Jan 2013 21:58:52 +0100 Subject: [PATCH 2/3] Get ``TargetPool`` test working again. --- retrieval/targetpool_test.go | 188 +++++++++++++++++------------------ 1 file changed, 94 insertions(+), 94 deletions(-) diff --git a/retrieval/targetpool_test.go b/retrieval/targetpool_test.go index 10ec4a4173..2fb9308d91 100644 --- a/retrieval/targetpool_test.go +++ b/retrieval/targetpool_test.go @@ -13,11 +13,11 @@ package retrieval -// import ( -// "container/heap" -// "testing" -// "time" -// ) +import ( + "container/heap" + "testing" + "time" +) type literalScheduler time.Time @@ -33,84 +33,84 @@ func TestTargetPool(t *testing.T) { size int } -// type input struct { -// address string -// scheduledFor time.Time -// } + type input struct { + address string + scheduledFor time.Time + } -// type output struct { -// address string -// } + type output struct { + address string + } -// var scenarios = []struct { -// name string -// outputs []output -// inputs []input -// }{ -// { -// name: "empty", -// inputs: []input{}, -// outputs: []output{}, -// }, -// { -// name: "single element", -// inputs: []input{ -// { -// address: "http://single.com", -// }, -// }, -// outputs: []output{ -// { -// address: "http://single.com", -// }, -// }, -// }, -// { -// name: "plural descending schedules", -// inputs: []input{ -// { -// address: "http://plural-descending.com", -// scheduledFor: time.Date(2013, 1, 4, 12, 0, 0, 0, time.UTC), -// }, -// { -// address: "http://plural-descending.net", -// scheduledFor: time.Date(2013, 1, 4, 11, 0, 0, 0, time.UTC), -// }, -// }, -// outputs: []output{ -// { -// address: "http://plural-descending.net", -// }, -// { -// address: "http://plural-descending.com", -// }, -// }, -// }, -// { -// name: "plural ascending schedules", -// inputs: []input{ -// { -// address: "http://plural-ascending.net", -// scheduledFor: time.Date(2013, 1, 4, 11, 0, 0, 0, time.UTC), -// }, -// { -// address: "http://plural-ascending.com", -// scheduledFor: time.Date(2013, 1, 4, 12, 0, 0, 0, time.UTC), -// }, -// }, -// outputs: []output{ -// { -// address: "http://plural-ascending.net", -// }, -// { -// address: "http://plural-ascending.com", -// }, -// }, -// }, -// } + var scenarios = []struct { + name string + outputs []output + inputs []input + }{ + { + name: "empty", + inputs: []input{}, + outputs: []output{}, + }, + { + name: "single element", + inputs: []input{ + { + address: "http://single.com", + }, + }, + outputs: []output{ + { + address: "http://single.com", + }, + }, + }, + { + name: "plural descending schedules", + inputs: []input{ + { + address: "http://plural-descending.com", + scheduledFor: time.Date(2013, 1, 4, 12, 0, 0, 0, time.UTC), + }, + { + address: "http://plural-descending.net", + scheduledFor: time.Date(2013, 1, 4, 11, 0, 0, 0, time.UTC), + }, + }, + outputs: []output{ + { + address: "http://plural-descending.net", + }, + { + address: "http://plural-descending.com", + }, + }, + }, + { + name: "plural ascending schedules", + inputs: []input{ + { + address: "http://plural-ascending.net", + scheduledFor: time.Date(2013, 1, 4, 11, 0, 0, 0, time.UTC), + }, + { + address: "http://plural-ascending.com", + scheduledFor: time.Date(2013, 1, 4, 12, 0, 0, 0, time.UTC), + }, + }, + outputs: []output{ + { + address: "http://plural-ascending.net", + }, + { + address: "http://plural-ascending.com", + }, + }, + }, + } -// for i, scenario := range scenarios { -// pool := TargetPool{} + for i, scenario := range scenarios { + pool := TargetPool{} for _, input := range scenario.inputs { target := Target{ @@ -118,20 +118,20 @@ func TestTargetPool(t *testing.T) { scheduler: literalScheduler(input.scheduledFor), } -// heap.Push(&pool, &target) -// } + heap.Push(&pool, &target) + } -// if pool.Len() != len(scenario.outputs) { -// t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.Len()) -// } else { -// for j, output := range scenario.outputs { -// target := heap.Pop(&pool).(*Target) + if pool.Len() != len(scenario.outputs) { + t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.Len()) + } else { + for j, output := range scenario.outputs { + target := heap.Pop(&pool).(*Target) -// if target.Address != output.address { -// t.Errorf("%s %d.%d. expected Target address to be %s but was %s", scenario.name, i, j, output.address, target.Address) + if target.Address != output.address { + t.Errorf("%s %d.%d. expected Target address to be %s but was %s", scenario.name, i, j, output.address, target.Address) -// } -// } -// } -// } -// } + } + } + } + } +} From 9752f1e61d030879a5914631b191a11eb693abae Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Sun, 13 Jan 2013 10:46:55 +0100 Subject: [PATCH 3/3] Refactor Target as interface for testability. Future tests around the ``TargetPool`` and ``TargetManager`` and friends will be a lot easier when the concrete behaviors of ``Target`` can be extracted out. Plus, each ``Target``, I suspect, will have its own resolution and query strategy. --- retrieval/interface_test.go | 7 ++- retrieval/scheduler_test.go | 13 +++++- retrieval/target.go | 86 ++++++++++++++++++++++++++++++------ retrieval/targetmanager.go | 16 +++---- retrieval/targetpool.go | 8 ++-- retrieval/targetpool_test.go | 42 +++++++++++++++--- 6 files changed, 138 insertions(+), 34 deletions(-) diff --git a/retrieval/interface_test.go b/retrieval/interface_test.go index 7e0ffabaa3..b335daa9ac 100644 --- a/retrieval/interface_test.go +++ b/retrieval/interface_test.go @@ -18,6 +18,9 @@ import ( ) func TestInterfaces(t *testing.T) { - var _ scheduler = &healthScheduler{} - var _ healthReporter = Target{} + var ( + _ Target = &target{} + _ healthReporter = target{} + _ scheduler = &healthScheduler{} + ) } diff --git a/retrieval/scheduler_test.go b/retrieval/scheduler_test.go index 5a741ed236..c02253c609 100644 --- a/retrieval/scheduler_test.go +++ b/retrieval/scheduler_test.go @@ -14,6 +14,7 @@ package retrieval import ( + "github.com/matttproud/prometheus/utility/test" "testing" "time" ) @@ -44,7 +45,7 @@ func (t *fakeTimeProvider) Now() (time time.Time) { return } -func TestHealthScheduler(t *testing.T) { +func testHealthScheduler(t test.Tester) { now := time.Now() var scenarios = []struct { futureHealthState []TargetState @@ -109,3 +110,13 @@ func TestHealthScheduler(t *testing.T) { } } } + +func TestHealthScheduler(t *testing.T) { + testHealthScheduler(t) +} + +func BenchmarkHealthScheduler(b *testing.B) { + for i := 0; i < b.N; i++ { + testHealthScheduler(b) + } +} diff --git a/retrieval/target.go b/retrieval/target.go index f8e2010a18..edb02a64d6 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -23,35 +23,87 @@ import ( "time" ) +// The state of the given Target. type TargetState int const ( + // The Target has not been seen; we know nothing about it, except that it is + // on our docket for examination. UNKNOWN TargetState = iota + // The Target has been found and successfully queried. ALIVE + // The Target was either historically found or not found and then determined + // to be unhealthy by either not responding or disappearing. UNREACHABLE ) +// A healthReporter is a type that can provide insight into its health state. +// +// It mainly exists for testability reasons to decouple the scheduler behaviors +// from fully-fledged Target and other types. type healthReporter interface { + // Report the last-known health state for this target. State() TargetState } -type Target struct { +// A Target represents an endpoint that should be interrogated for metrics. +// +// The protocol described by this type will likely change in future iterations, +// as it offers no good support for aggregated targets and fan out. Thusly, +// it is likely that the current Target and target uses will be +// wrapped with some resolver type. +// +// For the future, the Target protocol will abstract away the exact means that +// metrics are retrieved and deserialized from the given instance to which it +// refers. +type Target interface { + // Retrieve values from this target. + // + // earliest refers to the soonest available opportunity to reschedule the + // target for a future retrieval. It is up to the underlying scheduler type, + // alluded to in the scheduledFor function, to use this as it wants to. The + // current use case is to create a common batching time for scraping multiple + // Targets in the future through the TargetPool. + Scrape(earliest time.Time, results chan Result) error + // Fulfill the healthReporter interface. + State() TargetState + // Report the soonest time at which this Target may be scheduled for + // retrieval. This value needn't convey that the operation occurs at this + // time, but it should occur no sooner than it. + // + // Right now, this is used as the sorting key in TargetPool. + scheduledFor() time.Time + // The address to which the Target corresponds. Out of all of the available + // points in this interface, this one is the best candidate to change given + // the ways to express the endpoint. + Address() string + // How frequently queries occur. + Interval() time.Duration +} + +// target is a Target that refers to a singular HTTP or HTTPS endpoint. +type target struct { + // scheduler provides the scheduling strategy that is used to formulate what + // is returned in Target.scheduledFor. scheduler scheduler state TargetState - Address string - Deadline time.Duration + address string + // What is the deadline for the HTTP or HTTPS against this endpoint. + Deadline time.Duration + // Any base labels that are added to this target and its metrics. BaseLabels model.LabelSet // XXX: Move this to a field with the target manager initialization instead of here. - Interval time.Duration + interval time.Duration } -func NewTarget(address string, interval, deadline time.Duration, baseLabels model.LabelSet) *Target { - target := &Target{ - Address: address, +// Furnish a reasonably configured target for querying. +func NewTarget(address string, interval, deadline time.Duration, baseLabels model.LabelSet) Target { + target := &target{ + address: address, Deadline: deadline, - Interval: interval, + interval: interval, BaseLabels: baseLabels, } @@ -69,7 +121,7 @@ type Result struct { Target Target } -func (t *Target) Scrape(earliest time.Time, results chan Result) (err error) { +func (t *target) Scrape(earliest time.Time, results chan Result) (err error) { result := Result{} defer func() { @@ -92,7 +144,7 @@ func (t *Target) Scrape(earliest time.Time, results chan Result) (err error) { request := func() { ti := time.Now() - resp, err := http.Get(t.Address) + resp, err := http.Get(t.Address()) if err != nil { return } @@ -110,7 +162,7 @@ func (t *Target) Scrape(earliest time.Time, results chan Result) (err error) { return } - baseLabels := map[string]string{"instance": t.Address} + baseLabels := map[string]string{"instance": t.Address()} for name, v := range intermediate { asMap, ok := v.(map[string]interface{}) @@ -199,10 +251,18 @@ func (t *Target) Scrape(earliest time.Time, results chan Result) (err error) { return } -func (t Target) State() TargetState { +func (t target) State() TargetState { return t.state } -func (t Target) scheduledFor() time.Time { +func (t target) scheduledFor() time.Time { return t.scheduler.ScheduledFor() } + +func (t target) Address() string { + return t.address +} + +func (t target) Interval() time.Duration { + return t.interval +} diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index d7f5c36570..c9d5058455 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -24,8 +24,8 @@ import ( type TargetManager interface { acquire() release() - Add(t *Target) - Remove(t *Target) + Add(t Target) + Remove(t Target) AddTargetsFromConfig(config *config.Config) } @@ -51,20 +51,20 @@ func (m targetManager) release() { <-m.requestAllowance } -func (m targetManager) Add(t *Target) { - targetPool, ok := m.pools[t.Interval] +func (m targetManager) Add(t Target) { + targetPool, ok := m.pools[t.Interval()] if !ok { targetPool.manager = m - log.Printf("Pool %s does not exist; creating and starting...", t.Interval) - go targetPool.Run(m.results, t.Interval) + log.Printf("Pool %s does not exist; creating and starting...", t.Interval()) + go targetPool.Run(m.results, t.Interval()) } heap.Push(&targetPool, t) - m.pools[t.Interval] = targetPool + m.pools[t.Interval()] = targetPool } -func (m targetManager) Remove(t *Target) { +func (m targetManager) Remove(t Target) { panic("not implemented") } diff --git a/retrieval/targetpool.go b/retrieval/targetpool.go index acc454a92e..7b30420b10 100644 --- a/retrieval/targetpool.go +++ b/retrieval/targetpool.go @@ -8,7 +8,7 @@ import ( type TargetPool struct { done chan bool - targets []*Target + targets []*target manager TargetManager } @@ -37,7 +37,7 @@ func (p *TargetPool) Pop() interface{} { } func (p *TargetPool) Push(element interface{}) { - p.targets = append(p.targets, element.(*Target)) + p.targets = append(p.targets, element.(*target)) } func (p TargetPool) Swap(i, j int) { @@ -62,7 +62,7 @@ func (p TargetPool) Stop() { p.done <- true } -func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t *Target) { +func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t *target) { p.manager.acquire() defer p.manager.release() @@ -71,7 +71,7 @@ func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t *Targe func (p *TargetPool) runIteration(results chan Result) { for i := 0; i < p.Len(); i++ { - target := heap.Pop(p).(*Target) + target := heap.Pop(p).(*target) if target == nil { break } diff --git a/retrieval/targetpool_test.go b/retrieval/targetpool_test.go index 2fb9308d91..07197b7179 100644 --- a/retrieval/targetpool_test.go +++ b/retrieval/targetpool_test.go @@ -15,6 +15,7 @@ package retrieval import ( "container/heap" + "github.com/matttproud/prometheus/utility/test" "testing" "time" ) @@ -28,7 +29,7 @@ func (s literalScheduler) ScheduledFor() time.Time { func (s literalScheduler) Reschedule(earliest time.Time, future TargetState) { } -func TestTargetPool(t *testing.T) { +func testTargetPool(t test.Tester) { type expectation struct { size int } @@ -113,25 +114,54 @@ func TestTargetPool(t *testing.T) { pool := TargetPool{} for _, input := range scenario.inputs { - target := Target{ - Address: input.address, + target := target{ + address: input.address, scheduler: literalScheduler(input.scheduledFor), } heap.Push(&pool, &target) } + targets := []Target{} + if pool.Len() != len(scenario.outputs) { t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.Len()) } else { for j, output := range scenario.outputs { - target := heap.Pop(&pool).(*Target) + target := heap.Pop(&pool).(Target) - if target.Address != output.address { - t.Errorf("%s %d.%d. expected Target address to be %s but was %s", scenario.name, i, j, output.address, target.Address) + if target.Address() != output.address { + t.Errorf("%s %d.%d. expected Target address to be %s but was %s", scenario.name, i, j, output.address, target.Address()) } + targets = append(targets, target) + } + + if pool.Len() != 0 { + t.Errorf("%s %d. expected pool to be empty, had %d", scenario.name, i, pool.Len()) + } + + if len(targets) != len(scenario.outputs) { + t.Errorf("%s %d. expected to receive %d elements, got %d", scenario.name, i, len(scenario.outputs), len(targets)) + } + + for _, target := range targets { + heap.Push(&pool, target) + } + + if pool.Len() != len(scenario.outputs) { + t.Errorf("%s %d. expected to repopulated with %d elements, got %d", scenario.name, i, len(scenario.outputs), pool.Len()) } } } } + +func TestTargetPool(t *testing.T) { + testTargetPool(t) +} + +func BenchmarkTargetPool(b *testing.B) { + for i := 0; i < b.N; i++ { + testTargetPool(b) + } +}