prometheus/scrape/target.go
György Krajcsovits 79020b1e85 Comment float histogram as well
Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
2024-06-18 15:22:03 +02:00

586 lines
16 KiB
Go

// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scrape
import (
"errors"
"fmt"
"hash/fnv"
"net"
"net/url"
"strings"
"sync"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
)
// TargetHealth describes the health state of a target.
type TargetHealth string
// The possible health states of a target based on the last performed scrape.
const (
HealthUnknown TargetHealth = "unknown"
HealthGood TargetHealth = "up"
HealthBad TargetHealth = "down"
)
// Target refers to a singular HTTP or HTTPS endpoint.
type Target struct {
// Labels before any processing.
discoveredLabels labels.Labels
// Any labels that are added to this target and its metrics.
labels labels.Labels
// Additional URL parameters that are part of the target URL.
params url.Values
mtx sync.RWMutex
lastError error
lastScrape time.Time
lastScrapeDuration time.Duration
health TargetHealth
metadata MetricMetadataStore
}
// NewTarget creates a reasonably configured target for querying.
func NewTarget(labels, discoveredLabels labels.Labels, params url.Values) *Target {
return &Target{
labels: labels,
discoveredLabels: discoveredLabels,
params: params,
health: HealthUnknown,
}
}
func (t *Target) String() string {
return t.URL().String()
}
// MetricMetadataStore represents a storage for metadata.
type MetricMetadataStore interface {
ListMetadata() []MetricMetadata
GetMetadata(metric string) (MetricMetadata, bool)
SizeMetadata() int
LengthMetadata() int
}
// MetricMetadata is a piece of metadata for a metric.
type MetricMetadata struct {
Metric string
Type model.MetricType
Help string
Unit string
}
func (t *Target) ListMetadata() []MetricMetadata {
t.mtx.RLock()
defer t.mtx.RUnlock()
if t.metadata == nil {
return nil
}
return t.metadata.ListMetadata()
}
func (t *Target) SizeMetadata() int {
t.mtx.RLock()
defer t.mtx.RUnlock()
if t.metadata == nil {
return 0
}
return t.metadata.SizeMetadata()
}
func (t *Target) LengthMetadata() int {
t.mtx.RLock()
defer t.mtx.RUnlock()
if t.metadata == nil {
return 0
}
return t.metadata.LengthMetadata()
}
// GetMetadata returns type and help metadata for the given metric.
func (t *Target) GetMetadata(metric string) (MetricMetadata, bool) {
t.mtx.RLock()
defer t.mtx.RUnlock()
if t.metadata == nil {
return MetricMetadata{}, false
}
return t.metadata.GetMetadata(metric)
}
func (t *Target) SetMetadataStore(s MetricMetadataStore) {
t.mtx.Lock()
defer t.mtx.Unlock()
t.metadata = s
}
// hash returns an identifying hash for the target.
func (t *Target) hash() uint64 {
h := fnv.New64a()
h.Write([]byte(fmt.Sprintf("%016d", t.labels.Hash())))
h.Write([]byte(t.URL().String()))
return h.Sum64()
}
// offset returns the time until the next scrape cycle for the target.
// It includes the global server offsetSeed for scrapes from multiple Prometheus to try to be at different times.
func (t *Target) offset(interval time.Duration, offsetSeed uint64) time.Duration {
now := time.Now().UnixNano()
// Base is a pinned to absolute time, no matter how often offset is called.
var (
base = int64(interval) - now%int64(interval)
offset = (t.hash() ^ offsetSeed) % uint64(interval)
next = base + int64(offset)
)
if next > int64(interval) {
next -= int64(interval)
}
return time.Duration(next)
}
// Labels returns a copy of the set of all public labels of the target.
func (t *Target) Labels(b *labels.ScratchBuilder) labels.Labels {
b.Reset()
t.labels.Range(func(l labels.Label) {
if !strings.HasPrefix(l.Name, model.ReservedLabelPrefix) {
b.Add(l.Name, l.Value)
}
})
return b.Labels()
}
// LabelsRange calls f on each public label of the target.
func (t *Target) LabelsRange(f func(l labels.Label)) {
t.labels.Range(func(l labels.Label) {
if !strings.HasPrefix(l.Name, model.ReservedLabelPrefix) {
f(l)
}
})
}
// DiscoveredLabels returns a copy of the target's labels before any processing.
func (t *Target) DiscoveredLabels() labels.Labels {
t.mtx.Lock()
defer t.mtx.Unlock()
return t.discoveredLabels.Copy()
}
// SetDiscoveredLabels sets new DiscoveredLabels.
func (t *Target) SetDiscoveredLabels(l labels.Labels) {
t.mtx.Lock()
defer t.mtx.Unlock()
t.discoveredLabels = l
}
// URL returns a copy of the target's URL.
func (t *Target) URL() *url.URL {
params := url.Values{}
for k, v := range t.params {
params[k] = make([]string, len(v))
copy(params[k], v)
}
t.labels.Range(func(l labels.Label) {
if !strings.HasPrefix(l.Name, model.ParamLabelPrefix) {
return
}
ks := l.Name[len(model.ParamLabelPrefix):]
if len(params[ks]) > 0 {
params[ks][0] = l.Value
} else {
params[ks] = []string{l.Value}
}
})
return &url.URL{
Scheme: t.labels.Get(model.SchemeLabel),
Host: t.labels.Get(model.AddressLabel),
Path: t.labels.Get(model.MetricsPathLabel),
RawQuery: params.Encode(),
}
}
// Report sets target data about the last scrape.
func (t *Target) Report(start time.Time, dur time.Duration, err error) {
t.mtx.Lock()
defer t.mtx.Unlock()
if err == nil {
t.health = HealthGood
} else {
t.health = HealthBad
}
t.lastError = err
t.lastScrape = start
t.lastScrapeDuration = dur
}
// LastError returns the error encountered during the last scrape.
func (t *Target) LastError() error {
t.mtx.RLock()
defer t.mtx.RUnlock()
return t.lastError
}
// LastScrape returns the time of the last scrape.
func (t *Target) LastScrape() time.Time {
t.mtx.RLock()
defer t.mtx.RUnlock()
return t.lastScrape
}
// LastScrapeDuration returns how long the last scrape of the target took.
func (t *Target) LastScrapeDuration() time.Duration {
t.mtx.RLock()
defer t.mtx.RUnlock()
return t.lastScrapeDuration
}
// Health returns the last known health state of the target.
func (t *Target) Health() TargetHealth {
t.mtx.RLock()
defer t.mtx.RUnlock()
return t.health
}
// intervalAndTimeout returns the interval and timeout derived from
// the targets labels.
func (t *Target) intervalAndTimeout(defaultInterval, defaultDuration time.Duration) (time.Duration, time.Duration, error) {
t.mtx.RLock()
defer t.mtx.RUnlock()
intervalLabel := t.labels.Get(model.ScrapeIntervalLabel)
interval, err := model.ParseDuration(intervalLabel)
if err != nil {
return defaultInterval, defaultDuration, fmt.Errorf("Error parsing interval label %q: %w", intervalLabel, err)
}
timeoutLabel := t.labels.Get(model.ScrapeTimeoutLabel)
timeout, err := model.ParseDuration(timeoutLabel)
if err != nil {
return defaultInterval, defaultDuration, fmt.Errorf("Error parsing timeout label %q: %w", timeoutLabel, err)
}
return time.Duration(interval), time.Duration(timeout), nil
}
// GetValue gets a label value from the entire label set.
func (t *Target) GetValue(name string) string {
return t.labels.Get(name)
}
// Targets is a sortable list of targets.
type Targets []*Target
func (ts Targets) Len() int { return len(ts) }
func (ts Targets) Less(i, j int) bool { return ts[i].URL().String() < ts[j].URL().String() }
func (ts Targets) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
var (
errSampleLimit = errors.New("sample limit exceeded")
errBucketLimit = errors.New("histogram bucket limit exceeded")
)
// limitAppender limits the number of total appended samples in a batch.
type limitAppender struct {
storage.Appender
limit int
i int
}
func (app *limitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if !value.IsStaleNaN(v) {
app.i++
if app.i > app.limit {
return 0, errSampleLimit
}
}
ref, err := app.Appender.Append(ref, lset, t, v)
if err != nil {
return 0, err
}
return ref, nil
}
type timeLimitAppender struct {
storage.Appender
maxTime int64
}
func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if t > app.maxTime {
return 0, storage.ErrOutOfBounds
}
ref, err := app.Appender.Append(ref, lset, t, v)
if err != nil {
return 0, err
}
return ref, nil
}
// bucketLimitAppender limits the number of total appended samples in a batch.
type bucketLimitAppender struct {
storage.Appender
limit int
}
func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if h != nil {
// Return with an early error if the histogram has too many buckets and the
// schema is not exponential, in which case we can't reduce the resolution.
if len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit && !histogram.IsExponentialSchema(h.Schema) {
return 0, errBucketLimit
}
for len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit {
if h.Schema <= histogram.ExponentialSchemaMin {
return 0, errBucketLimit
}
h = h.ReduceResolution(h.Schema - 1)
}
}
if fh != nil {
// Return with an early error if the histogram has too many buckets and the
// schema is not exponential, in which case we can't reduce the resolution.
if len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit && !histogram.IsExponentialSchema(fh.Schema) {
return 0, errBucketLimit
}
for len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit {
if fh.Schema <= histogram.ExponentialSchemaMin {
return 0, errBucketLimit
}
fh = fh.ReduceResolution(fh.Schema - 1)
}
}
ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh)
if err != nil {
return 0, err
}
return ref, nil
}
type maxSchemaAppender struct {
storage.Appender
maxSchema int32
}
func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if h != nil {
if histogram.IsExponentialSchema(h.Schema) && h.Schema > app.maxSchema {
h = h.ReduceResolution(app.maxSchema)
}
}
if fh != nil {
if histogram.IsExponentialSchema(fh.Schema) && fh.Schema > app.maxSchema {
fh = fh.ReduceResolution(app.maxSchema)
}
}
ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh)
if err != nil {
return 0, err
}
return ref, nil
}
// PopulateLabels builds a label set from the given label set and scrape configuration.
// It returns a label set before relabeling was applied as the second return value.
// Returns the original discovered label set found before relabelling was applied if the target is dropped during relabeling.
func PopulateLabels(lb *labels.Builder, cfg *config.ScrapeConfig, noDefaultPort bool) (res, orig labels.Labels, err error) {
// Copy labels into the labelset for the target if they are not set already.
scrapeLabels := []labels.Label{
{Name: model.JobLabel, Value: cfg.JobName},
{Name: model.ScrapeIntervalLabel, Value: cfg.ScrapeInterval.String()},
{Name: model.ScrapeTimeoutLabel, Value: cfg.ScrapeTimeout.String()},
{Name: model.MetricsPathLabel, Value: cfg.MetricsPath},
{Name: model.SchemeLabel, Value: cfg.Scheme},
}
for _, l := range scrapeLabels {
if lb.Get(l.Name) == "" {
lb.Set(l.Name, l.Value)
}
}
// Encode scrape query parameters as labels.
for k, v := range cfg.Params {
if len(v) > 0 {
lb.Set(model.ParamLabelPrefix+k, v[0])
}
}
preRelabelLabels := lb.Labels()
keep := relabel.ProcessBuilder(lb, cfg.RelabelConfigs...)
// Check if the target was dropped.
if !keep {
return labels.EmptyLabels(), preRelabelLabels, nil
}
if v := lb.Get(model.AddressLabel); v == "" {
return labels.EmptyLabels(), labels.EmptyLabels(), errors.New("no address")
}
// addPort checks whether we should add a default port to the address.
// If the address is not valid, we don't append a port either.
addPort := func(s string) (string, string, bool) {
// If we can split, a port exists and we don't have to add one.
if host, port, err := net.SplitHostPort(s); err == nil {
return host, port, false
}
// If adding a port makes it valid, the previous error
// was not due to an invalid address and we can append a port.
_, _, err := net.SplitHostPort(s + ":1234")
return "", "", err == nil
}
addr := lb.Get(model.AddressLabel)
scheme := lb.Get(model.SchemeLabel)
host, port, add := addPort(addr)
// If it's an address with no trailing port, infer it based on the used scheme
// unless the no-default-scrape-port feature flag is present.
if !noDefaultPort && add {
// Addresses reaching this point are already wrapped in [] if necessary.
switch scheme {
case "http", "":
addr += ":80"
case "https":
addr += ":443"
default:
return labels.EmptyLabels(), labels.EmptyLabels(), fmt.Errorf("invalid scheme: %q", cfg.Scheme)
}
lb.Set(model.AddressLabel, addr)
}
if noDefaultPort {
// If it's an address with a trailing default port and the
// no-default-scrape-port flag is present, remove the port.
switch port {
case "80":
if scheme == "http" {
lb.Set(model.AddressLabel, host)
}
case "443":
if scheme == "https" {
lb.Set(model.AddressLabel, host)
}
}
}
if err := config.CheckTargetAddress(model.LabelValue(addr)); err != nil {
return labels.EmptyLabels(), labels.EmptyLabels(), err
}
interval := lb.Get(model.ScrapeIntervalLabel)
intervalDuration, err := model.ParseDuration(interval)
if err != nil {
return labels.EmptyLabels(), labels.EmptyLabels(), fmt.Errorf("error parsing scrape interval: %w", err)
}
if time.Duration(intervalDuration) == 0 {
return labels.EmptyLabels(), labels.EmptyLabels(), errors.New("scrape interval cannot be 0")
}
timeout := lb.Get(model.ScrapeTimeoutLabel)
timeoutDuration, err := model.ParseDuration(timeout)
if err != nil {
return labels.EmptyLabels(), labels.EmptyLabels(), fmt.Errorf("error parsing scrape timeout: %w", err)
}
if time.Duration(timeoutDuration) == 0 {
return labels.EmptyLabels(), labels.EmptyLabels(), errors.New("scrape timeout cannot be 0")
}
if timeoutDuration > intervalDuration {
return labels.EmptyLabels(), labels.EmptyLabels(), fmt.Errorf("scrape timeout cannot be greater than scrape interval (%q > %q)", timeout, interval)
}
// Meta labels are deleted after relabelling. Other internal labels propagate to
// the target which decides whether they will be part of their label set.
lb.Range(func(l labels.Label) {
if strings.HasPrefix(l.Name, model.MetaLabelPrefix) {
lb.Del(l.Name)
}
})
// Default the instance label to the target address.
if v := lb.Get(model.InstanceLabel); v == "" {
lb.Set(model.InstanceLabel, addr)
}
res = lb.Labels()
err = res.Validate(func(l labels.Label) error {
// Check label values are valid, drop the target if not.
if !model.LabelValue(l.Value).IsValid() {
return fmt.Errorf("invalid label value for %q: %q", l.Name, l.Value)
}
return nil
})
if err != nil {
return labels.EmptyLabels(), labels.EmptyLabels(), err
}
return res, preRelabelLabels, nil
}
// TargetsFromGroup builds targets based on the given TargetGroup and config.
func TargetsFromGroup(tg *targetgroup.Group, cfg *config.ScrapeConfig, noDefaultPort bool, targets []*Target, lb *labels.Builder) ([]*Target, []error) {
targets = targets[:0]
failures := []error{}
for i, tlset := range tg.Targets {
lb.Reset(labels.EmptyLabels())
for ln, lv := range tlset {
lb.Set(string(ln), string(lv))
}
for ln, lv := range tg.Labels {
if _, ok := tlset[ln]; !ok {
lb.Set(string(ln), string(lv))
}
}
lset, origLabels, err := PopulateLabels(lb, cfg, noDefaultPort)
if err != nil {
failures = append(failures, fmt.Errorf("instance %d in group %s: %w", i, tg, err))
}
if !lset.IsEmpty() || !origLabels.IsEmpty() {
targets = append(targets, NewTarget(lset, origLabels, cfg.Params))
}
}
return targets, failures
}