Merge pull request #28 from matttproud/fix/validate/retrieval-queue-behavior

Refactor target scheduling to separate facility.
This commit is contained in:
Matt T. Proud 2013-01-13 01:45:51 -08:00
commit af5b376459
16 changed files with 483 additions and 209 deletions

View file

@ -6,6 +6,7 @@ import (
"github.com/matttproud/prometheus/rules/ast"
"time"
)
func (serv MetricsService) Query(Expr string, Json string, Start string, End string) (result string) {
exprNode, err := rules.LoadExprFromString(Expr)
if err != nil {

View file

@ -16,8 +16,8 @@ package main
import (
"code.google.com/p/gorest"
"fmt"
"github.com/matttproud/prometheus/api"
"github.com/matttproud/golang_instrumentation"
"github.com/matttproud/prometheus/api"
"github.com/matttproud/prometheus/config"
"github.com/matttproud/prometheus/retrieval"
"github.com/matttproud/prometheus/rules"

View file

@ -0,0 +1,23 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"testing"
)
func TestInterfaces(t *testing.T) {
var _ scheduler = &healthScheduler{}
var _ healthReporter = Target{}
}

143
retrieval/scheduler.go Normal file
View file

@ -0,0 +1,143 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"math"
"time"
)
const (
// The default increment for exponential backoff when querying a target.
DEFAULT_BACKOFF_VALUE = 2
// The base units for the exponential backoff.
DEFAULT_BACKOFF_VALUE_UNIT = time.Second
// The maximum allowed backoff time.
MAXIMUM_BACKOFF_VALUE = 30 * time.Minute
)
// A basic interface only useful in testing contexts for dispensing the time
// in a controlled manner.
type instantProvider interface {
// The current instant.
Now() time.Time
}
// timer is a simple means for fluently wrapping around standard Go timekeeping
// mechanisms to enhance testability without compromising code readability.
//
// A timer is sufficient for use on bare initialization. A provider should be
// set only for test contexts. When not provided, a timer emits the current
// system time.
type timer struct {
// The underlying means through which time is provided, if supplied.
provider instantProvider
}
// Emit the current instant.
func (t timer) Now() time.Time {
if t.provider == nil {
return time.Now()
}
return t.provider.Now()
}
// scheduler is an interface that various scheduling strategies must fulfill
// in order to set the scheduling order for a target.
//
// Target takes advantage of this type by embedding an instance of scheduler
// in each Target instance itself. The emitted scheduler.ScheduledFor() is
// the basis for sorting the order of pending queries.
//
// This type is described as an interface to maximize testability.
type scheduler interface {
// ScheduledFor emits the earliest time at which the given object is allowed
// to be run. This time may or not be a reflection of the earliest parameter
// provided in Reschedule; that is up to the underlying strategy
// implementations.
ScheduledFor() time.Time
// Instruct the scheduled item to re-schedule itself given new state data and
// the earliest time at which the outside system thinks the operation should
// be scheduled for.
Reschedule(earliest time.Time, future TargetState)
}
// healthScheduler is an implementation of scheduler that uses health data
// provided by the target field as well as unreachability counts to determine
// when to next schedule an operation.
//
// The type is almost capable of being used with default initialization, except
// that a target field must be provided for which the system compares current
// health against future proposed values.
type healthScheduler struct {
scheduledFor time.Time
target healthReporter
timer timer
unreachableCount int
}
func (s healthScheduler) ScheduledFor() time.Time {
return s.scheduledFor
}
// Reschedule, like the protocol described in scheduler, uses the current and
// proposed future health state to determine how and when a given subject is to
// be scheduled.
//
// If a subject has been at given moment marked as unhealthy, an exponential
// backoff scheme is applied to it. The reason for this backoff is to ensure
// that known-healthy targets can consume valuable request queuing resources
// first. Depending on the retrieval interval and number of consecutive
// unhealthy markings, the query of these unhealthy individuals may come before
// the healthy ones for a short time to help ensure expeditious retrieval.
// The inflection point that drops these to the back of the queue is beneficial
// to save resources in the long-run.
//
// If a subject is healthy, its next scheduling opportunity is set to
// earliest, for this ensures fair querying of all remaining healthy targets and
// removes bias in the ordering. In order for the anti-bias value to have any
// value, the earliest opportunity should be set to a value that is constant
// for a given batch of subjects who are to be scraped on a given interval.
func (s *healthScheduler) Reschedule(e time.Time, f TargetState) {
currentState := s.target.State()
// XXX: Handle metrics surrounding health.
switch currentState {
case UNKNOWN, UNREACHABLE:
switch f {
case ALIVE:
s.unreachableCount = 0
break
case UNREACHABLE:
s.unreachableCount++
break
}
case ALIVE:
switch f {
case UNREACHABLE:
s.unreachableCount++
}
}
if s.unreachableCount == 0 {
s.scheduledFor = e
} else {
backoff := MAXIMUM_BACKOFF_VALUE
exponential := time.Duration(math.Pow(DEFAULT_BACKOFF_VALUE, float64(s.unreachableCount))) * DEFAULT_BACKOFF_VALUE_UNIT
if backoff > exponential {
backoff = exponential
}
s.scheduledFor = s.timer.Now().Add(backoff)
}
}

111
retrieval/scheduler_test.go Normal file
View file

@ -0,0 +1,111 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"testing"
"time"
)
type fakeHealthReporter struct {
index int
stateQueue []TargetState
}
func (h fakeHealthReporter) State() (state TargetState) {
state = h.stateQueue[h.index]
h.index++
return
}
type fakeTimeProvider struct {
index int
timeQueue []time.Time
}
func (t *fakeTimeProvider) Now() (time time.Time) {
time = t.timeQueue[t.index]
t.index++
return
}
func TestHealthScheduler(t *testing.T) {
now := time.Now()
var scenarios = []struct {
futureHealthState []TargetState
preloadedTimes []time.Time
expectedSchedule []time.Time
}{
// The behavior discussed in healthScheduler.Reschedule should be read
// fully to understand the whys and wherefores.
{
futureHealthState: []TargetState{UNKNOWN, ALIVE, ALIVE},
preloadedTimes: []time.Time{now, now.Add(time.Minute), now.Add(time.Minute * 2)},
expectedSchedule: []time.Time{now, now.Add(time.Minute), now.Add(time.Minute * 2)},
},
{
futureHealthState: []TargetState{UNKNOWN, UNREACHABLE, UNREACHABLE},
preloadedTimes: []time.Time{now, now.Add(time.Minute), now.Add(time.Minute * 2)},
expectedSchedule: []time.Time{now, now.Add(time.Second * 2), now.Add(time.Minute).Add(time.Second * 4)},
},
{
futureHealthState: []TargetState{UNKNOWN, UNREACHABLE, ALIVE},
preloadedTimes: []time.Time{now, now.Add(time.Minute), now.Add(time.Minute * 2)},
expectedSchedule: []time.Time{now, now.Add(time.Second * 2), now.Add(time.Minute * 2)},
},
{
futureHealthState: []TargetState{UNKNOWN, UNREACHABLE, UNREACHABLE, UNREACHABLE, UNREACHABLE, UNREACHABLE, UNREACHABLE, UNREACHABLE, UNREACHABLE, UNREACHABLE, UNREACHABLE, UNREACHABLE, UNREACHABLE},
preloadedTimes: []time.Time{now, now.Add(time.Minute), now.Add(time.Minute * 2), now.Add(time.Minute * 3), now.Add(time.Minute * 4), now.Add(time.Minute * 5), now.Add(time.Minute * 6), now.Add(time.Minute * 7), now.Add(time.Minute * 8), now.Add(time.Minute * 9), now.Add(time.Minute * 10), now.Add(time.Minute * 11), now.Add(time.Minute * 12)},
expectedSchedule: []time.Time{now, now.Add(time.Second * 2), now.Add(time.Minute * 1).Add(time.Second * 4), now.Add(time.Minute * 2).Add(time.Second * 8), now.Add(time.Minute * 3).Add(time.Second * 16), now.Add(time.Minute * 4).Add(time.Second * 32), now.Add(time.Minute * 5).Add(time.Second * 64), now.Add(time.Minute * 6).Add(time.Second * 128), now.Add(time.Minute * 7).Add(time.Second * 256), now.Add(time.Minute * 8).Add(time.Second * 512), now.Add(time.Minute * 9).Add(time.Second * 1024), now.Add(time.Minute * 10).Add(time.Minute * 30), now.Add(time.Minute * 11).Add(time.Minute * 30)},
},
}
for i, scenario := range scenarios {
provider := &fakeTimeProvider{}
for _, time := range scenario.preloadedTimes {
provider.timeQueue = append(provider.timeQueue, time)
}
reporter := fakeHealthReporter{}
for _, state := range scenario.futureHealthState {
reporter.stateQueue = append(reporter.stateQueue, state)
}
if len(scenario.preloadedTimes) != len(scenario.futureHealthState) || len(scenario.futureHealthState) != len(scenario.expectedSchedule) {
t.Fatalf("%d. times and health reports and next time lengths were not equal.", i)
}
timer := timer{
provider: provider,
}
scheduler := healthScheduler{
timer: timer,
target: reporter,
scheduledFor: now,
}
for j := 0; j < len(scenario.preloadedTimes); j++ {
futureState := scenario.futureHealthState[j]
scheduler.Reschedule(scenario.preloadedTimes[j], futureState)
nextSchedule := scheduler.ScheduledFor()
if nextSchedule != scenario.expectedSchedule[j] {
t.Errorf("%d.%d. Expected to be scheduled to %s, got %s", i, j, scenario.expectedSchedule[j], nextSchedule)
}
}
}
}

