prometheus/retrieval/targetpool.go
Bjoern Rabenstein 24ece38f7c Make metrics exported by the Prometheus server itself more consistent.
- Always spell out the time unit (e.g. milliseconds instead of ms).

- Remove "_total" from the names of metrics that are not counters.

- Make use of the "Namespace" and "Subsystem" fields in the options.

- Remove the "capacity" facet from all metrics about channels/queues.
  These are all fixed via command line flags and will never change
  during the runtime of a process. Also, they should not be part of
  the same metric family. I have added separate metrics for the
  capacity of queues as a convenience. (They will never change and are
  only set once.)

- I left "metric_disk_latency_microseconds" unchanged, although that
  metric measures the latency of the storage device, even if it is not
  a spinning disk. "SSD" is read by many as "solid state disk", so
  it's not too far off. (It should be "solid state drive", of course,
  but "metric_drive_latency_microseconds" is probably confusing.)

- Brian suggested to not mix "failure" and "success" outcome in the
  same metric family (distinguished by labels). For now, I left it as
  it is. We are touching some bigger issue here, especially as other
  parts in the Prometheus ecosystem are following the same
  principle. We still need to come to terms here and then change
  things consistently everywhere.

Change-Id: If799458b450d18f78500f05990301c12525197d3
2014-07-31 15:44:31 +02:00

183 lines
4.2 KiB
Go

// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"sync"
"time"
"github.com/golang/glog"
"github.com/prometheus/client_golang/extraction"
"github.com/prometheus/client_golang/prometheus"
)
// Queue capacities and the label name used by TargetPool and its metrics.
const (
// targetAddQueueSize is the buffer capacity NewTargetPool gives to a pool's
// addTargetQueue.
targetAddQueueSize = 100
// targetReplaceQueueSize is the buffer capacity NewTargetPool gives to a
// pool's replaceTargetsQueue.
targetReplaceQueueSize = 1
// intervalKey is the name of the single label on retrievalDurations.
intervalKey = "interval"
)
var (
// retrievalDurations is a summary of the time TargetPool.runIteration
// spends scraping all of a pool's targets, in milliseconds. It carries one
// label, intervalKey, which runIteration fills with the string form of the
// pool's scrape interval. The namespace identifier is declared elsewhere in
// this package.
retrievalDurations = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "targetpool_retrieve_time_milliseconds",
Help: "The time needed for each TargetPool to retrieve state from all included entities.",
Objectives: []float64{0.01, 0.05, 0.5, 0.90, 0.99},
},
[]string{intervalKey},
)
)
// init registers retrievalDurations through prometheus.MustRegister at
// package load time. NOTE(review): per the Must naming convention this
// presumably panics if registration fails -- confirm against client_golang.
func init() {
prometheus.MustRegister(retrievalDurations)
}
// TargetPool holds a list of targets and scrapes all of them together on a
// fixed interval (see Run). The embedded RWMutex guards the targets field.
// Because the struct embeds a mutex it must not be copied; use *TargetPool.
type TargetPool struct {
sync.RWMutex
// done carries the shutdown signal: Stop sends on it and Run returns when
// it receives from it.
done chan bool
// manager is asked to acquire before, and release after, every single
// target scrape (see runSingle).
manager TargetManager
// targets is the list scraped by runIteration.
targets targets
// addTargetQueue holds targets submitted through AddTarget until the Run
// loop appends them to targets.
addTargetQueue chan Target
// replaceTargetsQueue holds a target list submitted through ReplaceTargets
// until the Run loop applies it.
replaceTargetsQueue chan targets
// targetProvider is optional. When non-nil, runIteration asks it for a
// fresh target list at the start of every iteration.
targetProvider TargetProvider
}
// NewTargetPool returns an empty TargetPool that uses m to gate individual
// scrapes and p (which may be nil) as its source of target lists.
//
// The add and replace queues are buffered with targetAddQueueSize and
// targetReplaceQueueSize slots respectively; the done channel is unbuffered.
func NewTargetPool(m TargetManager, p TargetProvider) *TargetPool {
	pool := new(TargetPool)

	pool.manager = m
	pool.targetProvider = p

	pool.done = make(chan bool)
	pool.addTargetQueue = make(chan Target, targetAddQueueSize)
	pool.replaceTargetsQueue = make(chan targets, targetReplaceQueueSize)

	return pool
}
// Run is the pool's event loop. It blocks until a value arrives on the done
// channel (see Stop) and, until then, reacts to whichever of these happens:
//
//   - the interval ticker fires: all targets are scraped via runIteration;
//   - a target arrives on addTargetQueue: it is appended via addTarget;
//   - a list arrives on replaceTargetsQueue: it is applied via replaceTargets.
//
// The ticker is stopped when Run returns.
func (p *TargetPool) Run(ingester extraction.Ingester, interval time.Duration) {
	scrapeTick := time.NewTicker(interval)
	defer scrapeTick.Stop()

	for {
		select {
		case <-p.done:
			glog.Info("TargetPool exiting...")
			return

		case pendingList := <-p.replaceTargetsQueue:
			p.replaceTargets(pendingList)

		case added := <-p.addTargetQueue:
			p.addTarget(added)

		case <-scrapeTick.C:
			p.runIteration(ingester, interval)
		}
	}
}
// Stop signals the Run loop to exit by sending on the done channel.
//
// For pools built by NewTargetPool the done channel is unbuffered, so Stop
// blocks until Run actually receives the signal; calling Stop when no Run
// loop is active therefore blocks indefinitely.
//
// Stop uses a pointer receiver like every other TargetPool method: a value
// receiver would copy the whole struct, including its embedded RWMutex, on
// each call.
func (p *TargetPool) Stop() {
	p.done <- true
}
// AddTarget submits target for addition to the pool by sending it on
// addTargetQueue; the Run loop receives it and appends it to the target list.
// The send blocks while the queue is full.
func (p *TargetPool) AddTarget(target Target) {
p.addTargetQueue <- target
}
// addTarget appends target to the pool's target list while holding the write
// lock. It is called by the Run loop for each target received from
// addTargetQueue.
func (p *TargetPool) addTarget(target Target) {
p.Lock()
defer p.Unlock()
p.targets = append(p.targets, target)
}
// ReplaceTargets submits newTargets as the pool's next complete target list.
// The list is only queued here; the Run loop picks it up from
// replaceTargetsQueue and applies it via replaceTargets.
//
// If an earlier list is still waiting in the queue it is discarded, so the
// most recent call always wins. Previously the new list was sent only when
// the queue was empty, so a call that found a pending list dropped both the
// pending list and newTargets.
func (p *TargetPool) ReplaceTargets(newTargets []Target) {
	p.Lock()
	defer p.Unlock()

	// Throw away a not-yet-applied list, if any, because the last mutation
	// should win.
	select {
	case <-p.replaceTargetsQueue:
	default:
	}

	// Senders are serialized by the lock and the queue was just drained, so
	// for pools built by NewTargetPool (buffered queue) this send finds a free
	// slot and does not block.
	p.replaceTargetsQueue <- newTargets
}
// replaceTargets installs newTargets as the pool's target list under the
// write lock. It is called by the Run loop for each list received from
// replaceTargetsQueue.
//
// Wherever an existing target has the same Address as an incoming one, the
// existing target is kept in that position after merging the incoming one
// into it, so that scheduling and health state survive the replacement.
// Note that newTargets is modified in place and retained by the pool.
func (p *TargetPool) replaceTargets(newTargets []Target) {
	p.Lock()
	defer p.Unlock()

	for idx := range newTargets {
		// Addresses are always compared against the target as submitted,
		// even after its slot has been overwritten by an existing target.
		incoming := newTargets[idx]

		for _, existing := range p.targets {
			if existing.Address() != incoming.Address() {
				continue
			}
			existing.Merge(newTargets[idx])
			newTargets[idx] = existing
		}
	}

	p.targets = newTargets
}
// runSingle scrapes one target into ingester. The scrape is bracketed by
// manager.acquire and manager.release; release is deferred so it runs
// however Scrape returns.
// NOTE(review): acquire/release presumably bound the number of concurrent
// scrapes -- confirm against the TargetManager implementation.
func (p *TargetPool) runSingle(ingester extraction.Ingester, t Target) {
p.manager.acquire()
defer p.manager.release()
t.Scrape(ingester)
}
// runIteration performs one scrape cycle for the pool.
//
// If a targetProvider is configured it is first asked for the current target
// list. On success that list is handed to ReplaceTargets, which only queues
// it for the Run loop, so the scrape below still uses the list that was in
// place when this iteration started. On failure a warning is logged and the
// existing list is kept.
//
// Every target is then scraped in its own goroutine via runSingle while the
// read lock is held. Once all scrapes have finished, the elapsed wall-clock
// time is observed in retrievalDurations, in milliseconds, under the label
// value interval.String().
func (p *TargetPool) runIteration(ingester extraction.Ingester, interval time.Duration) {
	if p.targetProvider != nil {
		// Named "provided" so the package-level type "targets" is not shadowed.
		provided, err := p.targetProvider.Targets()
		if err != nil {
			glog.Warningf("Error looking up targets, keeping old list: %s", err)
		} else {
			p.ReplaceTargets(provided)
		}
	}

	p.RLock()
	defer p.RUnlock()

	begin := time.Now()

	var wait sync.WaitGroup
	for _, target := range p.targets {
		wait.Add(1)
		go func(t Target) {
			// Deferred so the WaitGroup is released even if the scrape
			// goroutine exits early.
			defer wait.Done()
			p.runSingle(ingester, t)
		}(target)
	}
	wait.Wait()

	// Divide in floating point: integer division of Durations would drop the
	// sub-millisecond part of the measurement.
	duration := float64(time.Since(begin)) / float64(time.Millisecond)
	retrievalDurations.WithLabelValues(interval.String()).Observe(duration)
}
// Targets returns a snapshot of the pool's current target list, taken under
// the read lock. The returned slice is a fresh copy, so callers may modify it
// without affecting the pool; the Target values themselves are shared. The
// result is never nil, even when the pool has no targets.
func (p *TargetPool) Targets() []Target {
	p.RLock()
	defer p.RUnlock()

	snapshot := make([]Target, 0, len(p.targets))
	return append(snapshot, p.targets...)
}