mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-10 07:34:04 -08:00
ad40d0abbc
Add ability to limit scrape samples, and related metrics
582 lines
16 KiB
Go
582 lines
16 KiB
Go
// Copyright 2016 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package retrieval
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/common/expfmt"
|
|
"github.com/prometheus/common/log"
|
|
"github.com/prometheus/common/model"
|
|
"golang.org/x/net/context"
|
|
"golang.org/x/net/context/ctxhttp"
|
|
|
|
"github.com/prometheus/prometheus/config"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/storage/local"
|
|
)
|
|
|
|
// Metric names of the synthetic samples that a scrape loop records about
// each scrape it performs.
const (
	// scrapeHealthMetricName is recorded as 1 when the scrape finished
	// without an error and as 0 otherwise.
	scrapeHealthMetricName = "up"

	// scrapeDurationMetricName carries the duration of the scrape in seconds.
	scrapeDurationMetricName = "scrape_duration_seconds"

	// scrapeSamplesMetricName carries the number of samples the scrape returned.
	scrapeSamplesMetricName = "scrape_samples_scraped"

	// samplesPostRelabelMetricName carries the number of samples counted by
	// the innermost appender, i.e. after relabeling was applied.
	samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling"
)
|
|
|
|
var (
|
|
targetIntervalLength = prometheus.NewSummaryVec(
|
|
prometheus.SummaryOpts{
|
|
Name: "prometheus_target_interval_length_seconds",
|
|
Help: "Actual intervals between scrapes.",
|
|
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
|
|
},
|
|
[]string{"interval"},
|
|
)
|
|
targetSkippedScrapes = prometheus.NewCounter(
|
|
prometheus.CounterOpts{
|
|
Name: "prometheus_target_skipped_scrapes_total",
|
|
Help: "Total number of scrapes that were skipped because the metric storage was throttled.",
|
|
},
|
|
)
|
|
targetReloadIntervalLength = prometheus.NewSummaryVec(
|
|
prometheus.SummaryOpts{
|
|
Name: "prometheus_target_reload_length_seconds",
|
|
Help: "Actual interval to reload the scrape pool with a given configuration.",
|
|
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
|
|
},
|
|
[]string{"interval"},
|
|
)
|
|
targetSyncIntervalLength = prometheus.NewSummaryVec(
|
|
prometheus.SummaryOpts{
|
|
Name: "prometheus_target_sync_length_seconds",
|
|
Help: "Actual interval to sync the scrape pool.",
|
|
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
|
|
},
|
|
[]string{"scrape_job"},
|
|
)
|
|
targetScrapePoolSyncsCounter = prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Name: "prometheus_target_scrape_pool_sync_total",
|
|
Help: "Total number of syncs that were executed on a scrape pool.",
|
|
},
|
|
[]string{"scrape_job"},
|
|
)
|
|
targetScrapeSampleLimit = prometheus.NewCounter(
|
|
prometheus.CounterOpts{
|
|
Name: "prometheus_target_scrapes_exceeded_sample_limit_total",
|
|
Help: "Total number of scrapes that hit the sample limit and were rejected.",
|
|
},
|
|
)
|
|
)
|
|
|
|
func init() {
|
|
prometheus.MustRegister(targetIntervalLength)
|
|
prometheus.MustRegister(targetSkippedScrapes)
|
|
prometheus.MustRegister(targetReloadIntervalLength)
|
|
prometheus.MustRegister(targetSyncIntervalLength)
|
|
prometheus.MustRegister(targetScrapePoolSyncsCounter)
|
|
prometheus.MustRegister(targetScrapeSampleLimit)
|
|
}
|
|
|
|
// scrapePool manages scrapes for sets of targets.
|
|
type scrapePool struct {
|
|
appender storage.SampleAppender
|
|
|
|
ctx context.Context
|
|
|
|
mtx sync.RWMutex
|
|
config *config.ScrapeConfig
|
|
client *http.Client
|
|
// Targets and loops must always be synchronized to have the same
|
|
// set of hashes.
|
|
targets map[uint64]*Target
|
|
loops map[uint64]loop
|
|
|
|
// Constructor for new scrape loops. This is settable for testing convenience.
|
|
newLoop func(context.Context, scraper, storage.SampleAppender, model.LabelSet, *config.ScrapeConfig) loop
|
|
}
|
|
|
|
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
|
|
client, err := NewHTTPClient(cfg.HTTPClientConfig)
|
|
if err != nil {
|
|
// Any errors that could occur here should be caught during config validation.
|
|
log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
|
|
}
|
|
return &scrapePool{
|
|
appender: app,
|
|
config: cfg,
|
|
ctx: ctx,
|
|
client: client,
|
|
targets: map[uint64]*Target{},
|
|
loops: map[uint64]loop{},
|
|
newLoop: newScrapeLoop,
|
|
}
|
|
}
|
|
|
|
// stop terminates all scrape loops and returns after they all terminated.
|
|
func (sp *scrapePool) stop() {
|
|
var wg sync.WaitGroup
|
|
|
|
sp.mtx.Lock()
|
|
defer sp.mtx.Unlock()
|
|
|
|
for fp, l := range sp.loops {
|
|
wg.Add(1)
|
|
|
|
go func(l loop) {
|
|
l.stop()
|
|
wg.Done()
|
|
}(l)
|
|
|
|
delete(sp.loops, fp)
|
|
delete(sp.targets, fp)
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
// reload the scrape pool with the given scrape configuration. The target state is preserved
|
|
// but all scrape loops are restarted with the new scrape configuration.
|
|
// This method returns after all scrape loops that were stopped have fully terminated.
|
|
func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
|
|
start := time.Now()
|
|
|
|
sp.mtx.Lock()
|
|
defer sp.mtx.Unlock()
|
|
|
|
client, err := NewHTTPClient(cfg.HTTPClientConfig)
|
|
if err != nil {
|
|
// Any errors that could occur here should be caught during config validation.
|
|
log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
|
|
}
|
|
sp.config = cfg
|
|
sp.client = client
|
|
|
|
var (
|
|
wg sync.WaitGroup
|
|
interval = time.Duration(sp.config.ScrapeInterval)
|
|
timeout = time.Duration(sp.config.ScrapeTimeout)
|
|
)
|
|
|
|
for fp, oldLoop := range sp.loops {
|
|
var (
|
|
t = sp.targets[fp]
|
|
s = &targetScraper{Target: t, client: sp.client}
|
|
newLoop = sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config)
|
|
)
|
|
wg.Add(1)
|
|
|
|
go func(oldLoop, newLoop loop) {
|
|
oldLoop.stop()
|
|
wg.Done()
|
|
|
|
go newLoop.run(interval, timeout, nil)
|
|
}(oldLoop, newLoop)
|
|
|
|
sp.loops[fp] = newLoop
|
|
}
|
|
|
|
wg.Wait()
|
|
targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
|
|
time.Since(start).Seconds(),
|
|
)
|
|
}
|
|
|
|
// Sync converts target groups into actual scrape targets and synchronizes
|
|
// the currently running scraper with the resulting set.
|
|
func (sp *scrapePool) Sync(tgs []*config.TargetGroup) {
|
|
start := time.Now()
|
|
|
|
var all []*Target
|
|
for _, tg := range tgs {
|
|
targets, err := targetsFromGroup(tg, sp.config)
|
|
if err != nil {
|
|
log.With("err", err).Error("creating targets failed")
|
|
continue
|
|
}
|
|
all = append(all, targets...)
|
|
}
|
|
sp.sync(all)
|
|
|
|
targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
|
|
time.Since(start).Seconds(),
|
|
)
|
|
targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
|
|
}
|
|
|
|
// sync takes a list of potentially duplicated targets, deduplicates them, starts
|
|
// scrape loops for new targets, and stops scrape loops for disappeared targets.
|
|
// It returns after all stopped scrape loops terminated.
|
|
func (sp *scrapePool) sync(targets []*Target) {
|
|
sp.mtx.Lock()
|
|
defer sp.mtx.Unlock()
|
|
|
|
var (
|
|
uniqueTargets = map[uint64]struct{}{}
|
|
interval = time.Duration(sp.config.ScrapeInterval)
|
|
timeout = time.Duration(sp.config.ScrapeTimeout)
|
|
)
|
|
|
|
for _, t := range targets {
|
|
hash := t.hash()
|
|
uniqueTargets[hash] = struct{}{}
|
|
|
|
if _, ok := sp.targets[hash]; !ok {
|
|
s := &targetScraper{Target: t, client: sp.client}
|
|
l := sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config)
|
|
|
|
sp.targets[hash] = t
|
|
sp.loops[hash] = l
|
|
|
|
go l.run(interval, timeout, nil)
|
|
}
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
// Stop and remove old targets and scraper loops.
|
|
for hash := range sp.targets {
|
|
if _, ok := uniqueTargets[hash]; !ok {
|
|
wg.Add(1)
|
|
go func(l loop) {
|
|
l.stop()
|
|
wg.Done()
|
|
}(sp.loops[hash])
|
|
|
|
delete(sp.loops, hash)
|
|
delete(sp.targets, hash)
|
|
}
|
|
}
|
|
|
|
// Wait for all potentially stopped scrapers to terminate.
|
|
// This covers the case of flapping targets. If the server is under high load, a new scraper
|
|
// may be active and tries to insert. The old scraper that didn't terminate yet could still
|
|
// be inserting a previous sample set.
|
|
wg.Wait()
|
|
}
|
|
|
|
// A scraper retrieves samples and accepts a status report at the end.
|
|
type scraper interface {
|
|
scrape(ctx context.Context, ts time.Time) (model.Samples, error)
|
|
report(start time.Time, dur time.Duration, err error)
|
|
offset(interval time.Duration) time.Duration
|
|
}
|
|
|
|
// targetScraper implements the scraper interface for a target.
|
|
type targetScraper struct {
|
|
*Target
|
|
client *http.Client
|
|
}
|
|
|
|
// acceptHeader is the value of the Accept header sent with every scrape
// request. It lists the delimited protobuf exposition format first (q=0.7),
// then the text format version 0.0.4 (q=0.3), then anything else (q=0.1).
const acceptHeader = "application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7," +
	"text/plain;version=0.0.4;q=0.3," +
	"*/*;q=0.1"
