prometheus/scrape/target.go
xjewer 0d1a69353e scrape: Add global jitter for HA server (#5181)
* scrape: Add global jitter for HA server

Covers issue in https://github.com/prometheus/prometheus/pull/4926#issuecomment-449039848
where the HA setup become a problem for targets unable to be scraped simultaneously.
The new jitter per server relies on the hostname and external labels which necessarily to be uniq.

As before, scrape offset will be calculated with regard the absolute time, so even
restart/reload doesn't change scrape time per scrape target + prometheus instance.

Use fqdn if possible, otherwise fall back to the hostname. It adds extra random seed
to calculate server hash to be distinguish on machines with the same hostname, but
different DC.

Signed-off-by: Aleksei Semiglazov <xjewer@gmail.com>
2019-03-12 10:46:15 +00:00

435 lines
11 KiB
Go

// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scrape
import (
"errors"
"fmt"
"hash/fnv"
"net"
"net/url"
"strings"
"sync"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/prometheus/storage"
)
// TargetHealth describes the health state of a target.
type TargetHealth string
// The possible health states of a target based on the last performed scrape.
const (
HealthUnknown TargetHealth = "unknown"
HealthGood TargetHealth = "up"
HealthBad TargetHealth = "down"
)
// Target refers to a singular HTTP or HTTPS endpoint.
type Target struct {
// Labels before any processing.
discoveredLabels labels.Labels
// Any labels that are added to this target and its metrics.
labels labels.Labels
// Additional URL parmeters that are part of the target URL.
params url.Values
mtx sync.RWMutex
lastError error
lastScrape time.Time
lastScrapeDuration time.Duration
health TargetHealth
metadata metricMetadataStore
}
// NewTarget creates a reasonably configured target for querying.
func NewTarget(labels, discoveredLabels labels.Labels, params url.Values) *Target {
return &Target{
labels: labels,
discoveredLabels: discoveredLabels,
params: params,
health: HealthUnknown,
}
}
func (t *Target) String() string {
return t.URL().String()
}
type metricMetadataStore interface {
listMetadata() []MetricMetadata
getMetadata(metric string) (MetricMetadata, bool)
}
// MetricMetadata is a piece of metadata for a metric.
type MetricMetadata struct {
Metric string
Type textparse.MetricType
Help string
Unit string
}
func (t *Target) MetadataList() []MetricMetadata {
t.mtx.RLock()
defer t.mtx.RUnlock()
if t.metadata == nil {
return nil
}
return t.metadata.listMetadata()
}
// Metadata returns type and help metadata for the given metric.
func (t *Target) Metadata(metric string) (MetricMetadata, bool) {
t.mtx.RLock()
defer t.mtx.RUnlock()
if t.metadata == nil {
return MetricMetadata{}, false
}
return t.metadata.getMetadata(metric)
}
func (t *Target) setMetadataStore(s metricMetadataStore) {
t.mtx.Lock()
defer t.mtx.Unlock()
t.metadata = s
}
// hash returns an identifying hash for the target.
func (t *Target) hash() uint64 {
h := fnv.New64a()
h.Write([]byte(fmt.Sprintf("%016d", t.labels.Hash())))
h.Write([]byte(t.URL().String()))
return h.Sum64()
}
// offset returns the time until the next scrape cycle for the target.
// It includes the global server jitterSeed for scrapes from multiple Prometheus to try to be at different times.
func (t *Target) offset(interval time.Duration, jitterSeed uint64) time.Duration {
now := time.Now().UnixNano()
// Base is a pinned to absolute time, no matter how often offset is called.
var (
base = int64(interval) - now%int64(interval)
offset = (t.hash() ^ jitterSeed) % uint64(interval)
next = base + int64(offset)
)
if next > int64(interval) {
next -= int64(interval)
}
return time.Duration(next)
}
// Labels returns a copy of the set of all public labels of the target.
func (t *Target) Labels() labels.Labels {
lset := make(labels.Labels, 0, len(t.labels))
for _, l := range t.labels {
if !strings.HasPrefix(l.Name, model.ReservedLabelPrefix) {
lset = append(lset, l)
}
}
return lset
}
// DiscoveredLabels returns a copy of the target's labels before any processing.
func (t *Target) DiscoveredLabels() labels.Labels {
t.mtx.Lock()
defer t.mtx.Unlock()
lset := make(labels.Labels, len(t.discoveredLabels))
copy(lset, t.discoveredLabels)
return lset
}
// SetDiscoveredLabels sets new DiscoveredLabels
func (t *Target) SetDiscoveredLabels(l labels.Labels) {
t.mtx.Lock()
defer t.mtx.Unlock()
t.discoveredLabels = l
}
// URL returns a copy of the target's URL.
func (t *Target) URL() *url.URL {
params := url.Values{}
for k, v := range t.params {
params[k] = make([]string, len(v))
copy(params[k], v)
}
for _, l := range t.labels {
if !strings.HasPrefix(l.Name, model.ParamLabelPrefix) {
continue
}
ks := l.Name[len(model.ParamLabelPrefix):]
if len(params[ks]) > 0 {
params[ks][0] = l.Value
} else {
params[ks] = []string{l.Value}
}
}
return &url.URL{
Scheme: t.labels.Get(model.SchemeLabel),
Host: t.labels.Get(model.AddressLabel),
Path: t.labels.Get(model.MetricsPathLabel),
RawQuery: params.Encode(),
}
}
func (t *Target) report(start time.Time, dur time.Duration, err error) {
t.mtx.Lock()
defer t.mtx.Unlock()
if err == nil {
t.health = HealthGood
} else {
t.health = HealthBad
}
t.lastError = err
t.lastScrape = start
t.lastScrapeDuration = dur
}
// LastError returns the error encountered during the last scrape.
func (t *Target) LastError() error {
t.mtx.RLock()
defer t.mtx.RUnlock()
return t.lastError
}
// LastScrape returns the time of the last scrape.
func (t *Target) LastScrape() time.Time {
t.mtx.RLock()
defer t.mtx.RUnlock()
return t.lastScrape
}
// LastScrapeDuration returns how long the last scrape of the target took.
func (t *Target) LastScrapeDuration() time.Duration {
t.mtx.RLock()
defer t.mtx.RUnlock()
return t.lastScrapeDuration
}
// Health returns the last known health state of the target.
func (t *Target) Health() TargetHealth {
t.mtx.RLock()
defer t.mtx.RUnlock()
return t.health
}
// Targets is a sortable list of targets.
type Targets []*Target
func (ts Targets) Len() int { return len(ts) }
func (ts Targets) Less(i, j int) bool { return ts[i].URL().String() < ts[j].URL().String() }
func (ts Targets) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
var errSampleLimit = errors.New("sample limit exceeded")
// limitAppender limits the number of total appended samples in a batch.
type limitAppender struct {
storage.Appender
limit int
i int
}
func (app *limitAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
if !value.IsStaleNaN(v) {
app.i++
if app.i > app.limit {
return 0, errSampleLimit
}
}
ref, err := app.Appender.Add(lset, t, v)
if err != nil {
return 0, err
}
return ref, nil
}
func (app *limitAppender) AddFast(lset labels.Labels, ref uint64, t int64, v float64) error {
if !value.IsStaleNaN(v) {
app.i++
if app.i > app.limit {
return errSampleLimit
}
}
err := app.Appender.AddFast(lset, ref, t, v)
return err
}
type timeLimitAppender struct {
storage.Appender
maxTime int64
}
func (app *timeLimitAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
if t > app.maxTime {
return 0, storage.ErrOutOfBounds
}
ref, err := app.Appender.Add(lset, t, v)
if err != nil {
return 0, err
}
return ref, nil
}
func (app *timeLimitAppender) AddFast(lset labels.Labels, ref uint64, t int64, v float64) error {
if t > app.maxTime {
return storage.ErrOutOfBounds
}
err := app.Appender.AddFast(lset, ref, t, v)
return err
}
// populateLabels builds a label set from the given label set and scrape configuration.
// It returns a label set before relabeling was applied as the second return value.
// Returns the original discovered label set found before relabelling was applied if the target is dropped during relabeling.
func populateLabels(lset labels.Labels, cfg *config.ScrapeConfig) (res, orig labels.Labels, err error) {
// Copy labels into the labelset for the target if they are not set already.
scrapeLabels := []labels.Label{
{Name: model.JobLabel, Value: cfg.JobName},
{Name: model.MetricsPathLabel, Value: cfg.MetricsPath},
{Name: model.SchemeLabel, Value: cfg.Scheme},
}
lb := labels.NewBuilder(lset)
for _, l := range scrapeLabels {
if lv := lset.Get(l.Name); lv == "" {
lb.Set(l.Name, l.Value)
}
}
// Encode scrape query parameters as labels.
for k, v := range cfg.Params {
if len(v) > 0 {
lb.Set(model.ParamLabelPrefix+k, v[0])
}
}
preRelabelLabels := lb.Labels()
lset = relabel.Process(preRelabelLabels, cfg.RelabelConfigs...)
// Check if the target was dropped.
if lset == nil {
return nil, preRelabelLabels, nil
}
if v := lset.Get(model.AddressLabel); v == "" {
return nil, nil, fmt.Errorf("no address")
}
lb = labels.NewBuilder(lset)
// addPort checks whether we should add a default port to the address.
// If the address is not valid, we don't append a port either.
addPort := func(s string) bool {
// If we can split, a port exists and we don't have to add one.
if _, _, err := net.SplitHostPort(s); err == nil {
return false
}
// If adding a port makes it valid, the previous error
// was not due to an invalid address and we can append a port.
_, _, err := net.SplitHostPort(s + ":1234")
return err == nil
}
addr := lset.Get(model.AddressLabel)
// If it's an address with no trailing port, infer it based on the used scheme.
if addPort(addr) {
// Addresses reaching this point are already wrapped in [] if necessary.
switch lset.Get(model.SchemeLabel) {
case "http", "":
addr = addr + ":80"
case "https":
addr = addr + ":443"
default:
return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme)
}
lb.Set(model.AddressLabel, addr)
}
if err := config.CheckTargetAddress(model.LabelValue(addr)); err != nil {
return nil, nil, err
}
// Meta labels are deleted after relabelling. Other internal labels propagate to
// the target which decides whether they will be part of their label set.
for _, l := range lset {
if strings.HasPrefix(l.Name, model.MetaLabelPrefix) {
lb.Del(l.Name)
}
}
// Default the instance label to the target address.
if v := lset.Get(model.InstanceLabel); v == "" {
lb.Set(model.InstanceLabel, addr)
}
res = lb.Labels()
for _, l := range res {
// Check label values are valid, drop the target if not.
if !model.LabelValue(l.Value).IsValid() {
return nil, nil, fmt.Errorf("invalid label value for %q: %q", l.Name, l.Value)
}
}
return res, preRelabelLabels, nil
}
// targetsFromGroup builds targets based on the given TargetGroup and config.
func targetsFromGroup(tg *targetgroup.Group, cfg *config.ScrapeConfig) ([]*Target, error) {
targets := make([]*Target, 0, len(tg.Targets))
for i, tlset := range tg.Targets {
lbls := make([]labels.Label, 0, len(tlset)+len(tg.Labels))
for ln, lv := range tlset {
lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
}
for ln, lv := range tg.Labels {
if _, ok := tlset[ln]; !ok {
lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
}
}
lset := labels.New(lbls...)
lbls, origLabels, err := populateLabels(lset, cfg)
if err != nil {
return nil, fmt.Errorf("instance %d in group %s: %s", i, tg, err)
}
if lbls != nil || origLabels != nil {
targets = append(targets, NewTarget(lbls, origLabels, cfg.Params))
}
}
return targets, nil
}