View file

@ -18,8 +18,6 @@ import (
"github.com/matttproud/golang_instrumentation/metrics"
"github.com/matttproud/prometheus/model"
"io/ioutil"
"log"
"math"
"net/http"
"strconv"
"time"
@ -33,13 +31,12 @@ const (
UNREACHABLE
)
const (
MAXIMUM_BACKOFF = time.Minute * 30
)
type healthReporter interface {
State() TargetState
}
type Target struct {
scheduledFor time.Time
unreachableCount int
scheduler scheduler
state TargetState
Address string
@ -50,50 +47,29 @@ type Target struct {
Interval time.Duration
}
func NewTarget(address string, interval, deadline time.Duration, baseLabels model.LabelSet) *Target {
target := &Target{
Address: address,
Deadline: deadline,
Interval: interval,
BaseLabels: baseLabels,
}
scheduler := &healthScheduler{
target: target,
}
target.scheduler = scheduler
return target
}
type Result struct {
Err error
Samples []model.Sample
Target Target
}
func (t *Target) reschedule(s TargetState) {
currentState := t.state
switch currentState {
case UNKNOWN, UNREACHABLE:
switch s {
case ALIVE:
t.unreachableCount = 0
targetsHealthy.Increment()
case UNREACHABLE:
backoff := MAXIMUM_BACKOFF
exponential := time.Duration(math.Pow(2, float64(t.unreachableCount))) * time.Second
if backoff > exponential {
backoff = exponential
}
t.scheduledFor = time.Now().Add(backoff)
t.unreachableCount++
log.Printf("%s unavailable %s times deferred for %s.", t, t.unreachableCount, backoff)
default:
}
case ALIVE:
switch s {
case UNREACHABLE:
t.unreachableCount++
targetsUnhealthy.Increment()
}
default:
}
if s != currentState {
log.Printf("%s transitioning from %s to %s.", t, currentState, s)
}
t.state = s
}
func (t *Target) Scrape(results chan Result) (err error) {
func (t *Target) Scrape(earliest time.Time, results chan Result) (err error) {
result := Result{}
defer func() {
@ -106,7 +82,7 @@ func (t *Target) Scrape(results chan Result) (err error) {
futureState = UNREACHABLE
}
t.reschedule(futureState)
t.scheduler.Reschedule(earliest, futureState)
result.Err = err
results <- result
@ -222,3 +198,11 @@ func (t *Target) Scrape(results chan Result) (err error) {
return
}
func (t Target) State() TargetState {
return t.state
}
func (t Target) scheduledFor() time.Time {
return t.scheduler.ScheduledFor()
}

View file

@ -84,12 +84,7 @@ func (m targetManager) AddTargetsFromConfig(config *config.Config) {
}
for _, endpoint := range configTargets.Endpoints {
target := &Target{
Address: endpoint,
BaseLabels: baseLabels,
Deadline: time.Second * 5,
Interval: interval,
}
target := NewTarget(endpoint, time.Second*5, interval, baseLabels)
m.Add(target)
}
}

View file

@ -23,7 +23,7 @@ func (p TargetPool) Len() int {
}
func (p TargetPool) Less(i, j int) bool {
return p.targets[i].scheduledFor.Before(p.targets[j].scheduledFor)
return p.targets[i].scheduledFor().Before(p.targets[j].scheduledFor())
}
func (p *TargetPool) Pop() interface{} {
@ -62,11 +62,11 @@ func (p TargetPool) Stop() {
p.done <- true
}
func (p *TargetPool) runSingle(results chan Result, t *Target) {
func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t *Target) {
p.manager.acquire()
defer p.manager.release()
t.Scrape(results)
t.Scrape(earliest, results)
}
func (p *TargetPool) runIteration(results chan Result) {
@ -78,14 +78,14 @@ func (p *TargetPool) runIteration(results chan Result) {
now := time.Now()
if target.scheduledFor.After(now) {
if target.scheduledFor().After(now) {
heap.Push(p, target)
break
}
go func() {
p.runSingle(results, target)
p.runSingle(now, results, target)
heap.Push(p, target)
}()
}

View file

@ -19,6 +19,15 @@ import (
"time"
)
type literalScheduler time.Time
func (s literalScheduler) ScheduledFor() time.Time {
return time.Time(s)
}
func (s literalScheduler) Reschedule(earliest time.Time, future TargetState) {
}
func TestTargetPool(t *testing.T) {
type expectation struct {
size int
@ -106,7 +115,7 @@ func TestTargetPool(t *testing.T) {
for _, input := range scenario.inputs {
target := Target{
Address: input.address,
scheduledFor: input.scheduledFor,
scheduler: literalScheduler(input.scheduledFor),
}
heap.Push(&pool, &target)

View file

@ -1,8 +1,8 @@
package ast
import (
"fmt"
"encoding/json"
"fmt"
"sort"
"strings"
"time"
@ -164,26 +164,34 @@ func EvalToString(node Node, timestamp *time.Time, format OutputFormat) string {
case SCALAR:
scalar := node.(ScalarNode).Eval(timestamp)
switch format {
case TEXT: return fmt.Sprintf("scalar: %v", scalar)
case JSON: return typedValueToJSON(scalar, "scalar")
case TEXT:
return fmt.Sprintf("scalar: %v", scalar)
case JSON:
return typedValueToJSON(scalar, "scalar")
}
case VECTOR:
vector := node.(VectorNode).Eval(timestamp)
switch format {
case TEXT: return vector.ToString()
case JSON: return typedValueToJSON(vector, "vector")
case TEXT:
return vector.ToString()
case JSON:
return typedValueToJSON(vector, "vector")
}
case MATRIX:
matrix := node.(MatrixNode).Eval(timestamp)
switch format {
case TEXT: return matrix.ToString()
case JSON: return typedValueToJSON(matrix, "matrix")
case TEXT:
return matrix.ToString()
case JSON:
return typedValueToJSON(matrix, "matrix")
}
case STRING:
str := node.(StringNode).Eval(timestamp)
switch format {
case TEXT: return str
case JSON: return typedValueToJSON(str, "string")
case TEXT:
return str
case JSON:
return typedValueToJSON(str, "string")
}
}
panic("Switch didn't cover all node types")