From f0ec619eece43e7c069b7b22b54d7d7bac7d7da2 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Tue, 8 Mar 2022 18:48:47 -0500 Subject: [PATCH] scrape: allow providing a custom Dialer for scraping (#10415) * scrape: allow providing a custom Dialer for scraping This commit extends config.ScrapeConfig with an optional field to override how HTTP connections to targets are created. This field is not set directly in Prometheus, and is only added for the convenience of downstream importers. Closes #9706 Signed-off-by: Robert Fratto * scrape: move custom dial function to scrape.Options Signed-off-by: Robert Fratto --- scrape/manager.go | 7 ++++++- scrape/scrape.go | 18 +++++++++++++++--- scrape/scrape_test.go | 16 ++++++++-------- 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/scrape/manager.go b/scrape/manager.go index 35d47a86b..37eb7bf43 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -24,6 +24,7 @@ import ( "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + config_util "github.com/prometheus/common/config" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" @@ -123,6 +124,10 @@ func NewManager(o *Options, logger log.Logger, app storage.Appendable) *Manager // Options are the configuration parameters to the scrape manager. type Options struct { ExtraMetrics bool + + // Optional function to override dialing to scrape targets. Go's default + // dialer is used when not provided. + DialContextFunc config_util.DialContextFunc } // Manager maintains a set of scrape pools and manages start/stop cycles @@ -191,7 +196,7 @@ func (m *Manager) reload() { level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName) continue } - sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName), m.opts.ExtraMetrics) + sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName), m.opts.ExtraMetrics, m.opts.DialContextFunc) if err != nil { level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName) continue diff --git a/scrape/scrape.go b/scrape/scrape.go index ba6183968..33cb3592f 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -224,6 +224,7 @@ type scrapePool struct { appendable storage.Appendable logger log.Logger cancel context.CancelFunc + dialFunc config_util.DialContextFunc // mtx must not be taken after targetMtx. mtx sync.Mutex @@ -264,13 +265,18 @@ const maxAheadTime = 10 * time.Minute type labelsMutator func(labels.Labels) labels.Labels -func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger, reportExtraMetrics bool) (*scrapePool, error) { +func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger, reportExtraMetrics bool, dialFunc config_util.DialContextFunc) (*scrapePool, error) { targetScrapePools.Inc() if logger == nil { logger = log.NewNopLogger() } - client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName) + var extraOptions []config_util.HTTPClientOption + if dialFunc != nil { + extraOptions = append(extraOptions, config_util.WithDialContextFunc(dialFunc)) + } + + client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, extraOptions...) if err != nil { targetScrapePoolsFailed.Inc() return nil, errors.Wrap(err, "error creating HTTP client") @@ -287,6 +293,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed activeTargets: map[uint64]*Target{}, loops: map[uint64]loop{}, logger: logger, + dialFunc: dialFunc, } sp.newLoop = func(opts scrapeLoopOptions) loop { // Update the targets retrieval function for metadata to a new scrape cache. @@ -381,7 +388,12 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error { targetScrapePoolReloads.Inc() start := time.Now() - client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName) + var extraOptions []config_util.HTTPClientOption + if sp.dialFunc != nil { + extraOptions = append(extraOptions, config_util.WithDialContextFunc(sp.dialFunc)) + } + + client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, extraOptions...) if err != nil { targetScrapePoolReloadsFailed.Inc() return errors.Wrap(err, "error creating HTTP client") diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 85ad76232..80cbf344a 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -57,7 +57,7 @@ func TestNewScrapePool(t *testing.T) { var ( app = &nopAppendable{} cfg = &config.ScrapeConfig{} - sp, _ = newScrapePool(cfg, app, 0, nil, false) + sp, _ = newScrapePool(cfg, app, 0, nil, false, nil) ) if a, ok := sp.appendable.(*nopAppendable); !ok || a != app { @@ -92,7 +92,7 @@ func TestDroppedTargetsList(t *testing.T) { }, }, } - sp, _ = newScrapePool(cfg, app, 0, nil, false) + sp, _ = newScrapePool(cfg, app, 0, nil, false, nil) expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __scrape_interval__=\"0s\", __scrape_timeout__=\"0s\", job=\"dropMe\"}" expectedLength = 1 ) @@ -455,7 +455,7 @@ func TestScrapePoolTargetLimit(t *testing.T) { func TestScrapePoolAppender(t *testing.T) { cfg := &config.ScrapeConfig{} app := &nopAppendable{} - sp, _ := newScrapePool(cfg, app, 0, nil, false) + sp, _ := newScrapePool(cfg, app, 0, nil, false, nil) loop := sp.newLoop(scrapeLoopOptions{ target: &Target{}, @@ -497,7 +497,7 @@ func TestScrapePoolRaces(t *testing.T) { newConfig := func() *config.ScrapeConfig { return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout} } - sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, false) + sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, false, nil) tgts := []*targetgroup.Group{ { Targets: []model.LabelSet{ @@ -2512,7 +2512,7 @@ func TestReuseScrapeCache(t *testing.T) { ScrapeInterval: model.Duration(5 * time.Second), MetricsPath: "/metrics", } - sp, _ = newScrapePool(cfg, app, 0, nil, false) + sp, _ = newScrapePool(cfg, app, 0, nil, false, nil) t1 = &Target{ discoveredLabels: labels.Labels{ labels.Label{ @@ -2722,7 +2722,7 @@ func TestReuseCacheRace(t *testing.T) { ScrapeInterval: model.Duration(5 * time.Second), MetricsPath: "/metrics", } - sp, _ = newScrapePool(cfg, app, 0, nil, false) + sp, _ = newScrapePool(cfg, app, 0, nil, false, nil) t1 = &Target{ discoveredLabels: labels.Labels{ labels.Label{ @@ -2851,7 +2851,7 @@ func TestScrapeReportLimit(t *testing.T) { })) defer ts.Close() - sp, err := newScrapePool(cfg, s, 0, nil, false) + sp, err := newScrapePool(cfg, s, 0, nil, false, nil) require.NoError(t, err) defer sp.stop() @@ -3018,7 +3018,7 @@ func TestTargetScrapeIntervalAndTimeoutRelabel(t *testing.T) { }, }, } - sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, false) + sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, false, nil) tgts := []*targetgroup.Group{ { Targets: []model.LabelSet{{model.AddressLabel: "127.0.0.1:9090"}},