mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-26 13:11:11 -08:00
Remove backoff on scrape failure.
Having metrics with variable timestamps inconsistently spaced when things fail will make it harder to write correct rules. Update status page, requires some refactoring to insert a function. Change-Id: Ie1c586cca53b8f3b318af8c21c418873063738a8
This commit is contained in:
parent
7ad2cfdfbc
commit
3835b7507d
|
@ -14,20 +14,9 @@
|
|||
package retrieval
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/extraction"
|
||||
)
|
||||
|
||||
type literalScheduler time.Time
|
||||
|
||||
func (s literalScheduler) ScheduledFor() time.Time {
|
||||
return time.Time(s)
|
||||
}
|
||||
|
||||
func (s literalScheduler) Reschedule(earliest time.Time, future TargetState) {
|
||||
}
|
||||
|
||||
type nopIngester struct{}
|
||||
|
||||
func (i nopIngester) Ingest(*extraction.Result) error {
|
||||
|
|
|
@ -19,9 +19,7 @@ import (
|
|||
|
||||
func TestInterfaces(t *testing.T) {
|
||||
var (
|
||||
_ Target = &target{}
|
||||
_ TargetManager = &targetManager{}
|
||||
_ healthReporter = &target{}
|
||||
_ scheduler = &healthScheduler{}
|
||||
_ Target = &target{}
|
||||
_ TargetManager = &targetManager{}
|
||||
)
|
||||
}
|
||||
|
|
|
@ -1,118 +0,0 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package retrieval
|
||||
|
||||
import (
|
||||
"github.com/prometheus/prometheus/utility"
|
||||
"math"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// The default increment for exponential backoff when querying a target.
|
||||
DEFAULT_BACKOFF_VALUE = 2
|
||||
// The base units for the exponential backoff.
|
||||
DEFAULT_BACKOFF_VALUE_UNIT = time.Second
|
||||
// The maximum allowed backoff time.
|
||||
MAXIMUM_BACKOFF_VALUE = 2 * time.Minute
|
||||
)
|
||||
|
||||
// scheduler is an interface that various scheduling strategies must fulfill
|
||||
// in order to set the scheduling order for a target.
|
||||
//
|
||||
// Target takes advantage of this type by embedding an instance of scheduler
|
||||
// in each Target instance itself. The emitted scheduler.ScheduledFor() is
|
||||
// the basis for sorting the order of pending queries.
|
||||
//
|
||||
// This type is described as an interface to maximize testability.
|
||||
type scheduler interface {
|
||||
// ScheduledFor emits the earliest time at which the given object is allowed
|
||||
// to be run. This time may or not be a reflection of the earliest parameter
|
||||
// provided in Reschedule; that is up to the underlying strategy
|
||||
// implementations.
|
||||
ScheduledFor() time.Time
|
||||
// Instruct the scheduled item to re-schedule itself given new state data and
|
||||
// the earliest time at which the outside system thinks the operation should
|
||||
// be scheduled for.
|
||||
Reschedule(earliest time.Time, future TargetState)
|
||||
}
|
||||
|
||||
// healthScheduler is an implementation of scheduler that uses health data
|
||||
// provided by the target field as well as unreachability counts to determine
|
||||
// when to next schedule an operation.
|
||||
//
|
||||
// The type is almost capable of being used with default initialization, except
|
||||
// that a target field must be provided for which the system compares current
|
||||
// health against future proposed values.
|
||||
type healthScheduler struct {
|
||||
scheduledFor time.Time
|
||||
target healthReporter
|
||||
time utility.Time
|
||||
unreachableCount int
|
||||
}
|
||||
|
||||
func (s healthScheduler) ScheduledFor() time.Time {
|
||||
return s.scheduledFor
|
||||
}
|
||||
|
||||
// Reschedule, like the protocol described in scheduler, uses the current and
|
||||
// proposed future health state to determine how and when a given subject is to
|
||||
// be scheduled.
|
||||
//
|
||||
// If a subject has been at given moment marked as unhealthy, an exponential
|
||||
// backoff scheme is applied to it. The reason for this backoff is to ensure
|
||||
// that known-healthy targets can consume valuable request queuing resources
|
||||
// first. Depending on the retrieval interval and number of consecutive
|
||||
// unhealthy markings, the query of these unhealthy individuals may come before
|
||||
// the healthy ones for a short time to help ensure expeditious retrieval.
|
||||
// The inflection point that drops these to the back of the queue is beneficial
|
||||
// to save resources in the long-run.
|
||||
//
|
||||
// If a subject is healthy, its next scheduling opportunity is set to
|
||||
// earliest, for this ensures fair querying of all remaining healthy targets and
|
||||
// removes bias in the ordering. In order for the anti-bias value to have any
|
||||
// value, the earliest opportunity should be set to a value that is constant
|
||||
// for a given batch of subjects who are to be scraped on a given interval.
|
||||
func (s *healthScheduler) Reschedule(e time.Time, f TargetState) {
|
||||
currentState := s.target.State()
|
||||
// XXX: Handle metrics surrounding health.
|
||||
switch currentState {
|
||||
case UNKNOWN, UNREACHABLE:
|
||||
switch f {
|
||||
case ALIVE:
|
||||
s.unreachableCount = 0
|
||||
break
|
||||
case UNREACHABLE:
|
||||
s.unreachableCount++
|
||||
break
|
||||
}
|
||||
case ALIVE:
|
||||
switch f {
|
||||
case UNREACHABLE:
|
||||
s.unreachableCount++
|
||||
}
|
||||
}
|
||||
|
||||
if s.unreachableCount == 0 {
|
||||
s.scheduledFor = e
|
||||
} else {
|
||||
backoff := MAXIMUM_BACKOFF_VALUE
|
||||
exponential := time.Duration(math.Pow(DEFAULT_BACKOFF_VALUE, float64(s.unreachableCount))) * DEFAULT_BACKOFF_VALUE_UNIT
|
||||
if backoff > exponential {
|
||||
backoff = exponential
|
||||
}
|
||||
|
||||
s.scheduledFor = s.time.Now().Add(backoff)
|
||||
}
|
||||
}
|
|
@ -1,141 +0,0 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package retrieval
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/prometheus/utility"
|
||||
"github.com/prometheus/prometheus/utility/test"
|
||||
)
|
||||
|
||||
type fakeHealthReporter struct {
|
||||
index int
|
||||
stateQueue []TargetState
|
||||
}
|
||||
|
||||
func (h fakeHealthReporter) State() (state TargetState) {
|
||||
state = h.stateQueue[h.index]
|
||||
|
||||
h.index++
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func testHealthScheduler(t testing.TB) {
|
||||
now := time.Now()
|
||||
var scenarios = []struct {
|
||||
futureHealthState []TargetState
|
||||
preloadedTimes []time.Time
|
||||
expectedSchedule []time.Time
|
||||
}{
|
||||
// The behavior discussed in healthScheduler.Reschedule should be read
|
||||
// fully to understand the whys and wherefores.
|
||||
{
|
||||
futureHealthState: []TargetState{UNKNOWN, ALIVE, ALIVE},
|
||||
preloadedTimes: []time.Time{now, now.Add(time.Minute), now.Add(time.Minute * 2)},
|
||||
expectedSchedule: []time.Time{now, now.Add(time.Minute), now.Add(time.Minute * 2)},
|
||||
},
|
||||
{
|
||||
futureHealthState: []TargetState{UNKNOWN, UNREACHABLE, UNREACHABLE},
|
||||
preloadedTimes: []time.Time{now, now.Add(time.Minute), now.Add(time.Minute * 2)},
|
||||
expectedSchedule: []time.Time{now, now.Add(time.Second * 2), now.Add(time.Minute).Add(time.Second * 4)},
|
||||
},
|
||||
{
|
||||
futureHealthState: []TargetState{UNKNOWN, UNREACHABLE, ALIVE},
|
||||
preloadedTimes: []time.Time{now, now.Add(time.Minute), now.Add(time.Minute * 2)},
|
||||
expectedSchedule: []time.Time{now, now.Add(time.Second * 2), now.Add(time.Minute * 2)},
|
||||
},
|
||||
{
|
||||
futureHealthState: []TargetState{
|
||||
UNKNOWN,
|
||||
UNREACHABLE,
|
||||
UNREACHABLE,
|
||||
UNREACHABLE,
|
||||
UNREACHABLE,
|
||||
UNREACHABLE,
|
||||
UNREACHABLE,
|
||||
UNREACHABLE,
|
||||
UNREACHABLE,
|
||||
UNREACHABLE,
|
||||
},
|
||||
preloadedTimes: []time.Time{
|
||||
now,
|
||||
now.Add(time.Minute),
|
||||
now.Add(time.Minute * 2),
|
||||
now.Add(time.Minute * 3),
|
||||
now.Add(time.Minute * 4),
|
||||
now.Add(time.Minute * 5),
|
||||
now.Add(time.Minute * 6),
|
||||
now.Add(time.Minute * 7),
|
||||
now.Add(time.Minute * 8),
|
||||
now.Add(time.Minute * 9),
|
||||
},
|
||||
expectedSchedule: []time.Time{
|
||||
now,
|
||||
now.Add(time.Second * 2),
|
||||
now.Add(time.Minute * 1).Add(time.Second * 4),
|
||||
now.Add(time.Minute * 2).Add(time.Second * 8),
|
||||
now.Add(time.Minute * 3).Add(time.Second * 16),
|
||||
now.Add(time.Minute * 4).Add(time.Second * 32),
|
||||
now.Add(time.Minute * 5).Add(time.Second * 64),
|
||||
now.Add(time.Minute * 6).Add(time.Minute * 2),
|
||||
now.Add(time.Minute * 7).Add(time.Minute * 2),
|
||||
now.Add(time.Minute * 8).Add(time.Minute * 2),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for i, scenario := range scenarios {
|
||||
provider := test.NewInstantProvider(scenario.preloadedTimes)
|
||||
|
||||
reporter := fakeHealthReporter{}
|
||||
for _, state := range scenario.futureHealthState {
|
||||
reporter.stateQueue = append(reporter.stateQueue, state)
|
||||
}
|
||||
if len(scenario.preloadedTimes) != len(scenario.futureHealthState) || len(scenario.futureHealthState) != len(scenario.expectedSchedule) {
|
||||
t.Fatalf("%d. times and health reports and next time lengths were not equal.", i)
|
||||
}
|
||||
|
||||
time := utility.Time{
|
||||
Provider: provider,
|
||||
}
|
||||
|
||||
scheduler := healthScheduler{
|
||||
time: time,
|
||||
target: reporter,
|
||||
scheduledFor: now,
|
||||
}
|
||||
|
||||
for j := 0; j < len(scenario.preloadedTimes); j++ {
|
||||
futureState := scenario.futureHealthState[j]
|
||||
scheduler.Reschedule(scenario.preloadedTimes[j], futureState)
|
||||
nextSchedule := scheduler.ScheduledFor()
|
||||
if nextSchedule != scenario.expectedSchedule[j] {
|
||||
t.Errorf("%d.%d. Expected to be scheduled to %s, got %s", i, j, scenario.expectedSchedule[j], nextSchedule)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealthScheduler(t *testing.T) {
|
||||
testHealthScheduler(t)
|
||||
}
|
||||
|
||||
func BenchmarkHealthScheduler(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
testHealthScheduler(b)
|
||||
}
|
||||
}
|
|
@ -87,15 +87,6 @@ const (
|
|||
UNREACHABLE
|
||||
)
|
||||
|
||||
// A healthReporter is a type that can provide insight into its health state.
|
||||
//
|
||||
// It mainly exists for testability reasons to decouple the scheduler behaviors
|
||||
// from fully-fledged Target and other types.
|
||||
type healthReporter interface {
|
||||
// Report the last-known health state for this target.
|
||||
State() TargetState
|
||||
}
|
||||
|
||||
// A Target represents an endpoint that should be interrogated for metrics.
|
||||
//
|
||||
// The protocol described by this type will likely change in future iterations,
|
||||
|
@ -108,26 +99,13 @@ type healthReporter interface {
|
|||
// refers.
|
||||
type Target interface {
|
||||
// Retrieve values from this target.
|
||||
//
|
||||
// earliest refers to the soonest available opportunity to reschedule the
|
||||
// target for a future retrieval. It is up to the underlying scheduler type,
|
||||
// alluded to in the scheduledFor function, to use this as it wants to. The
|
||||
// current use case is to create a common batching time for scraping multiple
|
||||
// Targets in the future through the TargetPool.
|
||||
Scrape(earliest time.Time, ingester extraction.Ingester) error
|
||||
// Fulfill the healthReporter interface.
|
||||
State() TargetState
|
||||
// Report the soonest time at which this Target may be scheduled for
|
||||
// retrieval. This value needn't convey that the operation occurs at this
|
||||
// time, but it should occur no sooner than it.
|
||||
//
|
||||
// Right now, this is used as the sorting key in TargetPool.
|
||||
ScheduledFor() time.Time
|
||||
// EstimatedTimeToExecute emits the amount of time until the next prospective
|
||||
// scheduling opportunity for this target.
|
||||
EstimatedTimeToExecute() time.Duration
|
||||
Scrape(ingester extraction.Ingester) error
|
||||
// Return the last encountered scrape error, if any.
|
||||
LastError() error
|
||||
// Return the health of the target.
|
||||
State() TargetState
|
||||
// Return the last time a scrape was attempted.
|
||||
LastScrape() time.Time
|
||||
// The address to which the Target corresponds. Out of all of the available
|
||||
// points in this interface, this one is the best candidate to change given
|
||||
// the ways to express the endpoint.
|
||||
|
@ -145,13 +123,12 @@ type Target interface {
|
|||
|
||||
// target is a Target that refers to a singular HTTP or HTTPS endpoint.
|
||||
type target struct {
|
||||
// scheduler provides the scheduling strategy that is used to formulate what
|
||||
// is returned in Target.scheduledFor.
|
||||
scheduler scheduler
|
||||
// The current health state of the target.
|
||||
state TargetState
|
||||
// The last encountered scrape error, if any.
|
||||
lastError error
|
||||
// The last time a scrape was attempted.
|
||||
lastScrape time.Time
|
||||
|
||||
address string
|
||||
// What is the deadline for the HTTP or HTTPS against this endpoint.
|
||||
|
@ -171,11 +148,6 @@ func NewTarget(address string, deadline time.Duration, baseLabels clientmodel.La
|
|||
httpClient: utility.NewDeadlineClient(deadline),
|
||||
}
|
||||
|
||||
scheduler := &healthScheduler{
|
||||
target: target,
|
||||
}
|
||||
target.scheduler = scheduler
|
||||
|
||||
return target
|
||||
}
|
||||
|
||||
|
@ -204,22 +176,18 @@ func (t *target) recordScrapeHealth(ingester extraction.Ingester, timestamp clie
|
|||
})
|
||||
}
|
||||
|
||||
func (t *target) Scrape(earliest time.Time, ingester extraction.Ingester) error {
|
||||
func (t *target) Scrape(ingester extraction.Ingester) error {
|
||||
now := clientmodel.Now()
|
||||
futureState := t.state
|
||||
err := t.scrape(now, ingester)
|
||||
if err != nil {
|
||||
t.recordScrapeHealth(ingester, now, false)
|
||||
futureState = UNREACHABLE
|
||||
} else {
|
||||
if err == nil {
|
||||
t.state = ALIVE
|
||||
t.recordScrapeHealth(ingester, now, true)
|
||||
futureState = ALIVE
|
||||
} else {
|
||||
t.state = UNREACHABLE
|
||||
t.recordScrapeHealth(ingester, now, false)
|
||||
}
|
||||
|
||||
t.scheduler.Reschedule(earliest, futureState)
|
||||
t.state = futureState
|
||||
t.lastScrape = time.Now()
|
||||
t.lastError = err
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -275,20 +243,16 @@ func (t *target) scrape(timestamp clientmodel.Timestamp, ingester extraction.Ing
|
|||
return processor.ProcessSingle(resp.Body, i, processOptions)
|
||||
}
|
||||
|
||||
func (t *target) LastError() error {
|
||||
return t.lastError
|
||||
}
|
||||
|
||||
func (t *target) State() TargetState {
|
||||
return t.state
|
||||
}
|
||||
|
||||
func (t *target) ScheduledFor() time.Time {
|
||||
return t.scheduler.ScheduledFor()
|
||||
}
|
||||
|
||||
func (t *target) EstimatedTimeToExecute() time.Duration {
|
||||
return time.Now().Sub(t.scheduler.ScheduledFor())
|
||||
}
|
||||
|
||||
func (t *target) LastError() error {
|
||||
return t.lastError
|
||||
func (t *target) LastScrape() time.Time {
|
||||
return t.lastScrape
|
||||
}
|
||||
|
||||
func (t *target) Address() string {
|
||||
|
@ -327,11 +291,3 @@ type targets []Target
|
|||
func (t targets) Len() int {
|
||||
return len(t)
|
||||
}
|
||||
|
||||
func (t targets) Less(i, j int) bool {
|
||||
return t[i].ScheduledFor().Before(t[j].ScheduledFor())
|
||||
}
|
||||
|
||||
func (t targets) Swap(i, j int) {
|
||||
t[i], t[j] = t[j], t[i]
|
||||
}
|
||||
|
|
|
@ -38,12 +38,11 @@ func (i *collectResultIngester) Ingest(r *extraction.Result) error {
|
|||
|
||||
func TestTargetScrapeUpdatesState(t *testing.T) {
|
||||
testTarget := target{
|
||||
scheduler: literalScheduler{},
|
||||
state: UNKNOWN,
|
||||
address: "bad schema",
|
||||
httpClient: utility.NewDeadlineClient(0),
|
||||
}
|
||||
testTarget.Scrape(time.Time{}, nopIngester{})
|
||||
testTarget.Scrape(nopIngester{})
|
||||
if testTarget.state != UNREACHABLE {
|
||||
t.Errorf("Expected target state %v, actual: %v", UNREACHABLE, testTarget.state)
|
||||
}
|
||||
|
@ -51,7 +50,6 @@ func TestTargetScrapeUpdatesState(t *testing.T) {
|
|||
|
||||
func TestTargetRecordScrapeHealth(t *testing.T) {
|
||||
testTarget := target{
|
||||
scheduler: literalScheduler{},
|
||||
address: "http://example.url",
|
||||
baseLabels: clientmodel.LabelSet{clientmodel.JobLabel: "testjob"},
|
||||
httpClient: utility.NewDeadlineClient(0),
|
||||
|
@ -102,7 +100,7 @@ func TestTargetScrapeTimeout(t *testing.T) {
|
|||
|
||||
// scrape once without timeout
|
||||
signal <- true
|
||||
if err := testTarget.Scrape(time.Now(), ingester); err != nil {
|
||||
if err := testTarget.Scrape(ingester); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -111,12 +109,12 @@ func TestTargetScrapeTimeout(t *testing.T) {
|
|||
|
||||
// now scrape again
|
||||
signal <- true
|
||||
if err := testTarget.Scrape(time.Now(), ingester); err != nil {
|
||||
if err := testTarget.Scrape(ingester); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// now timeout
|
||||
if err := testTarget.Scrape(time.Now(), ingester); err == nil {
|
||||
if err := testTarget.Scrape(ingester); err == nil {
|
||||
t.Fatal("expected scrape to timeout")
|
||||
} else {
|
||||
signal <- true // let handler continue
|
||||
|
@ -124,7 +122,7 @@ func TestTargetScrapeTimeout(t *testing.T) {
|
|||
|
||||
// now scrape again without timeout
|
||||
signal <- true
|
||||
if err := testTarget.Scrape(time.Now(), ingester); err != nil {
|
||||
if err := testTarget.Scrape(ingester); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
@ -140,7 +138,7 @@ func TestTargetScrape404(t *testing.T) {
|
|||
ingester := nopIngester{}
|
||||
|
||||
want := errors.New("server returned HTTP status 404 Not Found")
|
||||
got := testTarget.Scrape(time.Now(), ingester)
|
||||
got := testTarget.Scrape(ingester)
|
||||
if got == nil || want.Error() != got.Error() {
|
||||
t.Fatalf("want err %q, got %q", want, got)
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ func (t fakeTarget) Interval() time.Duration {
|
|||
return t.interval
|
||||
}
|
||||
|
||||
func (t *fakeTarget) Scrape(e time.Time, i extraction.Ingester) error {
|
||||
func (t *fakeTarget) Scrape(i extraction.Ingester) error {
|
||||
t.scrapeCount++
|
||||
|
||||
return nil
|
||||
|
@ -65,6 +65,10 @@ func (t fakeTarget) State() TargetState {
|
|||
return ALIVE
|
||||
}
|
||||
|
||||
func (t fakeTarget) LastScrape() time.Time {
|
||||
return time.Now()
|
||||
}
|
||||
|
||||
func (t *fakeTarget) ScheduledFor() (time time.Time) {
|
||||
time = t.schedules[t.scheduleIndex]
|
||||
t.scheduleIndex++
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
package retrieval
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -133,11 +132,11 @@ func (p *TargetPool) replaceTargets(newTargets []Target) {
|
|||
p.targets = newTargets
|
||||
}
|
||||
|
||||
func (p *TargetPool) runSingle(earliest time.Time, ingester extraction.Ingester, t Target) {
|
||||
func (p *TargetPool) runSingle(ingester extraction.Ingester, t Target) {
|
||||
p.manager.acquire()
|
||||
defer p.manager.release()
|
||||
|
||||
t.Scrape(earliest, ingester)
|
||||
t.Scrape(ingester)
|
||||
}
|
||||
|
||||
func (p *TargetPool) runIteration(ingester extraction.Ingester, interval time.Duration) {
|
||||
|
@ -156,23 +155,11 @@ func (p *TargetPool) runIteration(ingester extraction.Ingester, interval time.Du
|
|||
begin := time.Now()
|
||||
wait := sync.WaitGroup{}
|
||||
|
||||
// Sort p.targets by next scheduling time so we can process the earliest
|
||||
// targets first.
|
||||
sort.Sort(p.targets)
|
||||
|
||||
for _, target := range p.targets {
|
||||
now := time.Now()
|
||||
|
||||
if target.ScheduledFor().After(now) {
|
||||
// None of the remaining targets are ready to be scheduled. Signal that
|
||||
// we're done processing them in this scrape iteration.
|
||||
continue
|
||||
}
|
||||
|
||||
wait.Add(1)
|
||||
|
||||
go func(t Target) {
|
||||
p.runSingle(now, ingester, t)
|
||||
p.runSingle(ingester, t)
|
||||
wait.Done()
|
||||
}(target)
|
||||
}
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
package retrieval
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
@ -57,44 +56,21 @@ func testTargetPool(t testing.TB) {
|
|||
},
|
||||
},
|
||||
{
|
||||
name: "plural descending schedules",
|
||||
name: "plural schedules",
|
||||
inputs: []input{
|
||||
{
|
||||
address: "http://plural-descending.com",
|
||||
scheduledFor: time.Date(2013, 1, 4, 12, 0, 0, 0, time.UTC),
|
||||
address: "http://plural.net",
|
||||
},
|
||||
{
|
||||
address: "http://plural-descending.net",
|
||||
scheduledFor: time.Date(2013, 1, 4, 11, 0, 0, 0, time.UTC),
|
||||
address: "http://plural.com",
|
||||
},
|
||||
},
|
||||
outputs: []output{
|
||||
{
|
||||
address: "http://plural-descending.net",
|
||||
address: "http://plural.net",
|
||||
},
|
||||
{
|
||||
address: "http://plural-descending.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "plural ascending schedules",
|
||||
inputs: []input{
|
||||
{
|
||||
address: "http://plural-ascending.net",
|
||||
scheduledFor: time.Date(2013, 1, 4, 11, 0, 0, 0, time.UTC),
|
||||
},
|
||||
{
|
||||
address: "http://plural-ascending.com",
|
||||
scheduledFor: time.Date(2013, 1, 4, 12, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
outputs: []output{
|
||||
{
|
||||
address: "http://plural-ascending.net",
|
||||
},
|
||||
{
|
||||
address: "http://plural-ascending.com",
|
||||
address: "http://plural.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -105,13 +81,11 @@ func testTargetPool(t testing.TB) {
|
|||
|
||||
for _, input := range scenario.inputs {
|
||||
target := target{
|
||||
address: input.address,
|
||||
scheduler: literalScheduler(input.scheduledFor),
|
||||
address: input.address,
|
||||
}
|
||||
|
||||
pool.addTarget(&target)
|
||||
}
|
||||
sort.Sort(pool.targets)
|
||||
|
||||
if pool.targets.Len() != len(scenario.outputs) {
|
||||
t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.targets.Len())
|
||||
|
@ -136,68 +110,41 @@ func TestTargetPool(t *testing.T) {
|
|||
testTargetPool(t)
|
||||
}
|
||||
|
||||
func TestTargetPoolIterationWithUnhealthyTargetsFinishes(t *testing.T) {
|
||||
pool := TargetPool{}
|
||||
target := &target{
|
||||
address: "http://example.com/metrics.json",
|
||||
scheduler: literalScheduler(time.Date(9999, 1, 1, 0, 0, 0, 0, time.UTC)),
|
||||
}
|
||||
pool.addTarget(target)
|
||||
|
||||
done := make(chan bool)
|
||||
go func() {
|
||||
pool.runIteration(nopIngester{}, time.Duration(0))
|
||||
done <- true
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
break
|
||||
case <-time.After(time.Duration(1) * time.Second):
|
||||
t.Fatalf("Targetpool iteration is stuck")
|
||||
}
|
||||
}
|
||||
|
||||
func TestTargetPoolReplaceTargets(t *testing.T) {
|
||||
pool := TargetPool{}
|
||||
oldTarget1 := &target{
|
||||
address: "http://example1.com/metrics.json",
|
||||
scheduler: literalScheduler(time.Date(9999, 1, 1, 0, 0, 0, 0, time.UTC)),
|
||||
state: UNREACHABLE,
|
||||
address: "http://example1.com/metrics.json",
|
||||
state: UNREACHABLE,
|
||||
}
|
||||
oldTarget2 := &target{
|
||||
address: "http://example2.com/metrics.json",
|
||||
scheduler: literalScheduler(time.Date(7500, 1, 1, 0, 0, 0, 0, time.UTC)),
|
||||
state: UNREACHABLE,
|
||||
address: "http://example2.com/metrics.json",
|
||||
state: UNREACHABLE,
|
||||
}
|
||||
newTarget1 := &target{
|
||||
address: "http://example1.com/metrics.json",
|
||||
scheduler: literalScheduler(time.Date(5000, 1, 1, 0, 0, 0, 0, time.UTC)),
|
||||
state: ALIVE,
|
||||
address: "http://example1.com/metrics.json",
|
||||
state: ALIVE,
|
||||
}
|
||||
newTarget2 := &target{
|
||||
address: "http://example3.com/metrics.json",
|
||||
scheduler: literalScheduler(time.Date(2500, 1, 1, 0, 0, 0, 0, time.UTC)),
|
||||
state: ALIVE,
|
||||
address: "http://example3.com/metrics.json",
|
||||
state: ALIVE,
|
||||
}
|
||||
|
||||
pool.addTarget(oldTarget1)
|
||||
pool.addTarget(oldTarget2)
|
||||
|
||||
pool.replaceTargets([]Target{newTarget1, newTarget2})
|
||||
sort.Sort(pool.targets)
|
||||
|
||||
if pool.targets.Len() != 2 {
|
||||
t.Errorf("Expected 2 elements in pool, had %d", pool.targets.Len())
|
||||
}
|
||||
|
||||
target1 := pool.targets[0].(*target)
|
||||
if target1.state != newTarget1.state {
|
||||
t.Errorf("Wrong first target returned from pool, expected %v, got %v", newTarget2, target1)
|
||||
if target1.state != oldTarget1.state {
|
||||
t.Errorf("Wrong first target returned from pool, expected %v, got %v", oldTarget1, target1)
|
||||
}
|
||||
target2 := pool.targets[1].(*target)
|
||||
if target2.state != oldTarget1.state {
|
||||
t.Errorf("Wrong second target returned from pool, expected %v, got %v", oldTarget1, target2)
|
||||
if target2.state != newTarget2.state {
|
||||
t.Errorf("Wrong second target returned from pool, expected %v, got %v", newTarget2, target2)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -260,14 +260,14 @@ func (te templateExpander) ExpandHTML(templateFiles []string) (result string, re
|
|||
|
||||
var buffer bytes.Buffer
|
||||
tmpl := html_template.New(te.name).Funcs(html_template.FuncMap(te.funcMap))
|
||||
tmpl.Funcs(html_template.FuncMap{
|
||||
"tmpl": func(name string, data interface{}) (html_template.HTML, error) {
|
||||
var buffer bytes.Buffer
|
||||
err := tmpl.ExecuteTemplate(&buffer, name, data)
|
||||
return html_template.HTML(buffer.String()), err
|
||||
},
|
||||
})
|
||||
tmpl, err := tmpl.Parse(te.text)
|
||||
tmpl.Funcs(html_template.FuncMap{
|
||||
"tmpl": func(name string, data interface{}) (html_template.HTML, error) {
|
||||
var buffer bytes.Buffer
|
||||
err := tmpl.ExecuteTemplate(&buffer, name, data)
|
||||
return html_template.HTML(buffer.String()), err
|
||||
},
|
||||
})
|
||||
tmpl, err := tmpl.Parse(te.text)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("Error parsing template %v: %v", te.name, err)
|
||||
}
|
||||
|
|
|
@ -39,7 +39,7 @@
|
|||
<th>Endpoint</th>
|
||||
<th>State</th>
|
||||
<th>Base Labels</th>
|
||||
<th>Next Retrieval</th>
|
||||
<th>Last Scrape</th>
|
||||
<th>Error</th>
|
||||
</tr>
|
||||
</thead>
|
||||
|
@ -56,7 +56,7 @@
|
|||
{{.BaseLabels}}
|
||||
</td>
|
||||
<td>
|
||||
{{.EstimatedTimeToExecute}}
|
||||
{{if .LastScrape.IsZero}}Never{{else}}{{since .LastScrape}} ago{{end}}
|
||||
</td>
|
||||
<td>
|
||||
{{if .LastError}}
|
||||
|
|
55
web/web.go
55
web/web.go
|
@ -17,6 +17,7 @@ import (
|
|||
"flag"
|
||||
"fmt"
|
||||
"html/template"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
|
@ -113,44 +114,42 @@ func (s WebService) quitHandler(w http.ResponseWriter, r *http.Request) {
|
|||
s.QuitDelegate()
|
||||
}
|
||||
|
||||
func getLocalTemplate(name string) (*template.Template, error) {
|
||||
return template.ParseFiles(
|
||||
"web/templates/_base.html",
|
||||
fmt.Sprintf("web/templates/%s.html", name),
|
||||
)
|
||||
func getTemplateFile(name string) (string, error) {
|
||||
if *useLocalAssets {
|
||||
file, err := ioutil.ReadFile(fmt.Sprintf("web/templates/%s.html", name))
|
||||
if err != nil {
|
||||
glog.Errorf("Could not read %s template: %s", name, err)
|
||||
return "", err
|
||||
}
|
||||
return string(file), nil
|
||||
} else {
|
||||
file, err := blob.GetFile(blob.TemplateFiles, name+".html")
|
||||
if err != nil {
|
||||
glog.Errorf("Could not read %s template: %s", name, err)
|
||||
return "", err
|
||||
}
|
||||
return string(file), nil
|
||||
}
|
||||
}
|
||||
|
||||
func getEmbeddedTemplate(name string) (*template.Template, error) {
|
||||
t := template.New("_base")
|
||||
|
||||
file, err := blob.GetFile(blob.TemplateFiles, "_base.html")
|
||||
func getTemplate(name string) (t *template.Template, err error) {
|
||||
t = template.New("_base")
|
||||
t.Funcs(template.FuncMap{
|
||||
"since": time.Since,
|
||||
})
|
||||
file, err := getTemplateFile("_base")
|
||||
if err != nil {
|
||||
glog.Error("Could not read base template: ", err)
|
||||
return nil, err
|
||||
}
|
||||
t.Parse(string(file))
|
||||
t.Parse(file)
|
||||
|
||||
file, err = blob.GetFile(blob.TemplateFiles, name+".html")
|
||||
file, err = getTemplateFile(name)
|
||||
if err != nil {
|
||||
glog.Errorf("Could not read %s template: %s", name, err)
|
||||
glog.Error("Could not read base template: ", err)
|
||||
return nil, err
|
||||
}
|
||||
t.Parse(string(file))
|
||||
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func getTemplate(name string) (t *template.Template, err error) {
|
||||
if *useLocalAssets {
|
||||
t, err = getLocalTemplate(name)
|
||||
} else {
|
||||
t, err = getEmbeddedTemplate(name)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
t.Parse(file)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue