Merge "Stagger scrapes to spread out load."

Julius Volz 2014-08-20 18:13:19 +02:00 committed by Gerrit Code Review
commit e995cda75c
7 changed files with 206 additions and 201 deletions
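In short, this change gives each target its own scraper goroutine: RunScraper first sleeps for a random fraction of the scrape interval and then ticks at the configured interval, so targets sharing an interval no longer fire in one burst. With scrapes spread out this way, the global concurrentRetrievalAllowance throttle is dropped, TargetPool keeps its targets in a map keyed by address and stops the scraper of any target that disappears, and a new target_interval_length_seconds summary records the actual gaps between scrapes. Below is a minimal standalone sketch of the jitter-then-tick pattern; the names scrapeLoop and scrape are illustrative and not part of the committed code.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// scrapeLoop calls scrape once per interval until stop is closed. The initial
// timer with a random duration in [0, interval) staggers the first scrape of
// each target, which keeps the subsequent ticks staggered as well.
func scrapeLoop(name string, interval time.Duration, stop <-chan bool) {
	jitter := time.NewTimer(time.Duration(float64(interval) * rand.Float64()))
	select {
	case <-jitter.C:
	case <-stop:
		return
	}
	jitter.Stop()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	scrape(name) // first scrape right after the jitter delay
	for {
		select {
		case <-ticker.C:
			scrape(name)
		case <-stop:
			return
		}
	}
}

func scrape(name string) {
	fmt.Println(time.Now().Format("15:04:05.000"), "scraping", name)
}

func main() {
	stop := make(chan bool)
	for _, name := range []string{"target-a", "target-b", "target-c"} {
		go scrapeLoop(name, time.Second, stop)
	}
	time.Sleep(3 * time.Second) // observe a few staggered scrapes
	close(stop)
}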

View file

@@ -67,8 +67,6 @@ var (
notificationQueueCapacity = flag.Int("alertmanager.notificationQueueCapacity", 100, "The size of the queue for pending alert manager notifications.")
concurrentRetrievalAllowance = flag.Int("concurrentRetrievalAllowance", 15, "The number of concurrent metrics retrieval requests allowed.")
printVersion = flag.Bool("version", false, "print version information")
shutdownTimeout = flag.Duration("shutdownGracePeriod", 0*time.Second, "The amount of time Prometheus gives background services to finish running when shutdown is requested.")
@@ -269,7 +267,7 @@ func main() {
deletionTimer := time.NewTicker(*deleteInterval)
// Queue depth will need to be exposed
targetManager := retrieval.NewTargetManager(ingester, *concurrentRetrievalAllowance)
targetManager := retrieval.NewTargetManager(ingester)
targetManager.AddTargetsFromConfig(conf)
notifications := make(chan notification.NotificationReqs, *notificationQueueCapacity)

View file

@@ -15,6 +15,7 @@ package retrieval
import (
"fmt"
"math/rand"
"net/http"
"os"
"strings"
@@ -41,6 +42,7 @@ const (
failure = "failure"
outcome = "outcome"
success = "success"
interval = "interval"
)
var (
@@ -55,10 +57,20 @@ var (
},
[]string{job, instance, outcome},
)
targetIntervalLength = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "target_interval_length_seconds",
Help: "Actual intervals between scrapes.",
Objectives: []float64{0.01, 0.05, 0.5, 0.90, 0.99},
},
[]string{interval},
)
)
func init() {
prometheus.MustRegister(targetOperationLatencies)
prometheus.MustRegister(targetIntervalLength)
}
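(The target_interval_length_seconds summary registered here records the observed time between consecutive scrapes, labeled by the configured interval, which makes it possible to check that staggered scrapes still land roughly one interval apart.)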
// The state of the given Target.
@@ -99,8 +111,6 @@ const (
// metrics are retrieved and deserialized from the given instance to which it
// refers.
type Target interface {
// Retrieve values from this target.
Scrape(ingester extraction.Ingester) error
// Return the last encountered scrape error, if any.
LastError() error
// Return the health of the target.
@@ -120,6 +130,12 @@ type Target interface {
// labels) into an old target definition for the same endpoint. Preserve
// remaining information - like health state - from the old target.
Merge(newTarget Target)
// Scrape target at the specified interval.
RunScraper(extraction.Ingester, time.Duration)
// Stop scraping, synchronous.
StopScraper()
// Do a single scrape.
scrape(ingester extraction.Ingester) error
}
// target is a Target that refers to a singular HTTP or HTTPS endpoint.
@@ -130,6 +146,9 @@ type target struct {
lastError error
// The last time a scrape was attempted.
lastScrape time.Time
// Channel used to signal RunScraper to stop scraping.
stopScraper chan bool
address string
// What is the deadline for the HTTP or HTTPS against this endpoint.
@@ -143,10 +162,11 @@ type target struct {
// Furnish a reasonably configured target for querying.
func NewTarget(address string, deadline time.Duration, baseLabels clientmodel.LabelSet) Target {
target := &target{
address: address,
Deadline: deadline,
baseLabels: baseLabels,
httpClient: utility.NewDeadlineClient(deadline),
address: address,
Deadline: deadline,
baseLabels: baseLabels,
httpClient: utility.NewDeadlineClient(deadline),
stopScraper: make(chan bool),
}
return target
@@ -177,24 +197,40 @@ func (t *target) recordScrapeHealth(ingester extraction.Ingester, timestamp clie
})
}
func (t *target) Scrape(ingester extraction.Ingester) error {
now := clientmodel.Now()
err := t.scrape(now, ingester)
if err == nil {
t.state = ALIVE
t.recordScrapeHealth(ingester, now, true)
} else {
t.state = UNREACHABLE
t.recordScrapeHealth(ingester, now, false)
func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration) {
jitterTimer := time.NewTimer(time.Duration(float64(interval) * rand.Float64()))
select {
case <-jitterTimer.C:
case <-t.stopScraper:
return
}
jitterTimer.Stop()
ticker := time.NewTicker(interval)
defer ticker.Stop()
t.lastScrape = time.Now()
t.lastError = err
return err
t.scrape(ingester)
for {
select {
case <-ticker.C:
targetIntervalLength.WithLabelValues(interval.String()).Observe(float64(time.Since(t.lastScrape) / time.Second))
t.lastScrape = time.Now()
t.scrape(ingester)
case <-t.stopScraper:
return
}
}
}
func (t *target) StopScraper() {
t.stopScraper <- true
}
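(Since stopScraper is created unbuffered in NewTarget, the send in StopScraper blocks until RunScraper's select receives it; and because a scrape only happens inside that select, StopScraper returning implies no scrape is still in flight, which is what the interface comment means by the stop being synchronous.)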
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema=prometheus/telemetry;version=0.0.2;q=0.2,*/*;q=0.1`
func (t *target) scrape(timestamp clientmodel.Timestamp, ingester extraction.Ingester) (err error) {
func (t *target) scrape(ingester extraction.Ingester) (err error) {
timestamp := clientmodel.Now()
defer func(start time.Time) {
ms := float64(time.Since(start)) / float64(time.Millisecond)
labels := prometheus.Labels{
@@ -202,11 +238,16 @@ func (t *target) scrape(timestamp clientmodel.Timestamp, ingester extraction.Ing
instance: t.Address(),
outcome: success,
}
if err != nil {
if err == nil {
t.state = ALIVE
t.recordScrapeHealth(ingester, timestamp, true)
labels[outcome] = failure
} else {
t.state = UNREACHABLE
t.recordScrapeHealth(ingester, timestamp, false)
}
targetOperationLatencies.With(labels).Observe(ms)
t.lastError = err
}(time.Now())
req, err := http.NewRequest("GET", t.Address(), nil)
@@ -292,7 +333,3 @@ func (t *target) Merge(newTarget Target) {
}
type targets []Target
func (t targets) Len() int {
return len(t)
}

View file

@@ -42,7 +42,7 @@ func TestTargetScrapeUpdatesState(t *testing.T) {
address: "bad schema",
httpClient: utility.NewDeadlineClient(0),
}
testTarget.Scrape(nopIngester{})
testTarget.scrape(nopIngester{})
if testTarget.state != UNREACHABLE {
t.Errorf("Expected target state %v, actual: %v", UNREACHABLE, testTarget.state)
}
@@ -100,7 +100,7 @@ func TestTargetScrapeTimeout(t *testing.T) {
// scrape once without timeout
signal <- true
if err := testTarget.Scrape(ingester); err != nil {
if err := testTarget.scrape(ingester); err != nil {
t.Fatal(err)
}
@@ -109,12 +109,12 @@ func TestTargetScrapeTimeout(t *testing.T) {
// now scrape again
signal <- true
if err := testTarget.Scrape(ingester); err != nil {
if err := testTarget.scrape(ingester); err != nil {
t.Fatal(err)
}
// now timeout
if err := testTarget.Scrape(ingester); err == nil {
if err := testTarget.scrape(ingester); err == nil {
t.Fatal("expected scrape to timeout")
} else {
signal <- true // let handler continue
@@ -122,7 +122,7 @@ func TestTargetScrapeTimeout(t *testing.T) {
// now scrape again without timeout
signal <- true
if err := testTarget.Scrape(ingester); err != nil {
if err := testTarget.scrape(ingester); err != nil {
t.Fatal(err)
}
}
@@ -138,8 +138,34 @@ func TestTargetScrape404(t *testing.T) {
ingester := nopIngester{}
want := errors.New("server returned HTTP status 404 Not Found")
got := testTarget.Scrape(ingester)
got := testTarget.scrape(ingester)
if got == nil || want.Error() != got.Error() {
t.Fatalf("want err %q, got %q", want, got)
}
}
func TestTargetRunScraperScrapes(t *testing.T) {
testTarget := target{
state: UNKNOWN,
address: "bad schema",
httpClient: utility.NewDeadlineClient(0),
stopScraper: make(chan bool, 1),
}
go testTarget.RunScraper(nopIngester{}, time.Duration(time.Millisecond))
// Enough time for a scrape to happen.
time.Sleep(2 * time.Millisecond)
if testTarget.lastScrape.IsZero() {
t.Errorf("Scrape hasn't occured.")
}
testTarget.StopScraper()
// Wait for it to take effect.
time.Sleep(2 * time.Millisecond)
last := testTarget.lastScrape
// Enough time for a scrape to happen.
time.Sleep(2 * time.Millisecond)
if testTarget.lastScrape != last {
t.Errorf("Scrape occured after it was stopped.")
}
}

View file

@@ -23,8 +23,6 @@ import (
)
type TargetManager interface {
acquire()
release()
AddTarget(job config.JobConfig, t Target)
ReplaceTargets(job config.JobConfig, newTargets []Target)
Remove(t Target)
@@ -34,27 +32,17 @@ type TargetManager interface {
}
type targetManager struct {
requestAllowance chan bool
poolsByJob map[string]*TargetPool
ingester extraction.Ingester
poolsByJob map[string]*TargetPool
ingester extraction.Ingester
}
func NewTargetManager(ingester extraction.Ingester, requestAllowance int) TargetManager {
func NewTargetManager(ingester extraction.Ingester) TargetManager {
return &targetManager{
requestAllowance: make(chan bool, requestAllowance),
ingester: ingester,
poolsByJob: make(map[string]*TargetPool),
ingester: ingester,
poolsByJob: make(map[string]*TargetPool),
}
}
func (m *targetManager) acquire() {
m.requestAllowance <- true
}
func (m *targetManager) release() {
<-m.requestAllowance
}
func (m *targetManager) TargetPoolForJob(job config.JobConfig) *TargetPool {
targetPool, ok := m.poolsByJob[job.GetName()]
@@ -64,13 +52,13 @@ func (m *targetManager) TargetPoolForJob(job config.JobConfig) *TargetPool {
provider = NewSdTargetProvider(job)
}
targetPool = NewTargetPool(m, provider)
interval := job.ScrapeInterval()
targetPool = NewTargetPool(m, provider, m.ingester, interval)
glog.Infof("Pool for job %s does not exist; creating and starting...", job.GetName())
interval := job.ScrapeInterval()
m.poolsByJob[job.GetName()] = targetPool
// BUG(all): Investigate whether this auto-goroutine creation is desired.
go targetPool.Run(m.ingester, interval)
go targetPool.Run()
}
return targetPool
@@ -84,7 +72,7 @@ func (m *targetManager) AddTarget(job config.JobConfig, t Target) {
func (m *targetManager) ReplaceTargets(job config.JobConfig, newTargets []Target) {
targetPool := m.TargetPoolForJob(job)
targetPool.replaceTargets(newTargets)
targetPool.ReplaceTargets(newTargets)
}
func (m targetManager) Remove(t Target) {

View file

@@ -29,10 +29,9 @@ import (
)
type fakeTarget struct {
scrapeCount int
schedules []time.Time
interval time.Duration
scheduleIndex int
scrapeCount int
lastScrape time.Time
interval time.Duration
}
func (t fakeTarget) LastError() error {
@@ -55,33 +54,32 @@ func (t fakeTarget) Interval() time.Duration {
return t.interval
}
func (t *fakeTarget) Scrape(i extraction.Ingester) error {
func (t fakeTarget) LastScrape() time.Time {
return t.lastScrape
}
func (t fakeTarget) scrape(i extraction.Ingester) error {
t.scrapeCount++
return nil
}
func (t fakeTarget) RunScraper(ingester extraction.Ingester, interval time.Duration) {
return
}
func (t fakeTarget) StopScraper() {
return
}
func (t fakeTarget) State() TargetState {
return ALIVE
}
func (t fakeTarget) LastScrape() time.Time {
return time.Now()
}
func (t *fakeTarget) ScheduledFor() (time time.Time) {
time = t.schedules[t.scheduleIndex]
t.scheduleIndex++
return
}
func (t *fakeTarget) Merge(newTarget Target) {}
func (t *fakeTarget) EstimatedTimeToExecute() time.Duration { return 0 }
func testTargetManager(t testing.TB) {
targetManager := NewTargetManager(nopIngester{}, 3)
targetManager := NewTargetManager(nopIngester{})
testJob1 := config.JobConfig{
JobConfig: pb.JobConfig{
Name: proto.String("test_job1"),
@@ -96,20 +94,17 @@ func testTargetManager(t testing.TB) {
}
target1GroupA := &fakeTarget{
schedules: []time.Time{time.Now()},
interval: time.Minute,
interval: time.Minute,
}
target2GroupA := &fakeTarget{
schedules: []time.Time{time.Now()},
interval: time.Minute,
interval: time.Minute,
}
targetManager.AddTarget(testJob1, target1GroupA)
targetManager.AddTarget(testJob1, target2GroupA)
target1GroupB := &fakeTarget{
schedules: []time.Time{time.Now()},
interval: time.Minute * 2,
interval: time.Minute * 2,
}
targetManager.AddTarget(testJob2, target1GroupB)

View file

@@ -19,75 +19,69 @@ import (
"github.com/golang/glog"
"github.com/prometheus/client_golang/extraction"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/utility"
)
const (
targetAddQueueSize = 100
targetReplaceQueueSize = 1
intervalKey = "interval"
)
var (
retrievalDurations = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "targetpool_retrieve_time_milliseconds",
Help: "The time needed for each TargetPool to retrieve state from all included entities.",
Objectives: []float64{0.01, 0.05, 0.5, 0.90, 0.99},
},
[]string{intervalKey},
)
)
func init() {
prometheus.MustRegister(retrievalDurations)
}
type TargetPool struct {
sync.RWMutex
done chan bool
manager TargetManager
targets targets
addTargetQueue chan Target
replaceTargetsQueue chan targets
done chan chan bool
manager TargetManager
targetsByAddress map[string]Target
interval time.Duration
ingester extraction.Ingester
addTargetQueue chan Target
targetProvider TargetProvider
}
func NewTargetPool(m TargetManager, p TargetProvider) *TargetPool {
func NewTargetPool(m TargetManager, p TargetProvider, ing extraction.Ingester, i time.Duration) *TargetPool {
return &TargetPool{
manager: m,
addTargetQueue: make(chan Target, targetAddQueueSize),
replaceTargetsQueue: make(chan targets, targetReplaceQueueSize),
targetProvider: p,
done: make(chan bool),
manager: m,
interval: i,
ingester: ing,
targetsByAddress: make(map[string]Target),
addTargetQueue: make(chan Target, targetAddQueueSize),
targetProvider: p,
done: make(chan chan bool),
}
}
func (p *TargetPool) Run(ingester extraction.Ingester, interval time.Duration) {
ticker := time.NewTicker(interval)
func (p *TargetPool) Run() {
ticker := time.NewTicker(p.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
p.runIteration(ingester, interval)
if p.targetProvider != nil {
targets, err := p.targetProvider.Targets()
if err != nil {
glog.Warningf("Error looking up targets, keeping old list: %s", err)
} else {
p.ReplaceTargets(targets)
}
}
case newTarget := <-p.addTargetQueue:
p.addTarget(newTarget)
case newTargets := <-p.replaceTargetsQueue:
p.replaceTargets(newTargets)
case <-p.done:
case stopped := <-p.done:
p.ReplaceTargets([]Target{})
glog.Info("TargetPool exiting...")
stopped <- true
return
}
}
}
func (p TargetPool) Stop() {
p.done <- true
stopped := make(chan bool)
p.done <- stopped
<-stopped
}
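Run and Stop above coordinate shutdown through a channel of channels: Stop sends a fresh acknowledgement channel into done and then blocks on it, and Run only signals that channel after it has stopped all scrapers via ReplaceTargets([]Target{}), so Stop returns only once cleanup is complete. A minimal standalone sketch of the same idiom follows; the worker type and its method names are illustrative and not part of the commit.

package main

import "fmt"

type worker struct {
	// done carries an acknowledgement channel from stop() to run().
	done chan chan bool
}

func (w *worker) run() {
	ack := <-w.done            // block until stop() requests shutdown
	fmt.Println("cleaning up") // e.g. stopping per-target scrapers
	ack <- true                // unblock stop() only after cleanup has finished
}

func (w *worker) stop() {
	ack := make(chan bool)
	w.done <- ack
	<-ack // returns only once run() has finished cleaning up
}

func main() {
	w := &worker{done: make(chan chan bool)}
	go w.run()
	w.stop()
	fmt.Println("stopped")
}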
func (p *TargetPool) AddTarget(target Target) {
@@ -98,85 +92,45 @@ func (p *TargetPool) addTarget(target Target) {
p.Lock()
defer p.Unlock()
p.targets = append(p.targets, target)
p.targetsByAddress[target.Address()] = target
go target.RunScraper(p.ingester, p.interval)
}
func (p *TargetPool) ReplaceTargets(newTargets []Target) {
p.Lock()
defer p.Unlock()
// If there is anything remaining in the queue for effectuation, clear it out,
// because the last mutation should win.
select {
case <-p.replaceTargetsQueue:
default:
p.replaceTargetsQueue <- newTargets
}
}
func (p *TargetPool) replaceTargets(newTargets []Target) {
p.Lock()
defer p.Unlock()
// Replace old target list by new one, but reuse those targets from the old
// list of targets which are also in the new list (to preserve scheduling and
// health state).
for j, newTarget := range newTargets {
for _, oldTarget := range p.targets {
if oldTarget.Address() == newTarget.Address() {
oldTarget.Merge(newTargets[j])
newTargets[j] = oldTarget
}
}
}
p.targets = newTargets
}
func (p *TargetPool) runSingle(ingester extraction.Ingester, t Target) {
p.manager.acquire()
defer p.manager.release()
t.Scrape(ingester)
}
func (p *TargetPool) runIteration(ingester extraction.Ingester, interval time.Duration) {
if p.targetProvider != nil {
targets, err := p.targetProvider.Targets()
if err != nil {
glog.Warningf("Error looking up targets, keeping old list: %s", err)
newTargetAddresses := make(utility.Set)
for _, newTarget := range newTargets {
newTargetAddresses.Add(newTarget.Address())
oldTarget, ok := p.targetsByAddress[newTarget.Address()]
if ok {
oldTarget.Merge(newTarget)
} else {
p.ReplaceTargets(targets)
p.targetsByAddress[newTarget.Address()] = newTarget
go newTarget.RunScraper(p.ingester, p.interval)
}
}
p.RLock()
defer p.RUnlock()
begin := time.Now()
wait := sync.WaitGroup{}
for _, target := range p.targets {
wait.Add(1)
go func(t Target) {
p.runSingle(ingester, t)
wait.Done()
}(target)
// Stop any targets no longer present.
for k, oldTarget := range p.targetsByAddress {
if !newTargetAddresses.Has(k) {
glog.V(1).Info("Stopping scraper for target ", k)
oldTarget.StopScraper()
delete(p.targetsByAddress, k)
}
}
wait.Wait()
duration := float64(time.Since(begin) / time.Millisecond)
retrievalDurations.WithLabelValues(interval.String()).Observe(duration)
}
func (p *TargetPool) Targets() []Target {
p.RLock()
defer p.RUnlock()
targets := make([]Target, len(p.targets))
copy(targets, p.targets)
targets := make([]Target, 0, len(p.targetsByAddress))
for _, v := range p.targetsByAddress {
targets = append(targets, v)
}
return targets
}

View file

@@ -77,7 +77,7 @@ func testTargetPool(t testing.TB) {
}
for i, scenario := range scenarios {
pool := TargetPool{}
pool := NewTargetPool(nil, nil, nopIngester{}, time.Duration(1))
for _, input := range scenario.inputs {
target := target{
@@ -87,11 +87,11 @@ func testTargetPool(t testing.TB) {
pool.addTarget(&target)
}
if pool.targets.Len() != len(scenario.outputs) {
t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.targets.Len())
if len(pool.targetsByAddress) != len(scenario.outputs) {
t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), len(pool.targetsByAddress))
} else {
for j, output := range scenario.outputs {
target := pool.targets[j]
target := pool.Targets()[j]
if target.Address() != output.address {
t.Errorf("%s %d.%d. expected Target address to be %s but was %s", scenario.name, i, j, output.address, target.Address())
@@ -99,8 +99,8 @@ func testTargetPool(t testing.TB) {
}
}
if pool.targets.Len() != len(scenario.outputs) {
t.Errorf("%s %d. expected to repopulated with %d elements, got %d", scenario.name, i, len(scenario.outputs), pool.targets.Len())
if len(pool.targetsByAddress) != len(scenario.outputs) {
t.Errorf("%s %d. expected to repopulated with %d elements, got %d", scenario.name, i, len(scenario.outputs), len(pool.targetsByAddress))
}
}
}
@@ -111,41 +111,48 @@ func TestTargetPool(t *testing.T) {
}
func TestTargetPoolReplaceTargets(t *testing.T) {
pool := TargetPool{}
pool := NewTargetPool(nil, nil, nopIngester{}, time.Duration(1))
oldTarget1 := &target{
address: "http://example1.com/metrics.json",
state: UNREACHABLE,
address: "example1",
state: UNREACHABLE,
stopScraper: make(chan bool, 1),
}
oldTarget2 := &target{
address: "http://example2.com/metrics.json",
state: UNREACHABLE,
address: "example2",
state: UNREACHABLE,
stopScraper: make(chan bool, 1),
}
newTarget1 := &target{
address: "http://example1.com/metrics.json",
state: ALIVE,
address: "example1",
state: ALIVE,
stopScraper: make(chan bool, 1),
}
newTarget2 := &target{
address: "http://example3.com/metrics.json",
state: ALIVE,
address: "example3",
state: ALIVE,
stopScraper: make(chan bool, 1),
}
oldTarget1.StopScraper()
oldTarget2.StopScraper()
newTarget2.StopScraper()
pool.addTarget(oldTarget1)
pool.addTarget(oldTarget2)
pool.replaceTargets([]Target{newTarget1, newTarget2})
pool.ReplaceTargets([]Target{newTarget1, newTarget2})
if pool.targets.Len() != 2 {
t.Errorf("Expected 2 elements in pool, had %d", pool.targets.Len())
if len(pool.targetsByAddress) != 2 {
t.Errorf("Expected 2 elements in pool, had %d", len(pool.targetsByAddress))
}
target1 := pool.targets[0].(*target)
if target1.state != oldTarget1.state {
t.Errorf("Wrong first target returned from pool, expected %v, got %v", oldTarget1, target1)
if pool.targetsByAddress["example1"].State() != oldTarget1.State() {
t.Errorf("target1 channel has changed")
}
target2 := pool.targets[1].(*target)
if target2.state != newTarget2.state {
t.Errorf("Wrong second target returned from pool, expected %v, got %v", newTarget2, target2)
if pool.targetsByAddress["example3"].State() == oldTarget2.State() {
t.Errorf("newTarget2 channel same as oldTarget2's")
}
}
func BenchmarkTargetPool(b *testing.B) {