diff --git a/retrieval/interface_test.go b/retrieval/interface_test.go index 7e0ffabaa3..b335daa9ac 100644 --- a/retrieval/interface_test.go +++ b/retrieval/interface_test.go @@ -18,6 +18,9 @@ import ( ) func TestInterfaces(t *testing.T) { - var _ scheduler = &healthScheduler{} - var _ healthReporter = Target{} + var ( + _ Target = &target{} + _ healthReporter = target{} + _ scheduler = &healthScheduler{} + ) } diff --git a/retrieval/scheduler_test.go b/retrieval/scheduler_test.go index 5a741ed236..c02253c609 100644 --- a/retrieval/scheduler_test.go +++ b/retrieval/scheduler_test.go @@ -14,6 +14,7 @@ package retrieval import ( + "github.com/matttproud/prometheus/utility/test" "testing" "time" ) @@ -44,7 +45,7 @@ func (t *fakeTimeProvider) Now() (time time.Time) { return } -func TestHealthScheduler(t *testing.T) { +func testHealthScheduler(t test.Tester) { now := time.Now() var scenarios = []struct { futureHealthState []TargetState @@ -109,3 +110,13 @@ func TestHealthScheduler(t *testing.T) { } } } + +func TestHealthScheduler(t *testing.T) { + testHealthScheduler(t) +} + +func BenchmarkHealthScheduler(b *testing.B) { + for i := 0; i < b.N; i++ { + testHealthScheduler(b) + } +} diff --git a/retrieval/target.go b/retrieval/target.go index f8e2010a18..edb02a64d6 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -23,35 +23,87 @@ import ( "time" ) +// The state of the given Target. type TargetState int const ( + // The Target has not been seen; we know nothing about it, except that it is + // on our docket for examination. UNKNOWN TargetState = iota + // The Target has been found and successfully queried. ALIVE + // The Target was either historically found or not found and then determined + // to be unhealthy by either not responding or disappearing. UNREACHABLE ) +// A healthReporter is a type that can provide insight into its health state. +// +// It mainly exists for testability reasons to decouple the scheduler behaviors +// from fully-fledged Target and other types. type healthReporter interface { + // Report the last-known health state for this target. State() TargetState } -type Target struct { +// A Target represents an endpoint that should be interrogated for metrics. +// +// The protocol described by this type will likely change in future iterations, +// as it offers no good support for aggregated targets and fan out. Thusly, +// it is likely that the current Target and target uses will be +// wrapped with some resolver type. +// +// For the future, the Target protocol will abstract away the exact means that +// metrics are retrieved and deserialized from the given instance to which it +// refers. +type Target interface { + // Retrieve values from this target. + // + // earliest refers to the soonest available opportunity to reschedule the + // target for a future retrieval. It is up to the underlying scheduler type, + // alluded to in the scheduledFor function, to use this as it wants to. The + // current use case is to create a common batching time for scraping multiple + // Targets in the future through the TargetPool. + Scrape(earliest time.Time, results chan Result) error + // Fulfill the healthReporter interface. + State() TargetState + // Report the soonest time at which this Target may be scheduled for + // retrieval. This value needn't convey that the operation occurs at this + // time, but it should occur no sooner than it. + // + // Right now, this is used as the sorting key in TargetPool. + scheduledFor() time.Time + // The address to which the Target corresponds. Out of all of the available + // points in this interface, this one is the best candidate to change given + // the ways to express the endpoint. + Address() string + // How frequently queries occur. + Interval() time.Duration +} + +// target is a Target that refers to a singular HTTP or HTTPS endpoint. +type target struct { + // scheduler provides the scheduling strategy that is used to formulate what + // is returned in Target.scheduledFor. scheduler scheduler state TargetState - Address string - Deadline time.Duration + address string + // What is the deadline for the HTTP or HTTPS against this endpoint. + Deadline time.Duration + // Any base labels that are added to this target and its metrics. BaseLabels model.LabelSet // XXX: Move this to a field with the target manager initialization instead of here. - Interval time.Duration + interval time.Duration } -func NewTarget(address string, interval, deadline time.Duration, baseLabels model.LabelSet) *Target { - target := &Target{ - Address: address, +// Furnish a reasonably configured target for querying. +func NewTarget(address string, interval, deadline time.Duration, baseLabels model.LabelSet) Target { + target := &target{ + address: address, Deadline: deadline, - Interval: interval, + interval: interval, BaseLabels: baseLabels, } @@ -69,7 +121,7 @@ type Result struct { Target Target } -func (t *Target) Scrape(earliest time.Time, results chan Result) (err error) { +func (t *target) Scrape(earliest time.Time, results chan Result) (err error) { result := Result{} defer func() { @@ -92,7 +144,7 @@ func (t *Target) Scrape(earliest time.Time, results chan Result) (err error) { request := func() { ti := time.Now() - resp, err := http.Get(t.Address) + resp, err := http.Get(t.Address()) if err != nil { return } @@ -110,7 +162,7 @@ func (t *Target) Scrape(earliest time.Time, results chan Result) (err error) { return } - baseLabels := map[string]string{"instance": t.Address} + baseLabels := map[string]string{"instance": t.Address()} for name, v := range intermediate { asMap, ok := v.(map[string]interface{}) @@ -199,10 +251,18 @@ func (t *Target) Scrape(earliest time.Time, results chan Result) (err error) { return } -func (t Target) State() TargetState { +func (t target) State() TargetState { return t.state } -func (t Target) scheduledFor() time.Time { +func (t target) scheduledFor() time.Time { return t.scheduler.ScheduledFor() } + +func (t target) Address() string { + return t.address +} + +func (t target) Interval() time.Duration { + return t.interval +} diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index d7f5c36570..c9d5058455 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -24,8 +24,8 @@ import ( type TargetManager interface { acquire() release() - Add(t *Target) - Remove(t *Target) + Add(t Target) + Remove(t Target) AddTargetsFromConfig(config *config.Config) } @@ -51,20 +51,20 @@ func (m targetManager) release() { <-m.requestAllowance } -func (m targetManager) Add(t *Target) { - targetPool, ok := m.pools[t.Interval] +func (m targetManager) Add(t Target) { + targetPool, ok := m.pools[t.Interval()] if !ok { targetPool.manager = m - log.Printf("Pool %s does not exist; creating and starting...", t.Interval) - go targetPool.Run(m.results, t.Interval) + log.Printf("Pool %s does not exist; creating and starting...", t.Interval()) + go targetPool.Run(m.results, t.Interval()) } heap.Push(&targetPool, t) - m.pools[t.Interval] = targetPool + m.pools[t.Interval()] = targetPool } -func (m targetManager) Remove(t *Target) { +func (m targetManager) Remove(t Target) { panic("not implemented") } diff --git a/retrieval/targetpool.go b/retrieval/targetpool.go index acc454a92e..7b30420b10 100644 --- a/retrieval/targetpool.go +++ b/retrieval/targetpool.go @@ -8,7 +8,7 @@ import ( type TargetPool struct { done chan bool - targets []*Target + targets []*target manager TargetManager } @@ -37,7 +37,7 @@ func (p *TargetPool) Pop() interface{} { } func (p *TargetPool) Push(element interface{}) { - p.targets = append(p.targets, element.(*Target)) + p.targets = append(p.targets, element.(*target)) } func (p TargetPool) Swap(i, j int) { @@ -62,7 +62,7 @@ func (p TargetPool) Stop() { p.done <- true } -func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t *Target) { +func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t *target) { p.manager.acquire() defer p.manager.release() @@ -71,7 +71,7 @@ func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t *Targe func (p *TargetPool) runIteration(results chan Result) { for i := 0; i < p.Len(); i++ { - target := heap.Pop(p).(*Target) + target := heap.Pop(p).(*target) if target == nil { break } diff --git a/retrieval/targetpool_test.go b/retrieval/targetpool_test.go index 2fb9308d91..07197b7179 100644 --- a/retrieval/targetpool_test.go +++ b/retrieval/targetpool_test.go @@ -15,6 +15,7 @@ package retrieval import ( "container/heap" + "github.com/matttproud/prometheus/utility/test" "testing" "time" ) @@ -28,7 +29,7 @@ func (s literalScheduler) ScheduledFor() time.Time { func (s literalScheduler) Reschedule(earliest time.Time, future TargetState) { } -func TestTargetPool(t *testing.T) { +func testTargetPool(t test.Tester) { type expectation struct { size int } @@ -113,25 +114,54 @@ func TestTargetPool(t *testing.T) { pool := TargetPool{} for _, input := range scenario.inputs { - target := Target{ - Address: input.address, + target := target{ + address: input.address, scheduler: literalScheduler(input.scheduledFor), } heap.Push(&pool, &target) } + targets := []Target{} + if pool.Len() != len(scenario.outputs) { t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.Len()) } else { for j, output := range scenario.outputs { - target := heap.Pop(&pool).(*Target) + target := heap.Pop(&pool).(Target) - if target.Address != output.address { - t.Errorf("%s %d.%d. expected Target address to be %s but was %s", scenario.name, i, j, output.address, target.Address) + if target.Address() != output.address { + t.Errorf("%s %d.%d. expected Target address to be %s but was %s", scenario.name, i, j, output.address, target.Address()) } + targets = append(targets, target) + } + + if pool.Len() != 0 { + t.Errorf("%s %d. expected pool to be empty, had %d", scenario.name, i, pool.Len()) + } + + if len(targets) != len(scenario.outputs) { + t.Errorf("%s %d. expected to receive %d elements, got %d", scenario.name, i, len(scenario.outputs), len(targets)) + } + + for _, target := range targets { + heap.Push(&pool, target) + } + + if pool.Len() != len(scenario.outputs) { + t.Errorf("%s %d. expected to repopulated with %d elements, got %d", scenario.name, i, len(scenario.outputs), pool.Len()) } } } } + +func TestTargetPool(t *testing.T) { + testTargetPool(t) +} + +func BenchmarkTargetPool(b *testing.B) { + for i := 0; i < b.N; i++ { + testTargetPool(b) + } +}