mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 22:37:27 -08:00
Merge branch 'master' into appender-context
Signed-off-by: Annanay <annanayagarwal@gmail.com>
This commit is contained in:
commit
ec562f152b
|
@ -379,8 +379,11 @@ type ScrapeConfig struct {
|
|||
MetricsPath string `yaml:"metrics_path,omitempty"`
|
||||
// The URL scheme with which to fetch metrics from targets.
|
||||
Scheme string `yaml:"scheme,omitempty"`
|
||||
// More than this many samples post metric-relabelling will cause the scrape to fail.
|
||||
// More than this many samples post metric-relabeling will cause the scrape to fail.
|
||||
SampleLimit uint `yaml:"sample_limit,omitempty"`
|
||||
// More than this many targets after the target relabeling will cause the
|
||||
// scrapes to fail.
|
||||
TargetLimit uint `yaml:"target_limit,omitempty"`
|
||||
|
||||
// We cannot do proper Go type embedding below as the parser will then parse
|
||||
// values arbitrarily into the overflow maps of further-down types.
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"github.com/go-kit/kit/log"
|
||||
config_util "github.com/prometheus/common/config"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/common/version"
|
||||
|
||||
"github.com/prometheus/prometheus/discovery/refresh"
|
||||
"github.com/prometheus/prometheus/discovery/targetgroup"
|
||||
|
@ -33,6 +34,8 @@ const (
|
|||
swarmLabel = model.MetaLabelPrefix + "dockerswarm_"
|
||||
)
|
||||
|
||||
var userAgent = fmt.Sprintf("Prometheus/%s", version.Version)
|
||||
|
||||
// DefaultSDConfig is the default Docker Swarm SD configuration.
|
||||
var DefaultSDConfig = SDConfig{
|
||||
RefreshInterval: model.Duration(60 * time.Second),
|
||||
|
@ -116,6 +119,9 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) {
|
|||
Timeout: time.Duration(conf.RefreshInterval),
|
||||
}),
|
||||
client.WithScheme(hostURL.Scheme),
|
||||
client.WithHTTPHeaders(map[string]string{
|
||||
"User-Agent": userAgent,
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ package kubernetes
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -36,6 +37,7 @@ import (
|
|||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"github.com/prometheus/common/version"
|
||||
"github.com/prometheus/prometheus/discovery/targetgroup"
|
||||
)
|
||||
|
||||
|
@ -49,6 +51,8 @@ const (
|
|||
)
|
||||
|
||||
var (
|
||||
// Http header
|
||||
userAgent = fmt.Sprintf("Prometheus/%s", version.Version)
|
||||
// Custom events metric
|
||||
eventCount = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
|
@ -262,7 +266,7 @@ func New(l log.Logger, conf *SDConfig) (*Discovery, error) {
|
|||
}
|
||||
}
|
||||
|
||||
kcfg.UserAgent = "Prometheus/discovery"
|
||||
kcfg.UserAgent = userAgent
|
||||
|
||||
c, err := kubernetes.NewForConfig(kcfg)
|
||||
if err != nil {
|
||||
|
|
|
@ -252,9 +252,16 @@ metric_relabel_configs:
|
|||
[ - <relabel_config> ... ]
|
||||
|
||||
# Per-scrape limit on number of scraped samples that will be accepted.
|
||||
# If more than this number of samples are present after metric relabelling
|
||||
# If more than this number of samples are present after metric relabeling
|
||||
# the entire scrape will be treated as failed. 0 means no limit.
|
||||
[ sample_limit: <int> | default = 0 ]
|
||||
|
||||
# Per-scrape config limit on number of unique targets that will be
|
||||
# accepted. If more than this number of targets are present after target
|
||||
# relabeling, Prometheus will mark the targets as failed without scraping them.
|
||||
# 0 means no limit. This is an experimental feature, this behaviour could
|
||||
# change in the future.
|
||||
[ target_limit: <int> | default = 0 ]
|
||||
```
|
||||
|
||||
Where `<job_name>` must be unique across all scrape configurations.
|
||||
|
|
|
@ -259,6 +259,20 @@
|
|||
description: 'Prometheus %(prometheusName)s has missed {{ printf "%%.0f" $value }} rule group evaluations in the last 5m.' % $._config,
|
||||
},
|
||||
},
|
||||
{
|
||||
alert: 'PrometheusTargetLimitHit',
|
||||
expr: |||
|
||||
increase(prometheus_target_scrape_pool_exceeded_target_limit_total{%(prometheusSelector)s}[5m]) > 0
|
||||
||| % $._config,
|
||||
'for': '15m',
|
||||
labels: {
|
||||
severity: 'warning',
|
||||
},
|
||||
annotations: {
|
||||
summary: 'Prometheus has dropped targets because some scrape configs have exceeded the targets limit.',
|
||||
description: 'Prometheus %(prometheusName)s has dropped {{ printf "%%.0f" $value }} targets because the number of targets exceeded the configured target_limit.' % $._config,
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
|
|
157
scrape/scrape.go
157
scrape/scrape.go
|
@ -81,15 +81,35 @@ var (
|
|||
targetScrapePoolReloads = prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "prometheus_target_scrape_pool_reloads_total",
|
||||
Help: "Total number of scrape loop reloads.",
|
||||
Help: "Total number of scrape pool reloads.",
|
||||
},
|
||||
)
|
||||
targetScrapePoolReloadsFailed = prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "prometheus_target_scrape_pool_reloads_failed_total",
|
||||
Help: "Total number of failed scrape loop reloads.",
|
||||
Help: "Total number of failed scrape pool reloads.",
|
||||
},
|
||||
)
|
||||
targetScrapePoolExceededTargetLimit = prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "prometheus_target_scrape_pool_exceeded_target_limit_total",
|
||||
Help: "Total number of times scrape pools hit the target limit, during sync or config reload.",
|
||||
},
|
||||
)
|
||||
targetScrapePoolTargetLimit = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "prometheus_target_scrape_pool_target_limit",
|
||||
Help: "Maximum number of targets allowed in this scrape pool.",
|
||||
},
|
||||
[]string{"scrape_job"},
|
||||
)
|
||||
targetScrapePoolTargetsAdded = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "prometheus_target_scrape_pool_targets",
|
||||
Help: "Current number of targets in this scrape pool.",
|
||||
},
|
||||
[]string{"scrape_job"},
|
||||
)
|
||||
targetSyncIntervalLength = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "prometheus_target_sync_length_seconds",
|
||||
|
@ -151,6 +171,9 @@ func init() {
|
|||
targetScrapeSampleDuplicate,
|
||||
targetScrapeSampleOutOfOrder,
|
||||
targetScrapeSampleOutOfBounds,
|
||||
targetScrapePoolExceededTargetLimit,
|
||||
targetScrapePoolTargetLimit,
|
||||
targetScrapePoolTargetsAdded,
|
||||
targetScrapeCacheFlushForced,
|
||||
targetMetadataCache,
|
||||
)
|
||||
|
@ -170,6 +193,7 @@ type scrapePool struct {
|
|||
droppedTargets []*Target
|
||||
loops map[uint64]loop
|
||||
cancel context.CancelFunc
|
||||
targetLimitHit bool // Internal state to speed up the target_limit checks.
|
||||
|
||||
// Constructor for new scrape loops. This is settable for testing convenience.
|
||||
newLoop func(scrapeLoopOptions) loop
|
||||
|
@ -278,6 +302,13 @@ func (sp *scrapePool) stop() {
|
|||
}
|
||||
wg.Wait()
|
||||
sp.client.CloseIdleConnections()
|
||||
|
||||
if sp.config != nil {
|
||||
targetScrapePoolSyncsCounter.DeleteLabelValues(sp.config.JobName)
|
||||
targetScrapePoolTargetLimit.DeleteLabelValues(sp.config.JobName)
|
||||
targetScrapePoolTargetsAdded.DeleteLabelValues(sp.config.JobName)
|
||||
targetSyncIntervalLength.DeleteLabelValues(sp.config.JobName)
|
||||
}
|
||||
}
|
||||
|
||||
// reload the scrape pool with the given scrape configuration. The target state is preserved
|
||||
|
@ -301,6 +332,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
|
|||
oldClient := sp.client
|
||||
sp.client = client
|
||||
|
||||
targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))
|
||||
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
interval = time.Duration(sp.config.ScrapeInterval)
|
||||
|
@ -311,6 +344,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
|
|||
mrc = sp.config.MetricRelabelConfigs
|
||||
)
|
||||
|
||||
forcedErr := sp.refreshTargetLimitErr()
|
||||
for fp, oldLoop := range sp.loops {
|
||||
var cache *scrapeCache
|
||||
if oc := oldLoop.getCache(); reuseCache && oc != nil {
|
||||
|
@ -338,6 +372,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
|
|||
oldLoop.stop()
|
||||
wg.Done()
|
||||
|
||||
newLoop.setForcedError(forcedErr)
|
||||
go newLoop.run(interval, timeout, nil)
|
||||
}(oldLoop, newLoop)
|
||||
|
||||
|
@ -355,10 +390,12 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
|
|||
// Sync converts target groups into actual scrape targets and synchronizes
|
||||
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
|
||||
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
|
||||
sp.mtx.Lock()
|
||||
defer sp.mtx.Unlock()
|
||||
|
||||
start := time.Now()
|
||||
|
||||
var all []*Target
|
||||
sp.mtx.Lock()
|
||||
sp.droppedTargets = []*Target{}
|
||||
for _, tg := range tgs {
|
||||
targets, err := targetsFromGroup(tg, sp.config)
|
||||
|
@ -374,7 +411,6 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
|
|||
}
|
||||
}
|
||||
}
|
||||
sp.mtx.Unlock()
|
||||
sp.sync(all)
|
||||
|
||||
targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
|
||||
|
@ -387,11 +423,9 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
|
|||
// scrape loops for new targets, and stops scrape loops for disappeared targets.
|
||||
// It returns after all stopped scrape loops terminated.
|
||||
func (sp *scrapePool) sync(targets []*Target) {
|
||||
sp.mtx.Lock()
|
||||
defer sp.mtx.Unlock()
|
||||
|
||||
// This function expects that you have acquired the sp.mtx lock.
|
||||
var (
|
||||
uniqueTargets = map[uint64]struct{}{}
|
||||
uniqueLoops = make(map[uint64]loop)
|
||||
interval = time.Duration(sp.config.ScrapeInterval)
|
||||
timeout = time.Duration(sp.config.ScrapeTimeout)
|
||||
limit = int(sp.config.SampleLimit)
|
||||
|
@ -403,7 +437,6 @@ func (sp *scrapePool) sync(targets []*Target) {
|
|||
for _, t := range targets {
|
||||
t := t
|
||||
hash := t.hash()
|
||||
uniqueTargets[hash] = struct{}{}
|
||||
|
||||
if _, ok := sp.activeTargets[hash]; !ok {
|
||||
s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
|
||||
|
@ -419,8 +452,12 @@ func (sp *scrapePool) sync(targets []*Target) {
|
|||
sp.activeTargets[hash] = t
|
||||
sp.loops[hash] = l
|
||||
|
||||
go l.run(interval, timeout, nil)
|
||||
uniqueLoops[hash] = l
|
||||
} else {
|
||||
// This might be a duplicated target.
|
||||
if _, ok := uniqueLoops[hash]; !ok {
|
||||
uniqueLoops[hash] = nil
|
||||
}
|
||||
// Need to keep the most updated labels information
|
||||
// for displaying it in the Service Discovery web page.
|
||||
sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
|
||||
|
@ -431,7 +468,7 @@ func (sp *scrapePool) sync(targets []*Target) {
|
|||
|
||||
// Stop and remove old targets and scraper loops.
|
||||
for hash := range sp.activeTargets {
|
||||
if _, ok := uniqueTargets[hash]; !ok {
|
||||
if _, ok := uniqueLoops[hash]; !ok {
|
||||
wg.Add(1)
|
||||
go func(l loop) {
|
||||
l.stop()
|
||||
|
@ -443,6 +480,16 @@ func (sp *scrapePool) sync(targets []*Target) {
|
|||
}
|
||||
}
|
||||
|
||||
targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops)))
|
||||
forcedErr := sp.refreshTargetLimitErr()
|
||||
for _, l := range sp.loops {
|
||||
l.setForcedError(forcedErr)
|
||||
}
|
||||
for _, l := range uniqueLoops {
|
||||
if l != nil {
|
||||
go l.run(interval, timeout, nil)
|
||||
}
|
||||
}
|
||||
// Wait for all potentially stopped scrapers to terminate.
|
||||
// This covers the case of flapping targets. If the server is under high load, a new scraper
|
||||
// may be active and tries to insert. The old scraper that didn't terminate yet could still
|
||||
|
@ -450,6 +497,26 @@ func (sp *scrapePool) sync(targets []*Target) {
|
|||
wg.Wait()
|
||||
}
|
||||
|
||||
// refreshTargetLimitErr returns an error that can be passed to the scrape loops
|
||||
// if the number of targets exceeds the configured limit.
|
||||
func (sp *scrapePool) refreshTargetLimitErr() error {
|
||||
// This function expects that you have acquired the sp.mtx lock.
|
||||
if sp.config == nil || sp.config.TargetLimit == 0 && !sp.targetLimitHit {
|
||||
return nil
|
||||
}
|
||||
l := len(sp.activeTargets)
|
||||
if l <= int(sp.config.TargetLimit) && !sp.targetLimitHit {
|
||||
return nil
|
||||
}
|
||||
var err error
|
||||
sp.targetLimitHit = l > int(sp.config.TargetLimit)
|
||||
if sp.targetLimitHit {
|
||||
targetScrapePoolExceededTargetLimit.Inc()
|
||||
err = fmt.Errorf("target_limit exceeded (number of targets: %d, limit: %d)", l, sp.config.TargetLimit)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func mutateSampleLabels(lset labels.Labels, target *Target, honor bool, rc []*relabel.Config) labels.Labels {
|
||||
lb := labels.NewBuilder(lset)
|
||||
|
||||
|
@ -590,6 +657,7 @@ func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error)
|
|||
// A loop can run and be stopped again. It must not be reused after it was stopped.
|
||||
type loop interface {
|
||||
run(interval, timeout time.Duration, errc chan<- error)
|
||||
setForcedError(err error)
|
||||
stop()
|
||||
getCache() *scrapeCache
|
||||
disableEndOfRunStalenessMarkers()
|
||||
|
@ -610,6 +678,8 @@ type scrapeLoop struct {
|
|||
buffers *pool.Pool
|
||||
jitterSeed uint64
|
||||
honorTimestamps bool
|
||||
forcedErr error
|
||||
forcedErrMtx sync.Mutex
|
||||
|
||||
appender func(ctx context.Context) storage.Appender
|
||||
sampleMutator labelsMutator
|
||||
|
@ -953,10 +1023,7 @@ mainLoop:
|
|||
// together with reporting metrics, by using as few appenders as possible.
|
||||
// In the happy scenario, a single appender is used.
|
||||
func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time.Time, errc chan<- error) time.Time {
|
||||
var (
|
||||
start = time.Now()
|
||||
scrapeCtx, cancel = context.WithTimeout(sl.ctx, timeout)
|
||||
)
|
||||
start := time.Now()
|
||||
|
||||
// Only record after the first scrape.
|
||||
if !last.IsZero() {
|
||||
|
@ -968,7 +1035,34 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
|
|||
b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
|
||||
buf := bytes.NewBuffer(b)
|
||||
|
||||
contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
|
||||
app := sl.appender(sl.ctx)
|
||||
var total, added, seriesAdded int
|
||||
var err, appErr, scrapeErr error
|
||||
defer func() {
|
||||
if err != nil {
|
||||
app.Rollback()
|
||||
return
|
||||
}
|
||||
err = app.Commit()
|
||||
if err != nil {
|
||||
level.Error(sl.l).Log("msg", "Scrape commit failed", "err", err)
|
||||
}
|
||||
}()
|
||||
if forcedErr := sl.getForcedError(); forcedErr != nil {
|
||||
appErr = forcedErr
|
||||
// Add stale markers.
|
||||
if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil {
|
||||
app.Rollback()
|
||||
app = sl.appender(sl.ctx)
|
||||
level.Warn(sl.l).Log("msg", "Append failed", "err", err)
|
||||
}
|
||||
if errc != nil {
|
||||
errc <- forcedErr
|
||||
}
|
||||
} else {
|
||||
var contentType string
|
||||
scrapeCtx, cancel := context.WithTimeout(sl.ctx, timeout)
|
||||
contentType, scrapeErr = sl.scraper.scrape(scrapeCtx, buf)
|
||||
cancel()
|
||||
|
||||
if scrapeErr == nil {
|
||||
|
@ -986,33 +1080,22 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
|
|||
}
|
||||
}
|
||||
|
||||
app := sl.appender(scrapeCtx)
|
||||
var err error
|
||||
defer func() {
|
||||
if err != nil {
|
||||
app.Rollback()
|
||||
return
|
||||
}
|
||||
err = app.Commit()
|
||||
if err != nil {
|
||||
level.Error(sl.l).Log("msg", "Scrape commit failed", "err", err)
|
||||
}
|
||||
}()
|
||||
// A failed scrape is the same as an empty scrape,
|
||||
// we still call sl.append to trigger stale markers.
|
||||
total, added, seriesAdded, appErr := sl.append(app, b, contentType, start)
|
||||
total, added, seriesAdded, appErr = sl.append(app, b, contentType, start)
|
||||
if appErr != nil {
|
||||
app.Rollback()
|
||||
app = sl.appender(scrapeCtx)
|
||||
app = sl.appender(sl.ctx)
|
||||
level.Debug(sl.l).Log("msg", "Append failed", "err", appErr)
|
||||
// The append failed, probably due to a parse error or sample limit.
|
||||
// Call sl.append again with an empty scrape to trigger stale markers.
|
||||
if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil {
|
||||
app.Rollback()
|
||||
app = sl.appender(scrapeCtx)
|
||||
app = sl.appender(sl.ctx)
|
||||
level.Warn(sl.l).Log("msg", "Append failed", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sl.buffers.Put(b)
|
||||
|
||||
|
@ -1026,6 +1109,18 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
|
|||
return start
|
||||
}
|
||||
|
||||
func (sl *scrapeLoop) setForcedError(err error) {
|
||||
sl.forcedErrMtx.Lock()
|
||||
defer sl.forcedErrMtx.Unlock()
|
||||
sl.forcedErr = err
|
||||
}
|
||||
|
||||
func (sl *scrapeLoop) getForcedError() error {
|
||||
sl.forcedErrMtx.Lock()
|
||||
defer sl.forcedErrMtx.Unlock()
|
||||
return sl.forcedErr
|
||||
}
|
||||
|
||||
func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, interval time.Duration) {
|
||||
// Scraping has stopped. We want to write stale markers but
|
||||
// the target may be recreated, so we wait just over 2 scrape intervals
|
||||
|
|
|
@ -140,6 +140,8 @@ func TestDiscoveredLabelsUpdate(t *testing.T) {
|
|||
type testLoop struct {
|
||||
startFunc func(interval, timeout time.Duration, errc chan<- error)
|
||||
stopFunc func()
|
||||
forcedErr error
|
||||
forcedErrMtx sync.Mutex
|
||||
}
|
||||
|
||||
func (l *testLoop) run(interval, timeout time.Duration, errc chan<- error) {
|
||||
|
@ -149,6 +151,18 @@ func (l *testLoop) run(interval, timeout time.Duration, errc chan<- error) {
|
|||
func (l *testLoop) disableEndOfRunStalenessMarkers() {
|
||||
}
|
||||
|
||||
func (l *testLoop) setForcedError(err error) {
|
||||
l.forcedErrMtx.Lock()
|
||||
defer l.forcedErrMtx.Unlock()
|
||||
l.forcedErr = err
|
||||
}
|
||||
|
||||
func (l *testLoop) getForcedError() error {
|
||||
l.forcedErrMtx.Lock()
|
||||
defer l.forcedErrMtx.Unlock()
|
||||
return l.forcedErr
|
||||
}
|
||||
|
||||
func (l *testLoop) stop() {
|
||||
l.stopFunc()
|
||||
}
|
||||
|
@ -301,6 +315,92 @@ func TestScrapePoolReload(t *testing.T) {
|
|||
testutil.Equals(t, numTargets, len(sp.loops), "Unexpected number of stopped loops after reload")
|
||||
}
|
||||
|
||||
func TestScrapePoolTargetLimit(t *testing.T) {
|
||||
// On starting to run, new loops created on reload check whether their preceding
|
||||
// equivalents have been stopped.
|
||||
newLoop := func(opts scrapeLoopOptions) loop {
|
||||
l := &testLoop{
|
||||
startFunc: func(interval, timeout time.Duration, errc chan<- error) {},
|
||||
stopFunc: func() {},
|
||||
}
|
||||
return l
|
||||
}
|
||||
sp := &scrapePool{
|
||||
appendable: &nopAppendable{},
|
||||
activeTargets: map[uint64]*Target{},
|
||||
loops: map[uint64]loop{},
|
||||
newLoop: newLoop,
|
||||
logger: nil,
|
||||
client: http.DefaultClient,
|
||||
}
|
||||
|
||||
var tgs = []*targetgroup.Group{}
|
||||
for i := 0; i < 50; i++ {
|
||||
tgs = append(tgs,
|
||||
&targetgroup.Group{
|
||||
Targets: []model.LabelSet{
|
||||
{model.AddressLabel: model.LabelValue(fmt.Sprintf("127.0.0.1:%d", 9090+i))},
|
||||
},
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
var limit uint
|
||||
reloadWithLimit := func(l uint) {
|
||||
limit = l
|
||||
testutil.Ok(t, sp.reload(&config.ScrapeConfig{
|
||||
ScrapeInterval: model.Duration(3 * time.Second),
|
||||
ScrapeTimeout: model.Duration(2 * time.Second),
|
||||
TargetLimit: l,
|
||||
}))
|
||||
}
|
||||
|
||||
var targets int
|
||||
loadTargets := func(n int) {
|
||||
targets = n
|
||||
sp.Sync(tgs[:n])
|
||||
}
|
||||
|
||||
validateErrorMessage := func(shouldErr bool) {
|
||||
for _, l := range sp.loops {
|
||||
lerr := l.(*testLoop).getForcedError()
|
||||
if shouldErr {
|
||||
testutil.Assert(t, lerr != nil, "error was expected for %d targets with a limit of %d", targets, limit)
|
||||
testutil.Equals(t, fmt.Sprintf("target_limit exceeded (number of targets: %d, limit: %d)", targets, limit), lerr.Error())
|
||||
} else {
|
||||
testutil.Equals(t, nil, lerr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
reloadWithLimit(0)
|
||||
loadTargets(50)
|
||||
|
||||
// Simulate an initial config with a limit.
|
||||
sp.config.TargetLimit = 30
|
||||
limit = 30
|
||||
loadTargets(50)
|
||||
validateErrorMessage(true)
|
||||
|
||||
reloadWithLimit(50)
|
||||
validateErrorMessage(false)
|
||||
|
||||
reloadWithLimit(40)
|
||||
validateErrorMessage(true)
|
||||
|
||||
loadTargets(30)
|
||||
validateErrorMessage(false)
|
||||
|
||||
loadTargets(40)
|
||||
validateErrorMessage(false)
|
||||
|
||||
loadTargets(41)
|
||||
validateErrorMessage(true)
|
||||
|
||||
reloadWithLimit(51)
|
||||
validateErrorMessage(false)
|
||||
}
|
||||
|
||||
func TestScrapePoolAppender(t *testing.T) {
|
||||
cfg := &config.ScrapeConfig{}
|
||||
app := &nopAppendable{}
|
||||
|
@ -595,6 +695,57 @@ func TestScrapeLoopRun(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestScrapeLoopForcedErr(t *testing.T) {
|
||||
var (
|
||||
signal = make(chan struct{}, 1)
|
||||
errc = make(chan error)
|
||||
|
||||
scraper = &testScraper{}
|
||||
app = func(ctx context.Context) storage.Appender { return &nopAppender{} }
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
sl := newScrapeLoop(ctx,
|
||||
scraper,
|
||||
nil, nil,
|
||||
nopMutator,
|
||||
nopMutator,
|
||||
app,
|
||||
nil,
|
||||
0,
|
||||
true,
|
||||
)
|
||||
|
||||
forcedErr := fmt.Errorf("forced err")
|
||||
sl.setForcedError(forcedErr)
|
||||
|
||||
scraper.scrapeFunc = func(context.Context, io.Writer) error {
|
||||
t.Fatalf("should not be scraped")
|
||||
return nil
|
||||
}
|
||||
|
||||
go func() {
|
||||
sl.run(time.Second, time.Hour, errc)
|
||||
signal <- struct{}{}
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-errc:
|
||||
if err != forcedErr {
|
||||
t.Fatalf("Expected forced error but got: %s", err)
|
||||
}
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("Expected forced error but got none")
|
||||
}
|
||||
cancel()
|
||||
|
||||
select {
|
||||
case <-signal:
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("Scrape not stopped")
|
||||
}
|
||||
}
|
||||
|
||||
func TestScrapeLoopMetadata(t *testing.T) {
|
||||
var (
|
||||
signal = make(chan struct{})
|
||||
|
|
|
@ -33,7 +33,9 @@ var (
|
|||
|
||||
// Appendable allows creating appenders.
|
||||
type Appendable interface {
|
||||
// Appender returns a new appender for the storage.
|
||||
// Appender returns a new appender for the storage. The implementation
|
||||
// can choose whether or not to use the context, for deadlines or to check
|
||||
// for errors.
|
||||
Appender(ctx context.Context) Appender
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue