scrape: allow providing a custom Dialer for scraping (#10415)

* scrape: allow providing a custom Dialer for scraping

This commit extends config.ScrapeConfig with an optional field to
override how HTTP connections to targets are created. This field is not
set directly in Prometheus, and is only added for the convenience of
downstream importers.

Closes #9706

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* scrape: move custom dial function to scrape.Options

Signed-off-by: Robert Fratto <robertfratto@gmail.com>
This commit is contained in:
Robert Fratto 2022-03-08 18:48:47 -05:00 committed by GitHub
parent 0e1c39c50c
commit f0ec619eec
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 29 additions and 12 deletions

View file

@ -24,6 +24,7 @@ import (
"github.com/go-kit/log/level" "github.com/go-kit/log/level"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/discovery/targetgroup"
@ -123,6 +124,10 @@ func NewManager(o *Options, logger log.Logger, app storage.Appendable) *Manager
// Options are the configuration parameters to the scrape manager. // Options are the configuration parameters to the scrape manager.
type Options struct { type Options struct {
ExtraMetrics bool ExtraMetrics bool
// Optional function to override dialing to scrape targets. Go's default
// dialer is used when not provided.
DialContextFunc config_util.DialContextFunc
} }
// Manager maintains a set of scrape pools and manages start/stop cycles // Manager maintains a set of scrape pools and manages start/stop cycles
@ -191,7 +196,7 @@ func (m *Manager) reload() {
level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName) level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
continue continue
} }
sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName), m.opts.ExtraMetrics) sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName), m.opts.ExtraMetrics, m.opts.DialContextFunc)
if err != nil { if err != nil {
level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName) level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
continue continue

View file

@ -224,6 +224,7 @@ type scrapePool struct {
appendable storage.Appendable appendable storage.Appendable
logger log.Logger logger log.Logger
cancel context.CancelFunc cancel context.CancelFunc
dialFunc config_util.DialContextFunc
// mtx must not be taken after targetMtx. // mtx must not be taken after targetMtx.
mtx sync.Mutex mtx sync.Mutex
@ -264,13 +265,18 @@ const maxAheadTime = 10 * time.Minute
type labelsMutator func(labels.Labels) labels.Labels type labelsMutator func(labels.Labels) labels.Labels
func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger, reportExtraMetrics bool) (*scrapePool, error) { func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger, reportExtraMetrics bool, dialFunc config_util.DialContextFunc) (*scrapePool, error) {
targetScrapePools.Inc() targetScrapePools.Inc()
if logger == nil { if logger == nil {
logger = log.NewNopLogger() logger = log.NewNopLogger()
} }
client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName) var extraOptions []config_util.HTTPClientOption
if dialFunc != nil {
extraOptions = append(extraOptions, config_util.WithDialContextFunc(dialFunc))
}
client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, extraOptions...)
if err != nil { if err != nil {
targetScrapePoolsFailed.Inc() targetScrapePoolsFailed.Inc()
return nil, errors.Wrap(err, "error creating HTTP client") return nil, errors.Wrap(err, "error creating HTTP client")
@ -287,6 +293,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed
activeTargets: map[uint64]*Target{}, activeTargets: map[uint64]*Target{},
loops: map[uint64]loop{}, loops: map[uint64]loop{},
logger: logger, logger: logger,
dialFunc: dialFunc,
} }
sp.newLoop = func(opts scrapeLoopOptions) loop { sp.newLoop = func(opts scrapeLoopOptions) loop {
// Update the targets retrieval function for metadata to a new scrape cache. // Update the targets retrieval function for metadata to a new scrape cache.
@ -381,7 +388,12 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
targetScrapePoolReloads.Inc() targetScrapePoolReloads.Inc()
start := time.Now() start := time.Now()
client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName) var extraOptions []config_util.HTTPClientOption
if sp.dialFunc != nil {
extraOptions = append(extraOptions, config_util.WithDialContextFunc(sp.dialFunc))
}
client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, extraOptions...)
if err != nil { if err != nil {
targetScrapePoolReloadsFailed.Inc() targetScrapePoolReloadsFailed.Inc()
return errors.Wrap(err, "error creating HTTP client") return errors.Wrap(err, "error creating HTTP client")

View file

@ -57,7 +57,7 @@ func TestNewScrapePool(t *testing.T) {
var ( var (
app = &nopAppendable{} app = &nopAppendable{}
cfg = &config.ScrapeConfig{} cfg = &config.ScrapeConfig{}
sp, _ = newScrapePool(cfg, app, 0, nil, false) sp, _ = newScrapePool(cfg, app, 0, nil, false, nil)
) )
if a, ok := sp.appendable.(*nopAppendable); !ok || a != app { if a, ok := sp.appendable.(*nopAppendable); !ok || a != app {
@ -92,7 +92,7 @@ func TestDroppedTargetsList(t *testing.T) {
}, },
}, },
} }
sp, _ = newScrapePool(cfg, app, 0, nil, false) sp, _ = newScrapePool(cfg, app, 0, nil, false, nil)
expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __scrape_interval__=\"0s\", __scrape_timeout__=\"0s\", job=\"dropMe\"}" expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __scrape_interval__=\"0s\", __scrape_timeout__=\"0s\", job=\"dropMe\"}"
expectedLength = 1 expectedLength = 1
) )
@ -455,7 +455,7 @@ func TestScrapePoolTargetLimit(t *testing.T) {
func TestScrapePoolAppender(t *testing.T) { func TestScrapePoolAppender(t *testing.T) {
cfg := &config.ScrapeConfig{} cfg := &config.ScrapeConfig{}
app := &nopAppendable{} app := &nopAppendable{}
sp, _ := newScrapePool(cfg, app, 0, nil, false) sp, _ := newScrapePool(cfg, app, 0, nil, false, nil)
loop := sp.newLoop(scrapeLoopOptions{ loop := sp.newLoop(scrapeLoopOptions{
target: &Target{}, target: &Target{},
@ -497,7 +497,7 @@ func TestScrapePoolRaces(t *testing.T) {
newConfig := func() *config.ScrapeConfig { newConfig := func() *config.ScrapeConfig {
return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout} return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout}
} }
sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, false) sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, false, nil)
tgts := []*targetgroup.Group{ tgts := []*targetgroup.Group{
{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
@ -2512,7 +2512,7 @@ func TestReuseScrapeCache(t *testing.T) {
ScrapeInterval: model.Duration(5 * time.Second), ScrapeInterval: model.Duration(5 * time.Second),
MetricsPath: "/metrics", MetricsPath: "/metrics",
} }
sp, _ = newScrapePool(cfg, app, 0, nil, false) sp, _ = newScrapePool(cfg, app, 0, nil, false, nil)
t1 = &Target{ t1 = &Target{
discoveredLabels: labels.Labels{ discoveredLabels: labels.Labels{
labels.Label{ labels.Label{
@ -2722,7 +2722,7 @@ func TestReuseCacheRace(t *testing.T) {
ScrapeInterval: model.Duration(5 * time.Second), ScrapeInterval: model.Duration(5 * time.Second),
MetricsPath: "/metrics", MetricsPath: "/metrics",
} }
sp, _ = newScrapePool(cfg, app, 0, nil, false) sp, _ = newScrapePool(cfg, app, 0, nil, false, nil)
t1 = &Target{ t1 = &Target{
discoveredLabels: labels.Labels{ discoveredLabels: labels.Labels{
labels.Label{ labels.Label{
@ -2851,7 +2851,7 @@ func TestScrapeReportLimit(t *testing.T) {
})) }))
defer ts.Close() defer ts.Close()
sp, err := newScrapePool(cfg, s, 0, nil, false) sp, err := newScrapePool(cfg, s, 0, nil, false, nil)
require.NoError(t, err) require.NoError(t, err)
defer sp.stop() defer sp.stop()
@ -3018,7 +3018,7 @@ func TestTargetScrapeIntervalAndTimeoutRelabel(t *testing.T) {
}, },
}, },
} }
sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, false) sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, false, nil)
tgts := []*targetgroup.Group{ tgts := []*targetgroup.Group{
{ {
Targets: []model.LabelSet{{model.AddressLabel: "127.0.0.1:9090"}}, Targets: []model.LabelSet{{model.AddressLabel: "127.0.0.1:9090"}},