chore(discoveryManager): expose Discoverer refresh function (#10531)

Signed-off-by: secustor <sebastian@poxhofer.at>
This commit is contained in:
Sebastian Poxhofer 2022-06-13 21:06:15 +02:00 committed by GitHub
parent 5d1756c822
commit 3f9a9d1e62
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 16 deletions

View file

@ -136,12 +136,12 @@ func NewDiscovery(conf *SDConfig, logger log.Logger, clientOpts []config.HTTPCli
logger, logger,
"http", "http",
time.Duration(conf.RefreshInterval), time.Duration(conf.RefreshInterval),
d.refresh, d.Refresh,
) )
return d, nil return d, nil
} }
func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { func (d *Discovery) Refresh(ctx context.Context) ([]*targetgroup.Group, error) {
req, err := http.NewRequest("GET", d.url, nil) req, err := http.NewRequest("GET", d.url, nil)
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -45,7 +45,7 @@ func TestHTTPValidRefresh(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
ctx := context.Background() ctx := context.Background()
tgs, err := d.refresh(ctx) tgs, err := d.Refresh(ctx)
require.NoError(t, err) require.NoError(t, err)
expectedTargets := []*targetgroup.Group{ expectedTargets := []*targetgroup.Group{
@ -83,7 +83,7 @@ func TestHTTPInvalidCode(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
ctx := context.Background() ctx := context.Background()
_, err = d.refresh(ctx) _, err = d.Refresh(ctx)
require.EqualError(t, err, "server returned HTTP status 400 Bad Request") require.EqualError(t, err, "server returned HTTP status 400 Bad Request")
require.Equal(t, 1.0, getFailureCount()) require.Equal(t, 1.0, getFailureCount())
} }
@ -105,7 +105,7 @@ func TestHTTPInvalidFormat(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
ctx := context.Background() ctx := context.Background()
_, err = d.refresh(ctx) _, err = d.Refresh(ctx)
require.EqualError(t, err, `unsupported content type "text/plain; charset=utf-8"`) require.EqualError(t, err, `unsupported content type "text/plain; charset=utf-8"`)
require.Equal(t, 1.0, getFailureCount()) require.Equal(t, 1.0, getFailureCount())
} }
@ -423,7 +423,7 @@ func TestSourceDisappeared(t *testing.T) {
ctx := context.Background() ctx := context.Background()
for i, res := range test.responses { for i, res := range test.responses {
stubResponse = res stubResponse = res
tgs, err := d.refresh(ctx) tgs, err := d.Refresh(ctx)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, test.expectedTargets[i], tgs) require.Equal(t, test.expectedTargets[i], tgs)
} }

View file

@ -75,8 +75,8 @@ type poolKey struct {
provider string provider string
} }
// provider holds a Discoverer instance, its configuration, cancel func and its subscribers. // Provider holds a Discoverer instance, its configuration, cancel func and its subscribers.
type provider struct { type Provider struct {
name string name string
d Discoverer d Discoverer
config interface{} config interface{}
@ -92,11 +92,20 @@ type provider struct {
newSubs map[string]struct{} newSubs map[string]struct{}
} }
// Discoverer return the Discoverer of the provider
func (p *Provider) Discoverer() Discoverer {
return p.d
}
// IsStarted return true if Discoverer is started. // IsStarted return true if Discoverer is started.
func (p *provider) IsStarted() bool { func (p *Provider) IsStarted() bool {
return p.cancel != nil return p.cancel != nil
} }
func (p *Provider) Config() interface{} {
return p.config
}
// NewManager is the Discovery Manager constructor. // NewManager is the Discovery Manager constructor.
func NewManager(ctx context.Context, logger log.Logger, options ...func(*Manager)) *Manager { func NewManager(ctx context.Context, logger log.Logger, options ...func(*Manager)) *Manager {
if logger == nil { if logger == nil {
@ -148,7 +157,7 @@ type Manager struct {
targetsMtx sync.Mutex targetsMtx sync.Mutex
// providers keeps track of SD providers. // providers keeps track of SD providers.
providers []*provider providers []*Provider
// The sync channel sends the updates as a map where the key is the job value from the scrape config. // The sync channel sends the updates as a map where the key is the job value from the scrape config.
syncCh chan map[string][]*targetgroup.Group syncCh chan map[string][]*targetgroup.Group
@ -163,6 +172,11 @@ type Manager struct {
lastProvider uint lastProvider uint
} }
// Providers returns the currently configured SD providers.
func (m *Manager) Providers() []*Provider {
return m.providers
}
// Run starts the background processing. // Run starts the background processing.
func (m *Manager) Run() error { func (m *Manager) Run() error {
go m.sender() go m.sender()
@ -194,7 +208,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
wg sync.WaitGroup wg sync.WaitGroup
// keep shows if we keep any providers after reload. // keep shows if we keep any providers after reload.
keep bool keep bool
newProviders []*provider newProviders []*Provider
) )
for _, prov := range m.providers { for _, prov := range m.providers {
// Cancel obsolete providers. // Cancel obsolete providers.
@ -260,7 +274,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
// StartCustomProvider is used for sdtool. Only use this if you know what you're doing. // StartCustomProvider is used for sdtool. Only use this if you know what you're doing.
func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker Discoverer) { func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker Discoverer) {
p := &provider{ p := &Provider{
name: name, name: name,
d: worker, d: worker,
subs: map[string]struct{}{ subs: map[string]struct{}{
@ -271,7 +285,7 @@ func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker D
m.startProvider(ctx, p) m.startProvider(ctx, p)
} }
func (m *Manager) startProvider(ctx context.Context, p *provider) { func (m *Manager) startProvider(ctx context.Context, p *Provider) {
level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs)) level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
updates := make(chan []*targetgroup.Group) updates := make(chan []*targetgroup.Group)
@ -283,7 +297,7 @@ func (m *Manager) startProvider(ctx context.Context, p *provider) {
} }
// cleaner cleans resources associated with provider. // cleaner cleans resources associated with provider.
func (m *Manager) cleaner(p *provider) { func (m *Manager) cleaner(p *Provider) {
m.targetsMtx.Lock() m.targetsMtx.Lock()
p.mu.RLock() p.mu.RLock()
for s := range p.subs { for s := range p.subs {
@ -296,7 +310,7 @@ func (m *Manager) cleaner(p *provider) {
} }
} }
func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) { func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targetgroup.Group) {
// Ensure targets from this provider are cleaned up. // Ensure targets from this provider are cleaned up.
defer m.cleaner(p) defer m.cleaner(p)
for { for {
@ -422,7 +436,7 @@ func (m *Manager) registerProviders(cfgs Configs, setName string) int {
failed++ failed++
return return
} }
m.providers = append(m.providers, &provider{ m.providers = append(m.providers, &Provider{
name: fmt.Sprintf("%s/%d", typ, m.lastProvider), name: fmt.Sprintf("%s/%d", typ, m.lastProvider),
d: d, d: d,
config: cfg, config: cfg,