mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-13 17:14:05 -08:00
Merge pull request #86 from prometheus/julius-sd
Allow replacing job targets via HTTP API.
This commit is contained in:
commit
ca57b43e8c
|
@ -58,7 +58,7 @@ func (config *Config) AddJob(options map[string]string, targets []Targets) error
|
||||||
if len(targets) == 0 {
|
if len(targets) == 0 {
|
||||||
return fmt.Errorf("No targets configured for job '%v'", name)
|
return fmt.Errorf("No targets configured for job '%v'", name)
|
||||||
}
|
}
|
||||||
job := &JobConfig{
|
job := JobConfig{
|
||||||
Targets: tmpJobTargets,
|
Targets: tmpJobTargets,
|
||||||
}
|
}
|
||||||
for option, value := range options {
|
for option, value := range options {
|
||||||
|
@ -66,10 +66,20 @@ func (config *Config) AddJob(options map[string]string, targets []Targets) error
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
config.Jobs = append(config.Jobs, *job)
|
config.Jobs = append(config.Jobs, job)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (config *Config) GetJobByName(name string) (jobConfig *JobConfig) {
|
||||||
|
for _, job := range config.Jobs {
|
||||||
|
if job.Name == name {
|
||||||
|
jobConfig = &job
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (config *GlobalConfig) SetOption(option string, value string) (err error) {
|
func (config *GlobalConfig) SetOption(option string, value string) (err error) {
|
||||||
switch option {
|
switch option {
|
||||||
case "scrape_interval":
|
case "scrape_interval":
|
||||||
|
|
|
@ -79,8 +79,12 @@ type Target interface {
|
||||||
// points in this interface, this one is the best candidate to change given
|
// points in this interface, this one is the best candidate to change given
|
||||||
// the ways to express the endpoint.
|
// the ways to express the endpoint.
|
||||||
Address() string
|
Address() string
|
||||||
// How frequently queries occur.
|
// Return the target's base labels.
|
||||||
Interval() time.Duration
|
BaseLabels() model.LabelSet
|
||||||
|
// Merge a new externally supplied target definition (e.g. with changed base
|
||||||
|
// labels) into an old target definition for the same endpoint. Preserve
|
||||||
|
// remaining information - like health state - from the old target.
|
||||||
|
Merge(newTarget Target)
|
||||||
}
|
}
|
||||||
|
|
||||||
// target is a Target that refers to a singular HTTP or HTTPS endpoint.
|
// target is a Target that refers to a singular HTTP or HTTPS endpoint.
|
||||||
|
@ -94,19 +98,15 @@ type target struct {
|
||||||
// What is the deadline for the HTTP or HTTPS against this endpoint.
|
// What is the deadline for the HTTP or HTTPS against this endpoint.
|
||||||
Deadline time.Duration
|
Deadline time.Duration
|
||||||
// Any base labels that are added to this target and its metrics.
|
// Any base labels that are added to this target and its metrics.
|
||||||
BaseLabels model.LabelSet
|
baseLabels model.LabelSet
|
||||||
|
|
||||||
// XXX: Move this to a field with the target manager initialization instead of here.
|
|
||||||
interval time.Duration
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Furnish a reasonably configured target for querying.
|
// Furnish a reasonably configured target for querying.
|
||||||
func NewTarget(address string, interval, deadline time.Duration, baseLabels model.LabelSet) Target {
|
func NewTarget(address string, deadline time.Duration, baseLabels model.LabelSet) Target {
|
||||||
target := &target{
|
target := &target{
|
||||||
address: address,
|
address: address,
|
||||||
Deadline: deadline,
|
Deadline: deadline,
|
||||||
interval: interval,
|
baseLabels: baseLabels,
|
||||||
BaseLabels: baseLabels,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
scheduler := &healthScheduler{
|
scheduler := &healthScheduler{
|
||||||
|
@ -155,7 +155,7 @@ func (t *target) Scrape(earliest time.Time, results chan format.Result) (err err
|
||||||
// XXX: This is a wart; we need to handle this more gracefully down the
|
// XXX: This is a wart; we need to handle this more gracefully down the
|
||||||
// road, especially once we have service discovery support.
|
// road, especially once we have service discovery support.
|
||||||
baseLabels := model.LabelSet{instance: model.LabelValue(t.Address())}
|
baseLabels := model.LabelSet{instance: model.LabelValue(t.Address())}
|
||||||
for baseLabel, baseValue := range t.BaseLabels {
|
for baseLabel, baseValue := range t.baseLabels {
|
||||||
baseLabels[baseLabel] = baseValue
|
baseLabels[baseLabel] = baseValue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,6 +200,16 @@ func (t target) Address() string {
|
||||||
return t.address
|
return t.address
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t target) Interval() time.Duration {
|
func (t target) BaseLabels() model.LabelSet {
|
||||||
return t.interval
|
return t.baseLabels
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merge a new externally supplied target definition (e.g. with changed base
|
||||||
|
// labels) into an old target definition for the same endpoint. Preserve
|
||||||
|
// remaining information - like health state - from the old target.
|
||||||
|
func (t *target) Merge(newTarget Target) {
|
||||||
|
if t.Address() != newTarget.Address() {
|
||||||
|
panic("targets don't refer to the same endpoint")
|
||||||
|
}
|
||||||
|
t.baseLabels = newTarget.BaseLabels()
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,7 +14,6 @@
|
||||||
package retrieval
|
package retrieval
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/heap"
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/model"
|
"github.com/prometheus/prometheus/model"
|
||||||
"github.com/prometheus/prometheus/retrieval/format"
|
"github.com/prometheus/prometheus/retrieval/format"
|
||||||
|
@ -25,14 +24,16 @@ import (
|
||||||
type TargetManager interface {
|
type TargetManager interface {
|
||||||
acquire()
|
acquire()
|
||||||
release()
|
release()
|
||||||
Add(t Target)
|
AddTarget(job *config.JobConfig, t Target, defaultScrapeInterval time.Duration)
|
||||||
|
ReplaceTargets(job *config.JobConfig, newTargets []Target, defaultScrapeInterval time.Duration)
|
||||||
Remove(t Target)
|
Remove(t Target)
|
||||||
AddTargetsFromConfig(config *config.Config)
|
AddTargetsFromConfig(config *config.Config)
|
||||||
|
Pools() map[string]*TargetPool
|
||||||
}
|
}
|
||||||
|
|
||||||
type targetManager struct {
|
type targetManager struct {
|
||||||
requestAllowance chan bool
|
requestAllowance chan bool
|
||||||
pools map[time.Duration]*TargetPool
|
poolsByJob map[string]*TargetPool
|
||||||
results chan format.Result
|
results chan format.Result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,7 +41,7 @@ func NewTargetManager(results chan format.Result, requestAllowance int) TargetMa
|
||||||
return &targetManager{
|
return &targetManager{
|
||||||
requestAllowance: make(chan bool, requestAllowance),
|
requestAllowance: make(chan bool, requestAllowance),
|
||||||
results: results,
|
results: results,
|
||||||
pools: make(map[time.Duration]*TargetPool),
|
poolsByJob: make(map[string]*TargetPool),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,17 +53,32 @@ func (m *targetManager) release() {
|
||||||
<-m.requestAllowance
|
<-m.requestAllowance
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *targetManager) Add(t Target) {
|
func (m *targetManager) TargetPoolForJob(job *config.JobConfig, defaultScrapeInterval time.Duration) (targetPool *TargetPool) {
|
||||||
targetPool, ok := m.pools[t.Interval()]
|
targetPool, ok := m.poolsByJob[job.Name]
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
targetPool = NewTargetPool(m)
|
targetPool = NewTargetPool(m)
|
||||||
log.Printf("Pool %s does not exist; creating and starting...", t.Interval())
|
log.Printf("Pool for job %s does not exist; creating and starting...", job.Name)
|
||||||
go targetPool.Run(m.results, t.Interval())
|
|
||||||
}
|
|
||||||
|
|
||||||
heap.Push(targetPool, t)
|
interval := job.ScrapeInterval
|
||||||
m.pools[t.Interval()] = targetPool
|
if interval == 0 {
|
||||||
|
interval = defaultScrapeInterval
|
||||||
|
}
|
||||||
|
m.poolsByJob[job.Name] = targetPool
|
||||||
|
go targetPool.Run(m.results, interval)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *targetManager) AddTarget(job *config.JobConfig, t Target, defaultScrapeInterval time.Duration) {
|
||||||
|
targetPool := m.TargetPoolForJob(job, defaultScrapeInterval)
|
||||||
|
targetPool.AddTarget(t)
|
||||||
|
m.poolsByJob[job.Name] = targetPool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *targetManager) ReplaceTargets(job *config.JobConfig, newTargets []Target, defaultScrapeInterval time.Duration) {
|
||||||
|
targetPool := m.TargetPoolForJob(job, defaultScrapeInterval)
|
||||||
|
targetPool.replaceTargets(newTargets)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m targetManager) Remove(t Target) {
|
func (m targetManager) Remove(t Target) {
|
||||||
|
@ -79,15 +95,15 @@ func (m *targetManager) AddTargetsFromConfig(config *config.Config) {
|
||||||
baseLabels[label] = value
|
baseLabels[label] = value
|
||||||
}
|
}
|
||||||
|
|
||||||
interval := job.ScrapeInterval
|
|
||||||
if interval == 0 {
|
|
||||||
interval = config.Global.ScrapeInterval
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, endpoint := range configTargets.Endpoints {
|
for _, endpoint := range configTargets.Endpoints {
|
||||||
target := NewTarget(endpoint, interval, time.Second*5, baseLabels)
|
target := NewTarget(endpoint, time.Second*5, baseLabels)
|
||||||
m.Add(target)
|
m.AddTarget(&job, target, config.Global.ScrapeInterval)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// XXX: Not really thread-safe. Only used in /status page for now.
|
||||||
|
func (m *targetManager) Pools() map[string]*TargetPool {
|
||||||
|
return m.poolsByJob
|
||||||
|
}
|
||||||
|
|
|
@ -14,6 +14,8 @@
|
||||||
package retrieval
|
package retrieval
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/prometheus/prometheus/config"
|
||||||
|
"github.com/prometheus/prometheus/model"
|
||||||
"github.com/prometheus/prometheus/retrieval/format"
|
"github.com/prometheus/prometheus/retrieval/format"
|
||||||
"github.com/prometheus/prometheus/utility/test"
|
"github.com/prometheus/prometheus/utility/test"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -31,6 +33,10 @@ func (t fakeTarget) Address() string {
|
||||||
return "fake"
|
return "fake"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t fakeTarget) BaseLabels() model.LabelSet {
|
||||||
|
return model.LabelSet{}
|
||||||
|
}
|
||||||
|
|
||||||
func (t fakeTarget) Interval() time.Duration {
|
func (t fakeTarget) Interval() time.Duration {
|
||||||
return t.interval
|
return t.interval
|
||||||
}
|
}
|
||||||
|
@ -52,9 +58,17 @@ func (t *fakeTarget) scheduledFor() (time time.Time) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *fakeTarget) Merge(newTarget Target) {}
|
||||||
|
|
||||||
func testTargetManager(t test.Tester) {
|
func testTargetManager(t test.Tester) {
|
||||||
results := make(chan format.Result, 5)
|
results := make(chan format.Result, 5)
|
||||||
targetManager := NewTargetManager(results, 3)
|
targetManager := NewTargetManager(results, 3)
|
||||||
|
testJob1 := &config.JobConfig{
|
||||||
|
Name: "test_job1",
|
||||||
|
}
|
||||||
|
testJob2 := &config.JobConfig{
|
||||||
|
Name: "test_job2",
|
||||||
|
}
|
||||||
|
|
||||||
target1GroupA := &fakeTarget{
|
target1GroupA := &fakeTarget{
|
||||||
schedules: []time.Time{time.Now()},
|
schedules: []time.Time{time.Now()},
|
||||||
|
@ -65,16 +79,15 @@ func testTargetManager(t test.Tester) {
|
||||||
interval: time.Minute,
|
interval: time.Minute,
|
||||||
}
|
}
|
||||||
|
|
||||||
targetManager.Add(target1GroupA)
|
targetManager.AddTarget(testJob1, target1GroupA, 0)
|
||||||
targetManager.Add(target2GroupA)
|
targetManager.AddTarget(testJob1, target2GroupA, 0)
|
||||||
|
|
||||||
target1GroupB := &fakeTarget{
|
target1GroupB := &fakeTarget{
|
||||||
schedules: []time.Time{time.Now()},
|
schedules: []time.Time{time.Now()},
|
||||||
interval: time.Minute * 2,
|
interval: time.Minute * 2,
|
||||||
}
|
}
|
||||||
|
|
||||||
targetManager.Add(target1GroupB)
|
targetManager.AddTarget(testJob2, target1GroupB, 0)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTargetManager(t *testing.T) {
|
func TestTargetManager(t *testing.T) {
|
||||||
|
|
|
@ -1,9 +1,22 @@
|
||||||
|
// Copyright 2013 Prometheus Team
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
package retrieval
|
package retrieval
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/heap"
|
|
||||||
"github.com/prometheus/prometheus/retrieval/format"
|
"github.com/prometheus/prometheus/retrieval/format"
|
||||||
"log"
|
"log"
|
||||||
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -15,11 +28,15 @@ type TargetPool struct {
|
||||||
done chan bool
|
done chan bool
|
||||||
manager TargetManager
|
manager TargetManager
|
||||||
targets []Target
|
targets []Target
|
||||||
|
addTargetQueue chan Target
|
||||||
|
replaceTargetsQueue chan []Target
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTargetPool(m TargetManager) (p *TargetPool) {
|
func NewTargetPool(m TargetManager) (p *TargetPool) {
|
||||||
return &TargetPool{
|
return &TargetPool{
|
||||||
manager: m,
|
manager: m,
|
||||||
|
addTargetQueue: make(chan Target),
|
||||||
|
replaceTargetsQueue: make(chan []Target),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,20 +48,6 @@ func (p TargetPool) Less(i, j int) bool {
|
||||||
return p.targets[i].scheduledFor().Before(p.targets[j].scheduledFor())
|
return p.targets[i].scheduledFor().Before(p.targets[j].scheduledFor())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *TargetPool) Pop() interface{} {
|
|
||||||
oldPool := p.targets
|
|
||||||
futureLength := p.Len() - 1
|
|
||||||
element := oldPool[futureLength]
|
|
||||||
futurePool := oldPool[0:futureLength]
|
|
||||||
p.targets = futurePool
|
|
||||||
|
|
||||||
return element
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *TargetPool) Push(element interface{}) {
|
|
||||||
p.targets = append(p.targets, element.(Target))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p TargetPool) Swap(i, j int) {
|
func (p TargetPool) Swap(i, j int) {
|
||||||
p.targets[i], p.targets[j] = p.targets[j], p.targets[i]
|
p.targets[i], p.targets[j] = p.targets[j], p.targets[i]
|
||||||
}
|
}
|
||||||
|
@ -56,6 +59,10 @@ func (p *TargetPool) Run(results chan format.Result, interval time.Duration) {
|
||||||
select {
|
select {
|
||||||
case <-ticker:
|
case <-ticker:
|
||||||
p.runIteration(results, interval)
|
p.runIteration(results, interval)
|
||||||
|
case newTarget := <-p.addTargetQueue:
|
||||||
|
p.addTarget(newTarget)
|
||||||
|
case newTargets := <-p.replaceTargetsQueue:
|
||||||
|
p.replaceTargets(newTargets)
|
||||||
case <-p.done:
|
case <-p.done:
|
||||||
log.Printf("TargetPool exiting...")
|
log.Printf("TargetPool exiting...")
|
||||||
break
|
break
|
||||||
|
@ -67,6 +74,33 @@ func (p TargetPool) Stop() {
|
||||||
p.done <- true
|
p.done <- true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *TargetPool) AddTarget(target Target) {
|
||||||
|
p.addTargetQueue <- target
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *TargetPool) addTarget(target Target) {
|
||||||
|
p.targets = append(p.targets, target)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *TargetPool) ReplaceTargets(newTargets []Target) {
|
||||||
|
p.replaceTargetsQueue <- newTargets
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *TargetPool) replaceTargets(newTargets []Target) {
|
||||||
|
// Replace old target list by new one, but reuse those targets from the old
|
||||||
|
// list of targets which are also in the new list (to preserve scheduling and
|
||||||
|
// health state).
|
||||||
|
for j, newTarget := range newTargets {
|
||||||
|
for _, oldTarget := range p.targets {
|
||||||
|
if oldTarget.Address() == newTarget.Address() {
|
||||||
|
oldTarget.Merge(newTargets[j])
|
||||||
|
newTargets[j] = oldTarget
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
p.targets = newTargets
|
||||||
|
}
|
||||||
|
|
||||||
func (p *TargetPool) runSingle(earliest time.Time, results chan format.Result, t Target) {
|
func (p *TargetPool) runSingle(earliest time.Time, results chan format.Result, t Target) {
|
||||||
p.manager.acquire()
|
p.manager.acquire()
|
||||||
defer p.manager.release()
|
defer p.manager.release()
|
||||||
|
@ -80,33 +114,35 @@ func (p *TargetPool) runIteration(results chan format.Result, interval time.Dura
|
||||||
targetCount := p.Len()
|
targetCount := p.Len()
|
||||||
finished := make(chan bool, targetCount)
|
finished := make(chan bool, targetCount)
|
||||||
|
|
||||||
for i := 0; i < targetCount; i++ {
|
// Sort p.targets by next scheduling time so we can process the earliest
|
||||||
target := heap.Pop(p).(Target)
|
// targets first.
|
||||||
if target == nil {
|
sort.Sort(p)
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
|
for _, target := range p.targets {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
if target.scheduledFor().After(now) {
|
if target.scheduledFor().After(now) {
|
||||||
heap.Push(p, target)
|
|
||||||
// None of the remaining targets are ready to be scheduled. Signal that
|
// None of the remaining targets are ready to be scheduled. Signal that
|
||||||
// we're done processing them in this scrape iteration.
|
// we're done processing them in this scrape iteration.
|
||||||
for j := i; j < targetCount; j++ {
|
|
||||||
finished <- true
|
finished <- true
|
||||||
}
|
continue
|
||||||
break
|
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func(t Target) {
|
||||||
p.runSingle(now, results, target)
|
p.runSingle(now, results, t)
|
||||||
heap.Push(p, target)
|
|
||||||
finished <- true
|
finished <- true
|
||||||
}()
|
}(target)
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < targetCount; i++ {
|
for i := 0; i < targetCount; {
|
||||||
<-finished
|
select {
|
||||||
|
case <-finished:
|
||||||
|
i++
|
||||||
|
case newTarget := <-p.addTargetQueue:
|
||||||
|
p.addTarget(newTarget)
|
||||||
|
case newTargets := <-p.replaceTargetsQueue:
|
||||||
|
p.replaceTargets(newTargets)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
close(finished)
|
close(finished)
|
||||||
|
@ -114,3 +150,8 @@ func (p *TargetPool) runIteration(results chan format.Result, interval time.Dura
|
||||||
duration := float64(time.Since(begin) / time.Millisecond)
|
duration := float64(time.Since(begin) / time.Millisecond)
|
||||||
retrievalDurations.Add(map[string]string{intervalKey: interval.String()}, duration)
|
retrievalDurations.Add(map[string]string{intervalKey: interval.String()}, duration)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// XXX: Not really thread-safe. Only used in /status page for now.
|
||||||
|
func (p *TargetPool) Targets() []Target {
|
||||||
|
return p.targets
|
||||||
|
}
|
||||||
|
|
|
@ -14,9 +14,9 @@
|
||||||
package retrieval
|
package retrieval
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/heap"
|
|
||||||
"github.com/prometheus/prometheus/retrieval/format"
|
"github.com/prometheus/prometheus/retrieval/format"
|
||||||
"github.com/prometheus/prometheus/utility/test"
|
"github.com/prometheus/prometheus/utility/test"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -111,34 +111,20 @@ func testTargetPool(t test.Tester) {
|
||||||
scheduler: literalScheduler(input.scheduledFor),
|
scheduler: literalScheduler(input.scheduledFor),
|
||||||
}
|
}
|
||||||
|
|
||||||
heap.Push(&pool, &target)
|
pool.addTarget(&target)
|
||||||
}
|
}
|
||||||
|
sort.Sort(pool)
|
||||||
targets := []Target{}
|
|
||||||
|
|
||||||
if pool.Len() != len(scenario.outputs) {
|
if pool.Len() != len(scenario.outputs) {
|
||||||
t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.Len())
|
t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.Len())
|
||||||
} else {
|
} else {
|
||||||
for j, output := range scenario.outputs {
|
for j, output := range scenario.outputs {
|
||||||
target := heap.Pop(&pool).(Target)
|
target := pool.targets[j]
|
||||||
|
|
||||||
if target.Address() != output.address {
|
if target.Address() != output.address {
|
||||||
t.Errorf("%s %d.%d. expected Target address to be %s but was %s", scenario.name, i, j, output.address, target.Address())
|
t.Errorf("%s %d.%d. expected Target address to be %s but was %s", scenario.name, i, j, output.address, target.Address())
|
||||||
|
|
||||||
}
|
}
|
||||||
targets = append(targets, target)
|
|
||||||
}
|
|
||||||
|
|
||||||
if pool.Len() != 0 {
|
|
||||||
t.Errorf("%s %d. expected pool to be empty, had %d", scenario.name, i, pool.Len())
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(targets) != len(scenario.outputs) {
|
|
||||||
t.Errorf("%s %d. expected to receive %d elements, got %d", scenario.name, i, len(scenario.outputs), len(targets))
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, target := range targets {
|
|
||||||
heap.Push(&pool, target)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if pool.Len() != len(scenario.outputs) {
|
if pool.Len() != len(scenario.outputs) {
|
||||||
|
@ -158,7 +144,7 @@ func TestTargetPoolIterationWithUnhealthyTargetsFinishes(t *testing.T) {
|
||||||
address: "http://example.com/metrics.json",
|
address: "http://example.com/metrics.json",
|
||||||
scheduler: literalScheduler(time.Date(9999, 1, 1, 0, 0, 0, 0, time.UTC)),
|
scheduler: literalScheduler(time.Date(9999, 1, 1, 0, 0, 0, 0, time.UTC)),
|
||||||
}
|
}
|
||||||
pool.Push(target)
|
pool.addTarget(target)
|
||||||
|
|
||||||
done := make(chan bool)
|
done := make(chan bool)
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -174,6 +160,49 @@ func TestTargetPoolIterationWithUnhealthyTargetsFinishes(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTargetPoolReplaceTargets(t *testing.T) {
|
||||||
|
pool := TargetPool{}
|
||||||
|
oldTarget1 := &target{
|
||||||
|
address: "http://example1.com/metrics.json",
|
||||||
|
scheduler: literalScheduler(time.Date(9999, 1, 1, 0, 0, 0, 0, time.UTC)),
|
||||||
|
state: UNREACHABLE,
|
||||||
|
}
|
||||||
|
oldTarget2 := &target{
|
||||||
|
address: "http://example2.com/metrics.json",
|
||||||
|
scheduler: literalScheduler(time.Date(7500, 1, 1, 0, 0, 0, 0, time.UTC)),
|
||||||
|
state: UNREACHABLE,
|
||||||
|
}
|
||||||
|
newTarget1 := &target{
|
||||||
|
address: "http://example1.com/metrics.json",
|
||||||
|
scheduler: literalScheduler(time.Date(5000, 1, 1, 0, 0, 0, 0, time.UTC)),
|
||||||
|
state: ALIVE,
|
||||||
|
}
|
||||||
|
newTarget2 := &target{
|
||||||
|
address: "http://example3.com/metrics.json",
|
||||||
|
scheduler: literalScheduler(time.Date(2500, 1, 1, 0, 0, 0, 0, time.UTC)),
|
||||||
|
state: ALIVE,
|
||||||
|
}
|
||||||
|
|
||||||
|
pool.addTarget(oldTarget1)
|
||||||
|
pool.addTarget(oldTarget2)
|
||||||
|
|
||||||
|
pool.replaceTargets([]Target{newTarget1, newTarget2})
|
||||||
|
sort.Sort(pool)
|
||||||
|
|
||||||
|
if pool.Len() != 2 {
|
||||||
|
t.Errorf("Expected 2 elements in pool, had %d", pool.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
target1 := pool.targets[0].(*target)
|
||||||
|
if target1.state != newTarget1.state {
|
||||||
|
t.Errorf("Wrong first target returned from pool, expected %v, got %v", newTarget2, target1)
|
||||||
|
}
|
||||||
|
target2 := pool.targets[1].(*target)
|
||||||
|
if target2.state != oldTarget1.state {
|
||||||
|
t.Errorf("Wrong second target returned from pool, expected %v, got %v", oldTarget1, target2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkTargetPool(b *testing.B) {
|
func BenchmarkTargetPool(b *testing.B) {
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
testTargetPool(b)
|
testTargetPool(b)
|
||||||
|
|
|
@ -1,8 +1,21 @@
|
||||||
|
// Copyright 2013 Prometheus Team
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"code.google.com/p/gorest"
|
"code.google.com/p/gorest"
|
||||||
"github.com/prometheus/prometheus/storage/metric"
|
"github.com/prometheus/prometheus/appstate"
|
||||||
"github.com/prometheus/prometheus/utility"
|
"github.com/prometheus/prometheus/utility"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -13,12 +26,14 @@ type MetricsService struct {
|
||||||
queryRange gorest.EndPoint `method:"GET" path:"/query_range?{expr:string}&{end:int64}&{range:int64}&{step:int64}" output:"string"`
|
queryRange gorest.EndPoint `method:"GET" path:"/query_range?{expr:string}&{end:int64}&{range:int64}&{step:int64}" output:"string"`
|
||||||
metrics gorest.EndPoint `method:"GET" path:"/metrics" output:"string"`
|
metrics gorest.EndPoint `method:"GET" path:"/metrics" output:"string"`
|
||||||
|
|
||||||
persistence metric.MetricPersistence
|
setTargets gorest.EndPoint `method:"PUT" path:"/jobs/{jobName:string}/targets" postdata:"[]TargetGroup"`
|
||||||
|
|
||||||
|
appState *appstate.ApplicationState
|
||||||
time utility.Time
|
time utility.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMetricsService(persistence metric.MetricPersistence) *MetricsService {
|
func NewMetricsService(appState *appstate.ApplicationState) *MetricsService {
|
||||||
return &MetricsService{
|
return &MetricsService{
|
||||||
persistence: persistence,
|
appState: appState,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,16 @@
|
||||||
|
// Copyright 2013 Prometheus Team
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
@ -70,7 +83,7 @@ func (serv MetricsService) QueryRange(Expr string, End int64, Range int64, Step
|
||||||
}
|
}
|
||||||
|
|
||||||
func (serv MetricsService) Metrics() string {
|
func (serv MetricsService) Metrics() string {
|
||||||
metricNames, err := serv.persistence.GetAllMetricNames()
|
metricNames, err := serv.appState.Persistence.GetAllMetricNames()
|
||||||
rb := serv.ResponseBuilder()
|
rb := serv.ResponseBuilder()
|
||||||
rb.SetContentType(gorest.Application_Json)
|
rb.SetContentType(gorest.Application_Json)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
48
web/api/targets.go
Normal file
48
web/api/targets.go
Normal file
|
@ -0,0 +1,48 @@
|
||||||
|
// Copyright 2013 Prometheus Team
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/prometheus/prometheus/model"
|
||||||
|
"github.com/prometheus/prometheus/retrieval"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TargetGroup struct {
|
||||||
|
Endpoints []string `json:"endpoints"`
|
||||||
|
BaseLabels map[string]string `json:"baseLabels"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (serv MetricsService) SetTargets(targetGroups []TargetGroup, jobName string) {
|
||||||
|
if job := serv.appState.Config.GetJobByName(jobName); job == nil {
|
||||||
|
rb := serv.ResponseBuilder()
|
||||||
|
rb.SetResponseCode(http.StatusNotFound)
|
||||||
|
} else {
|
||||||
|
newTargets := []retrieval.Target{}
|
||||||
|
for _, targetGroup := range targetGroups {
|
||||||
|
// Do mandatory map type conversion due to Go shortcomings.
|
||||||
|
baseLabels := model.LabelSet{}
|
||||||
|
for label, value := range targetGroup.BaseLabels {
|
||||||
|
baseLabels[model.LabelName(label)] = model.LabelValue(value)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, endpoint := range targetGroup.Endpoints {
|
||||||
|
newTarget := retrieval.NewTarget(endpoint, time.Second*5, baseLabels)
|
||||||
|
newTargets = append(newTargets, newTarget)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
serv.appState.TargetManager.ReplaceTargets(job, newTargets, serv.appState.Config.Global.ScrapeInterval)
|
||||||
|
}
|
||||||
|
}
|
|
@ -15,6 +15,7 @@ package web
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/prometheus/prometheus/appstate"
|
"github.com/prometheus/prometheus/appstate"
|
||||||
|
"github.com/prometheus/prometheus/retrieval"
|
||||||
"html/template"
|
"html/template"
|
||||||
"net/http"
|
"net/http"
|
||||||
)
|
)
|
||||||
|
@ -23,7 +24,7 @@ type PrometheusStatus struct {
|
||||||
Config string
|
Config string
|
||||||
Rules string
|
Rules string
|
||||||
Status string
|
Status string
|
||||||
Targets string
|
TargetPools map[string]*retrieval.TargetPool
|
||||||
}
|
}
|
||||||
|
|
||||||
type StatusHandler struct {
|
type StatusHandler struct {
|
||||||
|
@ -35,7 +36,7 @@ func (h *StatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
Config: h.appState.Config.ToString(0),
|
Config: h.appState.Config.ToString(0),
|
||||||
Rules: "TODO: list rules here",
|
Rules: "TODO: list rules here",
|
||||||
Status: "TODO: add status information here",
|
Status: "TODO: add status information here",
|
||||||
Targets: "TODO: list targets here",
|
TargetPools: h.appState.TargetManager.Pools(),
|
||||||
}
|
}
|
||||||
t, _ := template.ParseFiles("web/templates/status.html")
|
t, _ := template.ParseFiles("web/templates/status.html")
|
||||||
t.Execute(w, status)
|
t.Execute(w, status)
|
||||||
|
|
|
@ -27,7 +27,19 @@
|
||||||
|
|
||||||
<h2>Targets</h2>
|
<h2>Targets</h2>
|
||||||
<div class="grouping_box">
|
<div class="grouping_box">
|
||||||
{{.Targets}}
|
<ul>
|
||||||
|
{{range $job, $pool := .TargetPools}}
|
||||||
|
<li>{{$job}}
|
||||||
|
<ul>
|
||||||
|
{{range $pool.Targets}}
|
||||||
|
<li>
|
||||||
|
<a href="{{.Address}}">{{.Address}}</a> (State: {{.State}}, Base labels: {{.BaseLabels}})
|
||||||
|
</li>
|
||||||
|
{{end}}
|
||||||
|
</ul>
|
||||||
|
</li>
|
||||||
|
{{end}}
|
||||||
|
</ul>
|
||||||
</div>
|
</div>
|
||||||
</body>
|
</body>
|
||||||
</html>
|
</html>
|
||||||
|
|
|
@ -29,7 +29,7 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
func StartServing(appState *appstate.ApplicationState) {
|
func StartServing(appState *appstate.ApplicationState) {
|
||||||
gorest.RegisterService(api.NewMetricsService(appState.Persistence))
|
gorest.RegisterService(api.NewMetricsService(appState))
|
||||||
exporter := registry.DefaultRegistry.YieldExporter()
|
exporter := registry.DefaultRegistry.YieldExporter()
|
||||||
|
|
||||||
http.Handle("/status", &StatusHandler{appState: appState})
|
http.Handle("/status", &StatusHandler{appState: appState})
|
||||||
|
|
Loading…
Reference in a new issue