mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 06:17:27 -08:00
Refactor target scheduling to separate facility.
``Target`` will be refactored down the road to support various nuanced endpoint types. Thusly incorporating the scheduling behavior within it will be problematic. To that end, the scheduling behavior has been moved into a separate assistance type to improve conciseness and testability. ``make format`` was also run.
This commit is contained in:
parent
cb6eb30182
commit
efe61c18fa
|
@ -1,11 +1,11 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"code.google.com/p/gorest"
|
||||
"code.google.com/p/gorest"
|
||||
)
|
||||
|
||||
type MetricsService struct {
|
||||
gorest.RestService `root:"/api/" consumes:"application/json" produces:"application/json"`
|
||||
gorest.RestService `root:"/api/" consumes:"application/json" produces:"application/json"`
|
||||
|
||||
query gorest.EndPoint `method:"GET" path:"/query?{expr:string}&{json:string}&{start:string}&{end:string}" output:"string"`
|
||||
query gorest.EndPoint `method:"GET" path:"/query?{expr:string}&{json:string}&{start:string}&{end:string}" output:"string"`
|
||||
}
|
||||
|
|
35
api/query.go
35
api/query.go
|
@ -1,28 +1,29 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"code.google.com/p/gorest"
|
||||
"code.google.com/p/gorest"
|
||||
"github.com/matttproud/prometheus/rules"
|
||||
"github.com/matttproud/prometheus/rules/ast"
|
||||
"time"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (serv MetricsService) Query(Expr string, Json string, Start string, End string) (result string) {
|
||||
exprNode, err := rules.LoadExprFromString(Expr)
|
||||
if err != nil {
|
||||
return err.Error()
|
||||
}
|
||||
exprNode, err := rules.LoadExprFromString(Expr)
|
||||
if err != nil {
|
||||
return err.Error()
|
||||
}
|
||||
|
||||
timestamp := time.Now()
|
||||
timestamp := time.Now()
|
||||
|
||||
rb := serv.ResponseBuilder()
|
||||
var format ast.OutputFormat
|
||||
if Json != "" {
|
||||
format = ast.JSON
|
||||
rb.SetContentType(gorest.Application_Json)
|
||||
} else {
|
||||
format = ast.TEXT
|
||||
rb.SetContentType(gorest.Text_Plain)
|
||||
}
|
||||
rb := serv.ResponseBuilder()
|
||||
var format ast.OutputFormat
|
||||
if Json != "" {
|
||||
format = ast.JSON
|
||||
rb.SetContentType(gorest.Application_Json)
|
||||
} else {
|
||||
format = ast.TEXT
|
||||
rb.SetContentType(gorest.Text_Plain)
|
||||
}
|
||||
|
||||
return ast.EvalToString(exprNode, ×tamp, format)
|
||||
return ast.EvalToString(exprNode, ×tamp, format)
|
||||
}
|
||||
|
|
|
@ -35,8 +35,8 @@ func LoadFromReader(configReader io.Reader) (*Config, error) {
|
|||
yyin = configReader
|
||||
yypos = 1
|
||||
yyline = 1
|
||||
yydata = ""
|
||||
yytext = ""
|
||||
yydata = ""
|
||||
yytext = ""
|
||||
|
||||
lexer := &ConfigLexer{}
|
||||
yyParse(lexer)
|
||||
|
@ -56,7 +56,7 @@ func LoadFromString(configString string) (*Config, error) {
|
|||
|
||||
func LoadFromFile(fileName string) (*Config, error) {
|
||||
configReader, err := os.Open(fileName)
|
||||
defer configReader.Close()
|
||||
defer configReader.Close()
|
||||
if err != nil {
|
||||
return &Config{}, err
|
||||
}
|
||||
|
|
10
main.go
10
main.go
|
@ -14,10 +14,10 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"code.google.com/p/gorest"
|
||||
"code.google.com/p/gorest"
|
||||
"fmt"
|
||||
"github.com/matttproud/prometheus/api"
|
||||
"github.com/matttproud/golang_instrumentation"
|
||||
"github.com/matttproud/prometheus/api"
|
||||
"github.com/matttproud/prometheus/config"
|
||||
"github.com/matttproud/prometheus/retrieval"
|
||||
"github.com/matttproud/prometheus/rules"
|
||||
|
@ -68,12 +68,12 @@ func main() {
|
|||
}
|
||||
|
||||
go func() {
|
||||
gorest.RegisterService(new(api.MetricsService))
|
||||
gorest.RegisterService(new(api.MetricsService))
|
||||
exporter := registry.DefaultRegistry.YieldExporter()
|
||||
|
||||
http.Handle("/", gorest.Handle())
|
||||
http.Handle("/", gorest.Handle())
|
||||
http.Handle("/metrics.json", exporter)
|
||||
http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("static"))))
|
||||
http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("static"))))
|
||||
http.ListenAndServe(":9090", nil)
|
||||
}()
|
||||
|
||||
|
|
23
retrieval/interface_test.go
Normal file
23
retrieval/interface_test.go
Normal file
|
@ -0,0 +1,23 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package retrieval
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestInterfaces(t *testing.T) {
|
||||
var _ scheduler = &healthScheduler{}
|
||||
var _ healthReporter = Target{}
|
||||
}
|
143
retrieval/scheduler.go
Normal file
143
retrieval/scheduler.go
Normal file
|
@ -0,0 +1,143 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package retrieval
|
||||
|
||||
import (
|
||||
"math"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// The default increment for exponential backoff when querying a target.
|
||||
DEFAULT_BACKOFF_VALUE = 2
|
||||
// The base units for the exponential backoff.
|
||||
DEFAULT_BACKOFF_VALUE_UNIT = time.Second
|
||||
// The maximum allowed backoff time.
|
||||
MAXIMUM_BACKOFF_VALUE = 30 * time.Minute
|
||||
)
|
||||
|
||||
// A basic interface only useful in testing contexts for dispensing the time
|
||||
// in a controlled manner.
|
||||
type instantProvider interface {
|
||||
// The current instant.
|
||||
Now() time.Time
|
||||
}
|
||||
|
||||
// timer is a simple means for fluently wrapping around standard Go timekeeping
|
||||
// mechanisms to enhance testability without compromising code readability.
|
||||
//
|
||||
// A timer is sufficient for use on bare initialization. A provider should be
|
||||
// set only for test contexts. When not provided, a timer emits the current
|
||||
// system time.
|
||||
type timer struct {
|
||||
// The underlying means through which time is provided, if supplied.
|
||||
provider instantProvider
|
||||
}
|
||||
|
||||
// Emit the current instant.
|
||||
func (t timer) Now() time.Time {
|
||||
if t.provider == nil {
|
||||
return time.Now()
|
||||
}
|
||||
return t.provider.Now()
|
||||
}
|
||||
|
||||
// scheduler is an interface that various scheduling strategies must fulfill
|
||||
// in order to set the scheduling order for a target.
|
||||
//
|
||||
// Target takes advantage of this type by embedding an instance of scheduler
|
||||
// in each Target instance itself. The emitted scheduler.ScheduledFor() is
|
||||
// the basis for sorting the order of pending queries.
|
||||
//
|
||||
// This type is described as an interface to maximize testability.
|
||||
type scheduler interface {
|
||||
// ScheduledFor emits the earliest time at which the given object is allowed
|
||||
// to be run. This time may or not be a reflection of the earliest parameter
|
||||
// provided in Reschedule; that is up to the underlying strategy
|
||||
// implementations.
|
||||
ScheduledFor() time.Time
|
||||
// Instruct the scheduled item to re-schedule itself given new state data and
|
||||
// the earliest time at which the outside system thinks the operation should
|
||||
// be scheduled for.
|
||||
Reschedule(earliest time.Time, future TargetState)
|
||||
}
|
||||
|
||||
// healthScheduler is an implementation of scheduler that uses health data
|
||||
// provided by the target field as well as unreachability counts to determine
|
||||
// when to next schedule an operation.
|
||||
//
|
||||
// The type is almost capable of being used with default initialization, except
|
||||
// that a target field must be provided for which the system compares current
|
||||
// health against future proposed values.
|
||||
type healthScheduler struct {
|
||||
scheduledFor time.Time
|
||||
target healthReporter
|
||||
timer timer
|
||||
unreachableCount int
|
||||
}
|
||||
|
||||
func (s healthScheduler) ScheduledFor() time.Time {
|
||||
return s.scheduledFor
|
||||
}
|
||||
|
||||
// Reschedule, like the protocol described in scheduler, uses the current and
|
||||
// proposed future health state to determine how and when a given subject is to
|
||||
// be scheduled.
|
||||
//
|
||||
// If a subject has been at given moment marked as unhealthy, an exponential
|
||||
// backoff scheme is applied to it. The reason for this backoff is to ensure
|
||||
// that known-healthy targets can consume valuable request queuing resources
|
||||
// first. Depending on the retrieval interval and number of consecutive
|
||||
// unhealthy markings, the query of these unhealthy individuals may come before
|
||||
// the healthy ones for a short time to help ensure expeditious retrieval.
|
||||
// The inflection point that drops these to the back of the queue is beneficial
|
||||
// to save resources in the long-run.
|
||||
//
|
||||
// If a subject is healthy, its next scheduling opportunity is set to
|
||||
// earliest, for this ensures fair querying of all remaining healthy targets and
|
||||
// removes bias in the ordering. In order for the anti-bias value to have any
|
||||
// value, the earliest opportunity should be set to a value that is constant
|
||||
// for a given batch of subjects who are to be scraped on a given interval.
|
||||
func (s *healthScheduler) Reschedule(e time.Time, f TargetState) {
|
||||
currentState := s.target.State()
|
||||
// XXX: Handle metrics surrounding health.
|
||||
switch currentState {
|
||||
case UNKNOWN, UNREACHABLE:
|
||||
switch f {
|
||||
case ALIVE:
|
||||
s.unreachableCount = 0
|
||||
break
|
||||
case UNREACHABLE:
|
||||
s.unreachableCount++
|
||||
break
|
||||
}
|
||||
case ALIVE:
|
||||
switch f {
|
||||
case UNREACHABLE:
|
||||
s.unreachableCount++
|
||||
}
|
||||
}
|
||||
|
||||
if s.unreachableCount == 0 {
|
||||
s.scheduledFor = e
|
||||
} else {
|
||||
backoff := MAXIMUM_BACKOFF_VALUE
|
||||
exponential := time.Duration(math.Pow(DEFAULT_BACKOFF_VALUE, float64(s.unreachableCount))) * DEFAULT_BACKOFF_VALUE_UNIT
|
||||
if backoff > exponential {
|
||||
backoff = exponential
|
||||
}
|
||||
|
||||
s.scheduledFor = s.timer.Now().Add(backoff)
|
||||
}
|
||||
}
|
111
retrieval/scheduler_test.go
Normal file
111
retrieval/scheduler_test.go
Normal file
|
@ -0,0 +1,111 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package retrieval
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type fakeHealthReporter struct {
|
||||
index int
|
||||
stateQueue []TargetState
|
||||
}
|
||||
|
||||
func (h fakeHealthReporter) State() (state TargetState) {
|
||||
state = h.stateQueue[h.index]
|
||||
|
||||
h.index++
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
type fakeTimeProvider struct {
|
||||
index int
|
||||
timeQueue []time.Time
|
||||
}
|
||||
|
||||
func (t *fakeTimeProvider) Now() (time time.Time) {
|
||||
time = t.timeQueue[t.index]
|
||||
|
||||
t.index++
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func TestHealthScheduler(t *testing.T) {
|
||||
now := time.Now()
|
||||
var scenarios = []struct {
|
||||
futureHealthState []TargetState
|
||||
preloadedTimes []time.Time
|
||||
expectedSchedule []time.Time
|
||||
}{
|
||||
// The behavior discussed in healthScheduler.Reschedule should be read
|
||||
// fully to understand the whys and wherefores.
|
||||
{
|
||||
futureHealthState: []TargetState{UNKNOWN, ALIVE, ALIVE},
|
||||
preloadedTimes: []time.Time{now, now.Add(time.Minute), now.Add(time.Minute * 2)},
|
||||
expectedSchedule: []time.Time{now, now.Add(time.Minute), now.Add(time.Minute * 2)},
|
||||
},
|
||||
{
|
||||
futureHealthState: []TargetState{UNKNOWN, UNREACHABLE, UNREACHABLE},
|
||||
preloadedTimes: []time.Time{now, now.Add(time.Minute), now.Add(time.Minute * 2)},
|
||||
expectedSchedule: []time.Time{now, now.Add(time.Second * 2), now.Add(time.Minute).Add(time.Second * 4)},
|
||||
},
|
||||
{
|
||||
futureHealthState: []TargetState{UNKNOWN, UNREACHABLE, ALIVE},
|
||||
preloadedTimes: []time.Time{now, now.Add(time.Minute), now.Add(time.Minute * 2)},
|
||||
expectedSchedule: []time.Time{now, now.Add(time.Second * 2), now.Add(time.Minute * 2)},
|
||||
},
|
||||
{
|
||||
futureHealthState: []TargetState{UNKNOWN, UNREACHABLE, UNREACHABLE, UNREACHABLE, UNREACHABLE, UNREACHABLE, UNREACHABLE, UNREACHABLE, UNREACHABLE, UNREACHABLE, UNREACHABLE, UNREACHABLE, UNREACHABLE},
|
||||
preloadedTimes: []time.Time{now, now.Add(time.Minute), now.Add(time.Minute * 2), now.Add(time.Minute * 3), now.Add(time.Minute * 4), now.Add(time.Minute * 5), now.Add(time.Minute * 6), now.Add(time.Minute * 7), now.Add(time.Minute * 8), now.Add(time.Minute * 9), now.Add(time.Minute * 10), now.Add(time.Minute * 11), now.Add(time.Minute * 12)},
|
||||
expectedSchedule: []time.Time{now, now.Add(time.Second * 2), now.Add(time.Minute * 1).Add(time.Second * 4), now.Add(time.Minute * 2).Add(time.Second * 8), now.Add(time.Minute * 3).Add(time.Second * 16), now.Add(time.Minute * 4).Add(time.Second * 32), now.Add(time.Minute * 5).Add(time.Second * 64), now.Add(time.Minute * 6).Add(time.Second * 128), now.Add(time.Minute * 7).Add(time.Second * 256), now.Add(time.Minute * 8).Add(time.Second * 512), now.Add(time.Minute * 9).Add(time.Second * 1024), now.Add(time.Minute * 10).Add(time.Minute * 30), now.Add(time.Minute * 11).Add(time.Minute * 30)},
|
||||
},
|
||||
}
|
||||
|
||||
for i, scenario := range scenarios {
|
||||
provider := &fakeTimeProvider{}
|
||||
for _, time := range scenario.preloadedTimes {
|
||||
provider.timeQueue = append(provider.timeQueue, time)
|
||||
}
|
||||
|
||||
reporter := fakeHealthReporter{}
|
||||
for _, state := range scenario.futureHealthState {
|
||||
reporter.stateQueue = append(reporter.stateQueue, state)
|
||||
}
|
||||
if len(scenario.preloadedTimes) != len(scenario.futureHealthState) || len(scenario.futureHealthState) != len(scenario.expectedSchedule) {
|
||||
t.Fatalf("%d. times and health reports and next time lengths were not equal.", i)
|
||||
}
|
||||
|
||||
timer := timer{
|
||||
provider: provider,
|
||||
}
|
||||
|
||||
scheduler := healthScheduler{
|
||||
timer: timer,
|
||||
target: reporter,
|
||||
scheduledFor: now,
|
||||
}
|
||||
|
||||
for j := 0; j < len(scenario.preloadedTimes); j++ {
|
||||
futureState := scenario.futureHealthState[j]
|
||||
scheduler.Reschedule(scenario.preloadedTimes[j], futureState)
|
||||
nextSchedule := scheduler.ScheduledFor()
|
||||
if nextSchedule != scenario.expectedSchedule[j] {
|
||||
t.Errorf("%d.%d. Expected to be scheduled to %s, got %s", i, j, scenario.expectedSchedule[j], nextSchedule)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,8 +18,6 @@ import (
|
|||
"github.com/matttproud/golang_instrumentation/metrics"
|
||||
"github.com/matttproud/prometheus/model"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
@ -33,14 +31,13 @@ const (
|
|||
UNREACHABLE
|
||||
)
|
||||
|
||||
const (
|
||||
MAXIMUM_BACKOFF = time.Minute * 30
|
||||
)
|
||||
type healthReporter interface {
|
||||
State() TargetState
|
||||
}
|
||||
|
||||
type Target struct {
|
||||
scheduledFor time.Time
|
||||
unreachableCount int
|
||||
state TargetState
|
||||
scheduler scheduler
|
||||
state TargetState
|
||||
|
||||
Address string
|
||||
Deadline time.Duration
|
||||
|
@ -50,50 +47,29 @@ type Target struct {
|
|||
Interval time.Duration
|
||||
}
|
||||
|
||||
func NewTarget(address string, interval, deadline time.Duration, baseLabels model.LabelSet) *Target {
|
||||
target := &Target{
|
||||
Address: address,
|
||||
Deadline: deadline,
|
||||
Interval: interval,
|
||||
BaseLabels: baseLabels,
|
||||
}
|
||||
|
||||
scheduler := &healthScheduler{
|
||||
target: target,
|
||||
}
|
||||
target.scheduler = scheduler
|
||||
|
||||
return target
|
||||
}
|
||||
|
||||
type Result struct {
|
||||
Err error
|
||||
Samples []model.Sample
|
||||
Target Target
|
||||
}
|
||||
|
||||
func (t *Target) reschedule(s TargetState) {
|
||||
currentState := t.state
|
||||
|
||||
switch currentState {
|
||||
case UNKNOWN, UNREACHABLE:
|
||||
switch s {
|
||||
case ALIVE:
|
||||
t.unreachableCount = 0
|
||||
targetsHealthy.Increment()
|
||||
case UNREACHABLE:
|
||||
backoff := MAXIMUM_BACKOFF
|
||||
exponential := time.Duration(math.Pow(2, float64(t.unreachableCount))) * time.Second
|
||||
if backoff > exponential {
|
||||
backoff = exponential
|
||||
}
|
||||
|
||||
t.scheduledFor = time.Now().Add(backoff)
|
||||
t.unreachableCount++
|
||||
log.Printf("%s unavailable %s times deferred for %s.", t, t.unreachableCount, backoff)
|
||||
default:
|
||||
}
|
||||
case ALIVE:
|
||||
switch s {
|
||||
case UNREACHABLE:
|
||||
t.unreachableCount++
|
||||
targetsUnhealthy.Increment()
|
||||
}
|
||||
default:
|
||||
}
|
||||
|
||||
if s != currentState {
|
||||
log.Printf("%s transitioning from %s to %s.", t, currentState, s)
|
||||
}
|
||||
|
||||
t.state = s
|
||||
}
|
||||
|
||||
func (t *Target) Scrape(results chan Result) (err error) {
|
||||
func (t *Target) Scrape(earliest time.Time, results chan Result) (err error) {
|
||||
result := Result{}
|
||||
|
||||
defer func() {
|
||||
|
@ -106,7 +82,7 @@ func (t *Target) Scrape(results chan Result) (err error) {
|
|||
futureState = UNREACHABLE
|
||||
}
|
||||
|
||||
t.reschedule(futureState)
|
||||
t.scheduler.Reschedule(earliest, futureState)
|
||||
|
||||
result.Err = err
|
||||
results <- result
|
||||
|
@ -222,3 +198,11 @@ func (t *Target) Scrape(results chan Result) (err error) {
|
|||
|
||||
return
|
||||
}
|
||||
|
||||
func (t Target) State() TargetState {
|
||||
return t.state
|
||||
}
|
||||
|
||||
func (t Target) scheduledFor() time.Time {
|
||||
return t.scheduler.ScheduledFor()
|
||||
}
|
||||
|
|
|
@ -84,12 +84,7 @@ func (m targetManager) AddTargetsFromConfig(config *config.Config) {
|
|||
}
|
||||
|
||||
for _, endpoint := range configTargets.Endpoints {
|
||||
target := &Target{
|
||||
Address: endpoint,
|
||||
BaseLabels: baseLabels,
|
||||
Deadline: time.Second * 5,
|
||||
Interval: interval,
|
||||
}
|
||||
target := NewTarget(endpoint, time.Second*5, interval, baseLabels)
|
||||
m.Add(target)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ func (p TargetPool) Len() int {
|
|||
}
|
||||
|
||||
func (p TargetPool) Less(i, j int) bool {
|
||||
return p.targets[i].scheduledFor.Before(p.targets[j].scheduledFor)
|
||||
return p.targets[i].scheduledFor().Before(p.targets[j].scheduledFor())
|
||||
}
|
||||
|
||||
func (p *TargetPool) Pop() interface{} {
|
||||
|
@ -62,11 +62,11 @@ func (p TargetPool) Stop() {
|
|||
p.done <- true
|
||||
}
|
||||
|
||||
func (p *TargetPool) runSingle(results chan Result, t *Target) {
|
||||
func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t *Target) {
|
||||
p.manager.acquire()
|
||||
defer p.manager.release()
|
||||
|
||||
t.Scrape(results)
|
||||
t.Scrape(earliest, results)
|
||||
}
|
||||
|
||||
func (p *TargetPool) runIteration(results chan Result) {
|
||||
|
@ -78,14 +78,14 @@ func (p *TargetPool) runIteration(results chan Result) {
|
|||
|
||||
now := time.Now()
|
||||
|
||||
if target.scheduledFor.After(now) {
|
||||
if target.scheduledFor().After(now) {
|
||||
heap.Push(p, target)
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
go func() {
|
||||
p.runSingle(results, target)
|
||||
p.runSingle(now, results, target)
|
||||
heap.Push(p, target)
|
||||
}()
|
||||
}
|
||||
|
|
|
@ -19,6 +19,15 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
type literalScheduler time.Time
|
||||
|
||||
func (s literalScheduler) ScheduledFor() time.Time {
|
||||
return time.Time(s)
|
||||
}
|
||||
|
||||
func (s literalScheduler) Reschedule(earliest time.Time, future TargetState) {
|
||||
}
|
||||
|
||||
func TestTargetPool(t *testing.T) {
|
||||
type expectation struct {
|
||||
size int
|
||||
|
@ -105,8 +114,8 @@ func TestTargetPool(t *testing.T) {
|
|||
|
||||
for _, input := range scenario.inputs {
|
||||
target := Target{
|
||||
Address: input.address,
|
||||
scheduledFor: input.scheduledFor,
|
||||
Address: input.address,
|
||||
scheduler: literalScheduler(input.scheduledFor),
|
||||
}
|
||||
|
||||
heap.Push(&pool, &target)
|
||||
|
|
|
@ -540,15 +540,15 @@ func NewFunctionCall(function *Function, args []Node) (Node, error) {
|
|||
|
||||
func nodesHaveTypes(nodes []Node, exprTypes []ExprType) bool {
|
||||
for _, node := range nodes {
|
||||
correctType := false
|
||||
correctType := false
|
||||
for _, exprType := range exprTypes {
|
||||
if node.Type() == exprType {
|
||||
correctType = true
|
||||
}
|
||||
}
|
||||
if !correctType {
|
||||
return false
|
||||
}
|
||||
if !correctType {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -37,13 +37,13 @@ func (p *PersistenceBridge) getMetricsWithLabels(labels model.LabelSet) ([]*mode
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
metrics = append(metrics, metric)
|
||||
metrics = append(metrics, metric)
|
||||
}
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
func (p *PersistenceBridge) GetValueAtTime(labels model.LabelSet, timestamp *time.Time, stalenessPolicy *metric.StalenessPolicy) ([]*model.Sample, error) {
|
||||
metrics, err := p.getMetricsWithLabels(labels)
|
||||
metrics, err := p.getMetricsWithLabels(labels)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -62,14 +62,14 @@ func (p *PersistenceBridge) GetValueAtTime(labels model.LabelSet, timestamp *tim
|
|||
}
|
||||
|
||||
func (p *PersistenceBridge) GetBoundaryValues(labels model.LabelSet, interval *model.Interval, stalenessPolicy *metric.StalenessPolicy) ([]*model.SampleSet, error) {
|
||||
metrics, err := p.getMetricsWithLabels(labels)
|
||||
metrics, err := p.getMetricsWithLabels(labels)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sampleSets := []*model.SampleSet{}
|
||||
for _, metric := range metrics {
|
||||
// TODO: change to GetBoundaryValues() once it has the right return type.
|
||||
// TODO: change to GetBoundaryValues() once it has the right return type.
|
||||
sampleSet, err := p.persistence.GetRangeValues(metric, interval, stalenessPolicy)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -78,15 +78,15 @@ func (p *PersistenceBridge) GetBoundaryValues(labels model.LabelSet, interval *m
|
|||
continue
|
||||
}
|
||||
|
||||
// TODO remove when persistence return value is fixed.
|
||||
sampleSet.Metric = *metric
|
||||
// TODO remove when persistence return value is fixed.
|
||||
sampleSet.Metric = *metric
|
||||
sampleSets = append(sampleSets, sampleSet)
|
||||
}
|
||||
return sampleSets, nil
|
||||
}
|
||||
|
||||
func (p *PersistenceBridge) GetRangeValues(labels model.LabelSet, interval *model.Interval, stalenessPolicy *metric.StalenessPolicy) ([]*model.SampleSet, error) {
|
||||
metrics, err := p.getMetricsWithLabels(labels)
|
||||
metrics, err := p.getMetricsWithLabels(labels)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -101,8 +101,8 @@ func (p *PersistenceBridge) GetRangeValues(labels model.LabelSet, interval *mode
|
|||
continue
|
||||
}
|
||||
|
||||
// TODO remove when persistence return value is fixed.
|
||||
sampleSet.Metric = *metric
|
||||
// TODO remove when persistence return value is fixed.
|
||||
sampleSet.Metric = *metric
|
||||
sampleSets = append(sampleSets, sampleSet)
|
||||
}
|
||||
return sampleSets, nil
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package ast
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"encoding/json"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
@ -11,8 +11,8 @@ import (
|
|||
type OutputFormat int
|
||||
|
||||
const (
|
||||
TEXT OutputFormat = iota
|
||||
JSON
|
||||
TEXT OutputFormat = iota
|
||||
JSON
|
||||
)
|
||||
|
||||
func binOpTypeToString(opType BinOpType) string {
|
||||
|
@ -113,11 +113,11 @@ func (matrix Matrix) ToString() string {
|
|||
}
|
||||
}
|
||||
sort.Strings(labelStrings)
|
||||
valueStrings := []string{}
|
||||
for _, value := range sampleSet.Values {
|
||||
valueStrings = append(valueStrings,
|
||||
fmt.Sprintf("\n%v @[%v]", value.Value, value.Timestamp))
|
||||
}
|
||||
valueStrings := []string{}
|
||||
for _, value := range sampleSet.Values {
|
||||
valueStrings = append(valueStrings,
|
||||
fmt.Sprintf("\n%v @[%v]", value.Value, value.Timestamp))
|
||||
}
|
||||
metricStrings = append(metricStrings,
|
||||
fmt.Sprintf("%v{%v} => %v",
|
||||
metricName,
|
||||
|
@ -129,64 +129,72 @@ func (matrix Matrix) ToString() string {
|
|||
}
|
||||
|
||||
func errorToJSON(err error) string {
|
||||
errorStruct := struct {
|
||||
Type string
|
||||
Error string
|
||||
}{
|
||||
Type: "error",
|
||||
Error: err.Error(),
|
||||
}
|
||||
errorStruct := struct {
|
||||
Type string
|
||||
Error string
|
||||
}{
|
||||
Type: "error",
|
||||
Error: err.Error(),
|
||||
}
|
||||
|
||||
errorJSON, err := json.MarshalIndent(errorStruct, "", "\t")
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
return string(errorJSON)
|
||||
errorJSON, err := json.MarshalIndent(errorStruct, "", "\t")
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
return string(errorJSON)
|
||||
}
|
||||
|
||||
func typedValueToJSON(data interface{}, typeStr string) string {
|
||||
dataStruct := struct {
|
||||
Type string
|
||||
Value interface{}
|
||||
}{
|
||||
Type: typeStr,
|
||||
Value: data,
|
||||
}
|
||||
dataJSON, err := json.MarshalIndent(dataStruct, "", "\t")
|
||||
if err != nil {
|
||||
return errorToJSON(err)
|
||||
}
|
||||
return string(dataJSON)
|
||||
dataStruct := struct {
|
||||
Type string
|
||||
Value interface{}
|
||||
}{
|
||||
Type: typeStr,
|
||||
Value: data,
|
||||
}
|
||||
dataJSON, err := json.MarshalIndent(dataStruct, "", "\t")
|
||||
if err != nil {
|
||||
return errorToJSON(err)
|
||||
}
|
||||
return string(dataJSON)
|
||||
}
|
||||
|
||||
func EvalToString(node Node, timestamp *time.Time, format OutputFormat) string {
|
||||
switch node.Type() {
|
||||
case SCALAR:
|
||||
scalar := node.(ScalarNode).Eval(timestamp)
|
||||
switch format {
|
||||
case TEXT: return fmt.Sprintf("scalar: %v", scalar)
|
||||
case JSON: return typedValueToJSON(scalar, "scalar")
|
||||
}
|
||||
case VECTOR:
|
||||
vector := node.(VectorNode).Eval(timestamp)
|
||||
switch format {
|
||||
case TEXT: return vector.ToString()
|
||||
case JSON: return typedValueToJSON(vector, "vector")
|
||||
}
|
||||
case MATRIX:
|
||||
matrix := node.(MatrixNode).Eval(timestamp)
|
||||
switch format {
|
||||
case TEXT: return matrix.ToString()
|
||||
case JSON: return typedValueToJSON(matrix, "matrix")
|
||||
}
|
||||
case STRING:
|
||||
str := node.(StringNode).Eval(timestamp)
|
||||
switch format {
|
||||
case TEXT: return str
|
||||
case JSON: return typedValueToJSON(str, "string")
|
||||
}
|
||||
}
|
||||
panic("Switch didn't cover all node types")
|
||||
switch node.Type() {
|
||||
case SCALAR:
|
||||
scalar := node.(ScalarNode).Eval(timestamp)
|
||||
switch format {
|
||||
case TEXT:
|
||||
return fmt.Sprintf("scalar: %v", scalar)
|
||||
case JSON:
|
||||
return typedValueToJSON(scalar, "scalar")
|
||||
}
|
||||
case VECTOR:
|
||||
vector := node.(VectorNode).Eval(timestamp)
|
||||
switch format {
|
||||
case TEXT:
|
||||
return vector.ToString()
|
||||
case JSON:
|
||||
return typedValueToJSON(vector, "vector")
|
||||
}
|
||||
case MATRIX:
|
||||
matrix := node.(MatrixNode).Eval(timestamp)
|
||||
switch format {
|
||||
case TEXT:
|
||||
return matrix.ToString()
|
||||
case JSON:
|
||||
return typedValueToJSON(matrix, "matrix")
|
||||
}
|
||||
case STRING:
|
||||
str := node.(StringNode).Eval(timestamp)
|
||||
switch format {
|
||||
case TEXT:
|
||||
return str
|
||||
case JSON:
|
||||
return typedValueToJSON(str, "string")
|
||||
}
|
||||
}
|
||||
panic("Switch didn't cover all node types")
|
||||
}
|
||||
|
||||
func (node *VectorLiteral) ToString() string {
|
||||
|
|
|
@ -3,7 +3,7 @@ package rules
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/matttproud/prometheus/rules/ast"
|
||||
"github.com/matttproud/prometheus/rules/ast"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
|
@ -12,28 +12,28 @@ import (
|
|||
// NOTE: This parser is non-reentrant due to its dependence on global state.
|
||||
|
||||
// GoLex sadly needs these global variables for storing temporary token/parsing information.
|
||||
var yylval *yySymType // For storing extra token information, like the contents of a string.
|
||||
var yyline int // Line number within the current file or buffer.
|
||||
var yypos int // Character position within the current line.
|
||||
var yylval *yySymType // For storing extra token information, like the contents of a string.
|
||||
var yyline int // Line number within the current file or buffer.
|
||||
var yypos int // Character position within the current line.
|
||||
|
||||
type RulesLexer struct {
|
||||
errors []string // Errors encountered during parsing.
|
||||
startToken int // Dummy token to simulate multiple start symbols (see below).
|
||||
parsedRules []*Rule // Parsed full rules.
|
||||
parsedExpr ast.Node // Parsed single expression.
|
||||
errors []string // Errors encountered during parsing.
|
||||
startToken int // Dummy token to simulate multiple start symbols (see below).
|
||||
parsedRules []*Rule // Parsed full rules.
|
||||
parsedExpr ast.Node // Parsed single expression.
|
||||
}
|
||||
|
||||
func (lexer *RulesLexer) Lex(lval *yySymType) int {
|
||||
yylval = lval
|
||||
|
||||
// We simulate multiple start symbols for closely-related grammars via dummy tokens. See
|
||||
// http://www.gnu.org/software/bison/manual/html_node/Multiple-start_002dsymbols.html
|
||||
// Reason: we want to be able to parse lists of named rules as well as single expressions.
|
||||
if lexer.startToken != 0 {
|
||||
startToken := lexer.startToken
|
||||
lexer.startToken = 0
|
||||
return startToken
|
||||
}
|
||||
// We simulate multiple start symbols for closely-related grammars via dummy tokens. See
|
||||
// http://www.gnu.org/software/bison/manual/html_node/Multiple-start_002dsymbols.html
|
||||
// Reason: we want to be able to parse lists of named rules as well as single expressions.
|
||||
if lexer.startToken != 0 {
|
||||
startToken := lexer.startToken
|
||||
lexer.startToken = 0
|
||||
return startToken
|
||||
}
|
||||
|
||||
tokenType := yylex()
|
||||
return tokenType
|
||||
|
@ -48,16 +48,16 @@ func LoadFromReader(rulesReader io.Reader, singleExpr bool) (interface{}, error)
|
|||
yyin = rulesReader
|
||||
yypos = 1
|
||||
yyline = 1
|
||||
yydata = ""
|
||||
yytext = ""
|
||||
yydata = ""
|
||||
yytext = ""
|
||||
|
||||
lexer := &RulesLexer{
|
||||
startToken: START_RULES,
|
||||
}
|
||||
startToken: START_RULES,
|
||||
}
|
||||
|
||||
if singleExpr {
|
||||
lexer.startToken = START_EXPRESSION
|
||||
}
|
||||
if singleExpr {
|
||||
lexer.startToken = START_EXPRESSION
|
||||
}
|
||||
|
||||
ret := yyParse(lexer)
|
||||
if ret != 0 && len(lexer.errors) == 0 {
|
||||
|
@ -69,20 +69,20 @@ func LoadFromReader(rulesReader io.Reader, singleExpr bool) (interface{}, error)
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if singleExpr{
|
||||
return lexer.parsedExpr, nil
|
||||
} else {
|
||||
return lexer.parsedRules, nil
|
||||
}
|
||||
panic("")
|
||||
if singleExpr {
|
||||
return lexer.parsedExpr, nil
|
||||
} else {
|
||||
return lexer.parsedRules, nil
|
||||
}
|
||||
panic("")
|
||||
}
|
||||
|
||||
func LoadRulesFromReader(rulesReader io.Reader) ([]*Rule, error) {
|
||||
expr, err := LoadFromReader(rulesReader, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return expr.([]*Rule), err
|
||||
expr, err := LoadFromReader(rulesReader, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return expr.([]*Rule), err
|
||||
}
|
||||
|
||||
func LoadRulesFromString(rulesString string) ([]*Rule, error) {
|
||||
|
@ -92,7 +92,7 @@ func LoadRulesFromString(rulesString string) ([]*Rule, error) {
|
|||
|
||||
func LoadRulesFromFile(fileName string) ([]*Rule, error) {
|
||||
rulesReader, err := os.Open(fileName)
|
||||
defer rulesReader.Close()
|
||||
defer rulesReader.Close()
|
||||
if err != nil {
|
||||
return []*Rule{}, err
|
||||
}
|
||||
|
@ -100,11 +100,11 @@ func LoadRulesFromFile(fileName string) ([]*Rule, error) {
|
|||
}
|
||||
|
||||
func LoadExprFromReader(exprReader io.Reader) (ast.Node, error) {
|
||||
expr, err := LoadFromReader(exprReader, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return expr.(ast.Node), err
|
||||
expr, err := LoadFromReader(exprReader, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return expr.(ast.Node), err
|
||||
}
|
||||
|
||||
func LoadExprFromString(exprString string) (ast.Node, error) {
|
||||
|
@ -114,7 +114,7 @@ func LoadExprFromString(exprString string) (ast.Node, error) {
|
|||
|
||||
func LoadExprFromFile(fileName string) (ast.Node, error) {
|
||||
exprReader, err := os.Open(fileName)
|
||||
defer exprReader.Close()
|
||||
defer exprReader.Close()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -107,14 +107,14 @@ var expressionTests = []struct {
|
|||
}, {
|
||||
expr: "rate(http_requests['25m'])",
|
||||
output: []string{
|
||||
"http_requests{group='canary',instance='0',job='api-server'} => 150 @[%v]",
|
||||
"http_requests{group='canary',instance='0',job='app-server'} => 350 @[%v]",
|
||||
"http_requests{group='canary',instance='1',job='api-server'} => 200 @[%v]",
|
||||
"http_requests{group='canary',instance='1',job='app-server'} => 400 @[%v]",
|
||||
"http_requests{group='production',instance='0',job='api-server'} => 50 @[%v]",
|
||||
"http_requests{group='production',instance='0',job='app-server'} => 250 @[%v]",
|
||||
"http_requests{group='production',instance='1',job='api-server'} => 100 @[%v]",
|
||||
"http_requests{group='production',instance='1',job='app-server'} => 300 @[%v]",
|
||||
"http_requests{group='canary',instance='0',job='api-server'} => 150 @[%v]",
|
||||
"http_requests{group='canary',instance='0',job='app-server'} => 350 @[%v]",
|
||||
"http_requests{group='canary',instance='1',job='api-server'} => 200 @[%v]",
|
||||
"http_requests{group='canary',instance='1',job='app-server'} => 400 @[%v]",
|
||||
"http_requests{group='production',instance='0',job='api-server'} => 50 @[%v]",
|
||||
"http_requests{group='production',instance='0',job='app-server'} => 250 @[%v]",
|
||||
"http_requests{group='production',instance='1',job='api-server'} => 100 @[%v]",
|
||||
"http_requests{group='production',instance='1',job='app-server'} => 300 @[%v]",
|
||||
},
|
||||
// Invalid expressions that should fail to parse.
|
||||
}, {
|
||||
|
|
Loading…
Reference in a new issue