|
|
|
|
func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (model.Samples, error) {
|
|
req, err := http.NewRequest("GET", s.URL().String(), nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.Header.Add("Accept", acceptHeader)
|
|
|
|
resp, err := ctxhttp.Do(ctx, s.client, req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return nil, fmt.Errorf("server returned HTTP status %s", resp.Status)
|
|
}
|
|
|
|
var (
|
|
allSamples = make(model.Samples, 0, 200)
|
|
decSamples = make(model.Vector, 0, 50)
|
|
)
|
|
sdec := expfmt.SampleDecoder{
|
|
Dec: expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header)),
|
|
Opts: &expfmt.DecodeOptions{
|
|
Timestamp: model.TimeFromUnixNano(ts.UnixNano()),
|
|
},
|
|
}
|
|
|
|
for {
|
|
if err = sdec.Decode(&decSamples); err != nil {
|
|
break
|
|
}
|
|
allSamples = append(allSamples, decSamples...)
|
|
decSamples = decSamples[:0]
|
|
}
|
|
|
|
if err == io.EOF {
|
|
// Set err to nil since it is used in the scrape health recording.
|
|
err = nil
|
|
}
|
|
return allSamples, err
|
|
}
|
|
|
|
// A loop can run and be stopped again. It must not be reused after it was stopped.
type loop interface {
	// run executes the loop with the given scrape interval and per-scrape
	// timeout. errc may be nil; otherwise scrape errors are sent to it.
	run(interval, timeout time.Duration, errc chan<- error)

	// stop terminates the loop.
	stop()
}
|
|
|
|
type scrapeLoop struct {
|
|
scraper scraper
|
|
|
|
// Where samples are ultimately sent.
|
|
appender storage.SampleAppender
|
|
|
|
targetLabels model.LabelSet
|
|
metricRelabelConfigs []*config.RelabelConfig
|
|
honorLabels bool
|
|
sampleLimit uint
|
|
|
|
done chan struct{}
|
|
ctx context.Context
|
|
cancel func()
|
|
}
|
|
|
|
func newScrapeLoop(
|
|
ctx context.Context,
|
|
sc scraper,
|
|
appender storage.SampleAppender,
|
|
targetLabels model.LabelSet,
|
|
config *config.ScrapeConfig,
|
|
) loop {
|
|
sl := &scrapeLoop{
|
|
scraper: sc,
|
|
appender: appender,
|
|
targetLabels: targetLabels,
|
|
metricRelabelConfigs: config.MetricRelabelConfigs,
|
|
honorLabels: config.HonorLabels,
|
|
sampleLimit: config.SampleLimit,
|
|
done: make(chan struct{}),
|
|
}
|
|
sl.ctx, sl.cancel = context.WithCancel(ctx)
|
|
|
|
return sl
|
|
}
|
|
|
|
func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
|
|
defer close(sl.done)
|
|
|
|
select {
|
|
case <-time.After(sl.scraper.offset(interval)):
|
|
// Continue after a scraping offset.
|
|
case <-sl.ctx.Done():
|
|
return
|
|
}
|
|
|
|
var last time.Time
|
|
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-sl.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
if !sl.appender.NeedsThrottling() {
|
|
var (
|
|
start = time.Now()
|
|
scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout)
|
|
numPostRelabelSamples = 0
|
|
)
|
|
|
|
// Only record after the first scrape.
|
|
if !last.IsZero() {
|
|
targetIntervalLength.WithLabelValues(interval.String()).Observe(
|
|
time.Since(last).Seconds(),
|
|
)
|
|
}
|
|
|
|
samples, err := sl.scraper.scrape(scrapeCtx, start)
|
|
if err == nil {
|
|
numPostRelabelSamples, err = sl.append(samples)
|
|
}
|
|
if err != nil && errc != nil {
|
|
errc <- err
|
|
}
|
|
sl.report(start, time.Since(start), len(samples), numPostRelabelSamples, err)
|
|
last = start
|
|
} else {
|
|
targetSkippedScrapes.Inc()
|
|
}
|
|
|
|
select {
|
|
case <-sl.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (sl *scrapeLoop) stop() {
|
|
sl.cancel()
|
|
<-sl.done
|
|
}
|
|
|
|
// wrapAppender wraps a SampleAppender for relabeling. It returns the wrappend
|
|
// appender and an innermost countingAppender that counts the samples actually
|
|
// appended in the end.
|
|
func (sl *scrapeLoop) wrapAppender(app storage.SampleAppender) (storage.SampleAppender, *countingAppender) {
|
|
// Innermost appender is a countingAppender to count how many samples
|
|
// are left in the end.
|
|
countingAppender := &countingAppender{
|
|
SampleAppender: app,
|
|
}
|
|
app = countingAppender
|
|
|
|
// The relabelAppender has to be inside the label-modifying appenders so
|
|
// the relabeling rules are applied to the correct label set.
|
|
if len(sl.metricRelabelConfigs) > 0 {
|
|
app = relabelAppender{
|
|
SampleAppender: app,
|
|
relabelings: sl.metricRelabelConfigs,
|
|
}
|
|
}
|
|
|
|
if sl.honorLabels {
|
|
app = honorLabelsAppender{
|
|
SampleAppender: app,
|
|
labels: sl.targetLabels,
|
|
}
|
|
} else {
|
|
app = ruleLabelsAppender{
|
|
SampleAppender: app,
|
|
labels: sl.targetLabels,
|
|
}
|
|
}
|
|
return app, countingAppender
|
|
}
|
|
|
|
func (sl *scrapeLoop) append(samples model.Samples) (int, error) {
|
|
var (
|
|
numOutOfOrder = 0
|
|
numDuplicates = 0
|
|
app = sl.appender
|
|
countingApp *countingAppender
|
|
)
|
|
|
|
if sl.sampleLimit > 0 {
|
|
// We need to check for the sample limit, so append everything
|
|
// to a wrapped bufferAppender first. Then point samples to the
|
|
// result.
|
|
bufApp := &bufferAppender{buffer: make(model.Samples, 0, len(samples))}
|
|
var wrappedBufApp storage.SampleAppender
|
|
wrappedBufApp, countingApp = sl.wrapAppender(bufApp)
|
|
for _, s := range samples {
|
|
// Ignore errors as bufferedAppender always succeds.
|
|
wrappedBufApp.Append(s)
|
|
}
|
|
samples = bufApp.buffer
|
|
if uint(countingApp.count) > sl.sampleLimit {
|
|
targetScrapeSampleLimit.Inc()
|
|
return countingApp.count, fmt.Errorf(
|
|
"%d samples exceeded limit of %d", countingApp.count, sl.sampleLimit,
|
|
)
|
|
}
|
|
} else {
|
|
// No need to check for sample limit. Wrap sl.appender directly.
|
|
app, countingApp = sl.wrapAppender(sl.appender)
|
|
}
|
|
|
|
for _, s := range samples {
|
|
if err := app.Append(s); err != nil {
|
|
switch err {
|
|
case local.ErrOutOfOrderSample:
|
|
numOutOfOrder++
|
|
log.With("sample", s).With("error", err).Debug("Sample discarded")
|
|
case local.ErrDuplicateSampleForTimestamp:
|
|
numDuplicates++
|
|
log.With("sample", s).With("error", err).Debug("Sample discarded")
|
|
default:
|
|
log.With("sample", s).With("error", err).Warn("Sample discarded")
|
|
}
|
|
}
|
|
}
|
|
if numOutOfOrder > 0 {
|
|
log.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples")
|
|
}
|
|
if numDuplicates > 0 {
|
|
log.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
|
|
}
|
|
return countingApp.count, nil
|
|
}
|
|
|
|
func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSamples, postRelabelSamples int, err error) {
|
|
sl.scraper.report(start, duration, err)
|
|
|
|
ts := model.TimeFromUnixNano(start.UnixNano())
|
|
|
|
var health model.SampleValue
|
|
if err == nil {
|
|
health = 1
|
|
}
|
|
|
|
healthSample := &model.Sample{
|
|
Metric: model.Metric{
|
|
model.MetricNameLabel: scrapeHealthMetricName,
|
|
},
|
|
Timestamp: ts,
|
|
Value: health,
|
|
}
|
|
durationSample := &model.Sample{
|
|
Metric: model.Metric{
|
|
model.MetricNameLabel: scrapeDurationMetricName,
|
|
},
|
|
Timestamp: ts,
|
|
Value: model.SampleValue(duration.Seconds()),
|
|
}
|
|
countSample := &model.Sample{
|
|
Metric: model.Metric{
|
|
model.MetricNameLabel: scrapeSamplesMetricName,
|
|
},
|
|
Timestamp: ts,
|
|
Value: model.SampleValue(scrapedSamples),
|
|
}
|
|
postRelabelSample := &model.Sample{
|
|
Metric: model.Metric{
|
|
model.MetricNameLabel: samplesPostRelabelMetricName,
|
|
},
|
|
Timestamp: ts,
|
|
Value: model.SampleValue(postRelabelSamples),
|
|
}
|
|
|
|
reportAppender := ruleLabelsAppender{
|
|
SampleAppender: sl.appender,
|
|
labels: sl.targetLabels,
|
|
}
|
|
|
|
if err := reportAppender.Append(healthSample); err != nil {
|
|
log.With("sample", healthSample).With("error", err).Warn("Scrape health sample discarded")
|
|
}
|
|
if err := reportAppender.Append(durationSample); err != nil {
|
|
log.With("sample", durationSample).With("error", err).Warn("Scrape duration sample discarded")
|
|
}
|
|
if err := reportAppender.Append(countSample); err != nil {
|
|
log.With("sample", durationSample).With("error", err).Warn("Scrape sample count sample discarded")
|
|
}
|
|
if err := reportAppender.Append(postRelabelSample); err != nil {
|
|
log.With("sample", durationSample).With("error", err).Warn("Scrape sample count post-relabeling sample discarded")
|
|
}
|
|
}
|