mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Refactor Target as interface for testability.
Future tests around the ``TargetPool`` and ``TargetManager`` and friends will be a lot easier when the concrete behaviors of ``Target`` can be extracted out. Plus, each ``Target``, I suspect, will have its own resolution and query strategy.
This commit is contained in:
parent
9af0faaefb
commit
9752f1e61d
|
@ -18,6 +18,9 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestInterfaces(t *testing.T) {
|
func TestInterfaces(t *testing.T) {
|
||||||
var _ scheduler = &healthScheduler{}
|
var (
|
||||||
var _ healthReporter = Target{}
|
_ Target = &target{}
|
||||||
|
_ healthReporter = target{}
|
||||||
|
_ scheduler = &healthScheduler{}
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
package retrieval
|
package retrieval
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/matttproud/prometheus/utility/test"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -44,7 +45,7 @@ func (t *fakeTimeProvider) Now() (time time.Time) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHealthScheduler(t *testing.T) {
|
func testHealthScheduler(t test.Tester) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
var scenarios = []struct {
|
var scenarios = []struct {
|
||||||
futureHealthState []TargetState
|
futureHealthState []TargetState
|
||||||
|
@ -109,3 +110,13 @@ func TestHealthScheduler(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHealthScheduler(t *testing.T) {
|
||||||
|
testHealthScheduler(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkHealthScheduler(b *testing.B) {
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
testHealthScheduler(b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -23,35 +23,87 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// The state of the given Target.
|
||||||
type TargetState int
|
type TargetState int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
// The Target has not been seen; we know nothing about it, except that it is
|
||||||
|
// on our docket for examination.
|
||||||
UNKNOWN TargetState = iota
|
UNKNOWN TargetState = iota
|
||||||
|
// The Target has been found and successfully queried.
|
||||||
ALIVE
|
ALIVE
|
||||||
|
// The Target was either historically found or not found and then determined
|
||||||
|
// to be unhealthy by either not responding or disappearing.
|
||||||
UNREACHABLE
|
UNREACHABLE
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// A healthReporter is a type that can provide insight into its health state.
|
||||||
|
//
|
||||||
|
// It mainly exists for testability reasons to decouple the scheduler behaviors
|
||||||
|
// from fully-fledged Target and other types.
|
||||||
type healthReporter interface {
|
type healthReporter interface {
|
||||||
|
// Report the last-known health state for this target.
|
||||||
State() TargetState
|
State() TargetState
|
||||||
}
|
}
|
||||||
|
|
||||||
type Target struct {
|
// A Target represents an endpoint that should be interrogated for metrics.
|
||||||
|
//
|
||||||
|
// The protocol described by this type will likely change in future iterations,
|
||||||
|
// as it offers no good support for aggregated targets and fan out. Thusly,
|
||||||
|
// it is likely that the current Target and target uses will be
|
||||||
|
// wrapped with some resolver type.
|
||||||
|
//
|
||||||
|
// For the future, the Target protocol will abstract away the exact means that
|
||||||
|
// metrics are retrieved and deserialized from the given instance to which it
|
||||||
|
// refers.
|
||||||
|
type Target interface {
|
||||||
|
// Retrieve values from this target.
|
||||||
|
//
|
||||||
|
// earliest refers to the soonest available opportunity to reschedule the
|
||||||
|
// target for a future retrieval. It is up to the underlying scheduler type,
|
||||||
|
// alluded to in the scheduledFor function, to use this as it wants to. The
|
||||||
|
// current use case is to create a common batching time for scraping multiple
|
||||||
|
// Targets in the future through the TargetPool.
|
||||||
|
Scrape(earliest time.Time, results chan Result) error
|
||||||
|
// Fulfill the healthReporter interface.
|
||||||
|
State() TargetState
|
||||||
|
// Report the soonest time at which this Target may be scheduled for
|
||||||
|
// retrieval. This value needn't convey that the operation occurs at this
|
||||||
|
// time, but it should occur no sooner than it.
|
||||||
|
//
|
||||||
|
// Right now, this is used as the sorting key in TargetPool.
|
||||||
|
scheduledFor() time.Time
|
||||||
|
// The address to which the Target corresponds. Out of all of the available
|
||||||
|
// points in this interface, this one is the best candidate to change given
|
||||||
|
// the ways to express the endpoint.
|
||||||
|
Address() string
|
||||||
|
// How frequently queries occur.
|
||||||
|
Interval() time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// target is a Target that refers to a singular HTTP or HTTPS endpoint.
|
||||||
|
type target struct {
|
||||||
|
// scheduler provides the scheduling strategy that is used to formulate what
|
||||||
|
// is returned in Target.scheduledFor.
|
||||||
scheduler scheduler
|
scheduler scheduler
|
||||||
state TargetState
|
state TargetState
|
||||||
|
|
||||||
Address string
|
address string
|
||||||
Deadline time.Duration
|
// What is the deadline for the HTTP or HTTPS against this endpoint.
|
||||||
|
Deadline time.Duration
|
||||||
|
// Any base labels that are added to this target and its metrics.
|
||||||
BaseLabels model.LabelSet
|
BaseLabels model.LabelSet
|
||||||
|
|
||||||
// XXX: Move this to a field with the target manager initialization instead of here.
|
// XXX: Move this to a field with the target manager initialization instead of here.
|
||||||
Interval time.Duration
|
interval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTarget(address string, interval, deadline time.Duration, baseLabels model.LabelSet) *Target {
|
// Furnish a reasonably configured target for querying.
|
||||||
target := &Target{
|
func NewTarget(address string, interval, deadline time.Duration, baseLabels model.LabelSet) Target {
|
||||||
Address: address,
|
target := &target{
|
||||||
|
address: address,
|
||||||
Deadline: deadline,
|
Deadline: deadline,
|
||||||
Interval: interval,
|
interval: interval,
|
||||||
BaseLabels: baseLabels,
|
BaseLabels: baseLabels,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,7 +121,7 @@ type Result struct {
|
||||||
Target Target
|
Target Target
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Target) Scrape(earliest time.Time, results chan Result) (err error) {
|
func (t *target) Scrape(earliest time.Time, results chan Result) (err error) {
|
||||||
result := Result{}
|
result := Result{}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -92,7 +144,7 @@ func (t *Target) Scrape(earliest time.Time, results chan Result) (err error) {
|
||||||
|
|
||||||
request := func() {
|
request := func() {
|
||||||
ti := time.Now()
|
ti := time.Now()
|
||||||
resp, err := http.Get(t.Address)
|
resp, err := http.Get(t.Address())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -110,7 +162,7 @@ func (t *Target) Scrape(earliest time.Time, results chan Result) (err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
baseLabels := map[string]string{"instance": t.Address}
|
baseLabels := map[string]string{"instance": t.Address()}
|
||||||
|
|
||||||
for name, v := range intermediate {
|
for name, v := range intermediate {
|
||||||
asMap, ok := v.(map[string]interface{})
|
asMap, ok := v.(map[string]interface{})
|
||||||
|
@ -199,10 +251,18 @@ func (t *Target) Scrape(earliest time.Time, results chan Result) (err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t Target) State() TargetState {
|
func (t target) State() TargetState {
|
||||||
return t.state
|
return t.state
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t Target) scheduledFor() time.Time {
|
func (t target) scheduledFor() time.Time {
|
||||||
return t.scheduler.ScheduledFor()
|
return t.scheduler.ScheduledFor()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t target) Address() string {
|
||||||
|
return t.address
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t target) Interval() time.Duration {
|
||||||
|
return t.interval
|
||||||
|
}
|
||||||
|
|
|
@ -24,8 +24,8 @@ import (
|
||||||
type TargetManager interface {
|
type TargetManager interface {
|
||||||
acquire()
|
acquire()
|
||||||
release()
|
release()
|
||||||
Add(t *Target)
|
Add(t Target)
|
||||||
Remove(t *Target)
|
Remove(t Target)
|
||||||
AddTargetsFromConfig(config *config.Config)
|
AddTargetsFromConfig(config *config.Config)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,20 +51,20 @@ func (m targetManager) release() {
|
||||||
<-m.requestAllowance
|
<-m.requestAllowance
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m targetManager) Add(t *Target) {
|
func (m targetManager) Add(t Target) {
|
||||||
targetPool, ok := m.pools[t.Interval]
|
targetPool, ok := m.pools[t.Interval()]
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
targetPool.manager = m
|
targetPool.manager = m
|
||||||
log.Printf("Pool %s does not exist; creating and starting...", t.Interval)
|
log.Printf("Pool %s does not exist; creating and starting...", t.Interval())
|
||||||
go targetPool.Run(m.results, t.Interval)
|
go targetPool.Run(m.results, t.Interval())
|
||||||
}
|
}
|
||||||
|
|
||||||
heap.Push(&targetPool, t)
|
heap.Push(&targetPool, t)
|
||||||
m.pools[t.Interval] = targetPool
|
m.pools[t.Interval()] = targetPool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m targetManager) Remove(t *Target) {
|
func (m targetManager) Remove(t Target) {
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,7 @@ import (
|
||||||
|
|
||||||
type TargetPool struct {
|
type TargetPool struct {
|
||||||
done chan bool
|
done chan bool
|
||||||
targets []*Target
|
targets []*target
|
||||||
manager TargetManager
|
manager TargetManager
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@ func (p *TargetPool) Pop() interface{} {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *TargetPool) Push(element interface{}) {
|
func (p *TargetPool) Push(element interface{}) {
|
||||||
p.targets = append(p.targets, element.(*Target))
|
p.targets = append(p.targets, element.(*target))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p TargetPool) Swap(i, j int) {
|
func (p TargetPool) Swap(i, j int) {
|
||||||
|
@ -62,7 +62,7 @@ func (p TargetPool) Stop() {
|
||||||
p.done <- true
|
p.done <- true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t *Target) {
|
func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t *target) {
|
||||||
p.manager.acquire()
|
p.manager.acquire()
|
||||||
defer p.manager.release()
|
defer p.manager.release()
|
||||||
|
|
||||||
|
@ -71,7 +71,7 @@ func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t *Targe
|
||||||
|
|
||||||
func (p *TargetPool) runIteration(results chan Result) {
|
func (p *TargetPool) runIteration(results chan Result) {
|
||||||
for i := 0; i < p.Len(); i++ {
|
for i := 0; i < p.Len(); i++ {
|
||||||
target := heap.Pop(p).(*Target)
|
target := heap.Pop(p).(*target)
|
||||||
if target == nil {
|
if target == nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,7 @@ package retrieval
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/heap"
|
"container/heap"
|
||||||
|
"github.com/matttproud/prometheus/utility/test"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -28,7 +29,7 @@ func (s literalScheduler) ScheduledFor() time.Time {
|
||||||
func (s literalScheduler) Reschedule(earliest time.Time, future TargetState) {
|
func (s literalScheduler) Reschedule(earliest time.Time, future TargetState) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTargetPool(t *testing.T) {
|
func testTargetPool(t test.Tester) {
|
||||||
type expectation struct {
|
type expectation struct {
|
||||||
size int
|
size int
|
||||||
}
|
}
|
||||||
|
@ -113,25 +114,54 @@ func TestTargetPool(t *testing.T) {
|
||||||
pool := TargetPool{}
|
pool := TargetPool{}
|
||||||
|
|
||||||
for _, input := range scenario.inputs {
|
for _, input := range scenario.inputs {
|
||||||
target := Target{
|
target := target{
|
||||||
Address: input.address,
|
address: input.address,
|
||||||
scheduler: literalScheduler(input.scheduledFor),
|
scheduler: literalScheduler(input.scheduledFor),
|
||||||
}
|
}
|
||||||
|
|
||||||
heap.Push(&pool, &target)
|
heap.Push(&pool, &target)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
targets := []Target{}
|
||||||
|
|
||||||
if pool.Len() != len(scenario.outputs) {
|
if pool.Len() != len(scenario.outputs) {
|
||||||
t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.Len())
|
t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.Len())
|
||||||
} else {
|
} else {
|
||||||
for j, output := range scenario.outputs {
|
for j, output := range scenario.outputs {
|
||||||
target := heap.Pop(&pool).(*Target)
|
target := heap.Pop(&pool).(Target)
|
||||||
|
|
||||||
if target.Address != output.address {
|
if target.Address() != output.address {
|
||||||
t.Errorf("%s %d.%d. expected Target address to be %s but was %s", scenario.name, i, j, output.address, target.Address)
|
t.Errorf("%s %d.%d. expected Target address to be %s but was %s", scenario.name, i, j, output.address, target.Address())
|
||||||
|
|
||||||
}
|
}
|
||||||
|
targets = append(targets, target)
|
||||||
|
}
|
||||||
|
|
||||||
|
if pool.Len() != 0 {
|
||||||
|
t.Errorf("%s %d. expected pool to be empty, had %d", scenario.name, i, pool.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(targets) != len(scenario.outputs) {
|
||||||
|
t.Errorf("%s %d. expected to receive %d elements, got %d", scenario.name, i, len(scenario.outputs), len(targets))
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, target := range targets {
|
||||||
|
heap.Push(&pool, target)
|
||||||
|
}
|
||||||
|
|
||||||
|
if pool.Len() != len(scenario.outputs) {
|
||||||
|
t.Errorf("%s %d. expected to repopulated with %d elements, got %d", scenario.name, i, len(scenario.outputs), pool.Len())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTargetPool(t *testing.T) {
|
||||||
|
testTargetPool(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkTargetPool(b *testing.B) {
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
testTargetPool(b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue