Races/3994 (#4005)

Fix a race by properly locking access to the scrape pools. Use a separate mutex for the information needed by the UI so that the UI isn't blocked while targets are being updated.
Krasi Georgiev 2018-04-09 17:18:25 +03:00 committed by Brian Brazil
parent 6cf725c56d
commit ddd46de6f4
5 changed files with 90 additions and 75 deletions
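
Before the per-file changes, here is a minimal, self-contained Go sketch of the locking pattern the commit message describes. All names here (manager, target, reload) are illustrative stand-ins, not the Prometheus code itself:

package main

import (
	"fmt"
	"sync"
	"time"
)

type target struct{ url string }

type manager struct {
	mtxScrape sync.Mutex // guards the scrape pools; held for the whole (slow) reload
	pools     map[string][]*target

	mtxTargets    sync.Mutex // guards only the cached snapshot read by the UI
	targetsActive []*target
}

// reload holds mtxScrape while syncing pools (potentially slow), then takes
// mtxTargets only for the cheap snapshot swap at the end.
func (m *manager) reload(update map[string][]*target) {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	var active []*target
	for name, ts := range update {
		time.Sleep(10 * time.Millisecond) // stand-in for a slow pool sync
		m.pools[name] = ts
		active = append(active, ts...)
	}

	m.mtxTargets.Lock()
	m.targetsActive = active
	m.mtxTargets.Unlock()
}

// TargetsActive never touches mtxScrape, so the UI is not blocked by a
// reload in progress.
func (m *manager) TargetsActive() []*target {
	m.mtxTargets.Lock()
	defer m.mtxTargets.Unlock()
	return m.targetsActive
}

func main() {
	m := &manager{pools: map[string][]*target{}}
	go m.reload(map[string][]*target{"job": {{url: "http://localhost:9090/metrics"}}})
	time.Sleep(5 * time.Millisecond)
	// Returns immediately even though reload still holds mtxScrape.
	fmt.Println("targets visible to UI:", len(m.TargetsActive()))
}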

scrape/manager.go

@@ -40,6 +40,7 @@ func NewManager(logger log.Logger, app Appendable) *Manager {
 		scrapeConfigs: make(map[string]*config.ScrapeConfig),
 		scrapePools:   make(map[string]*scrapePool),
 		graceShut:     make(chan struct{}),
+		targetsAll:    make(map[string][]*Target),
 	}
 }
@@ -48,10 +49,16 @@ func NewManager(logger log.Logger, app Appendable) *Manager {
 type Manager struct {
 	logger    log.Logger
 	append    Appendable
+	graceShut chan struct{}
+
+	mtxTargets     sync.Mutex // Guards the fields below.
+	targetsActive  []*Target
+	targetsDropped []*Target
+	targetsAll     map[string][]*Target
+
+	mtxScrape     sync.Mutex // Guards the fields below.
 	scrapeConfigs map[string]*config.ScrapeConfig
 	scrapePools   map[string]*scrapePool
-	mtx           sync.RWMutex
-	graceShut     chan struct{}
 }
 
 // Run starts background processing to handle target updates and reload the scraping loops.
@@ -68,6 +75,9 @@ func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
 
 // Stop cancels all running scrape pools and blocks until all have exited.
 func (m *Manager) Stop() {
+	m.mtxScrape.Lock()
+	defer m.mtxScrape.Unlock()
+
 	for _, sp := range m.scrapePools {
 		sp.stop()
 	}
@@ -76,8 +86,9 @@ func (m *Manager) Stop() {
 
 // ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
 func (m *Manager) ApplyConfig(cfg *config.Config) error {
-	m.mtx.Lock()
-	defer m.mtx.Unlock()
+	m.mtxScrape.Lock()
+	defer m.mtxScrape.Unlock()
+
 	c := make(map[string]*config.ScrapeConfig)
 	for _, scfg := range cfg.ScrapeConfigs {
 		c[scfg.JobName] = scfg
@@ -97,71 +108,66 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error {
 	return nil
 }
 
-// TargetMap returns map of active and dropped targets and their corresponding scrape config job name.
-func (m *Manager) TargetMap() map[string][]*Target {
-	m.mtx.Lock()
-	defer m.mtx.Unlock()
-
-	targets := make(map[string][]*Target)
-	for jobName, sp := range m.scrapePools {
-		sp.mtx.RLock()
-		for _, t := range sp.targets {
-			targets[jobName] = append(targets[jobName], t)
-		}
-		targets[jobName] = append(targets[jobName], sp.droppedTargets...)
-		sp.mtx.RUnlock()
-	}
-
-	return targets
+// TargetsAll returns active and dropped targets grouped by job_name.
+func (m *Manager) TargetsAll() map[string][]*Target {
+	m.mtxTargets.Lock()
+	defer m.mtxTargets.Unlock()
+	return m.targetsAll
 }
 
-// Targets returns the targets currently being scraped.
-func (m *Manager) Targets() []*Target {
-	m.mtx.Lock()
-	defer m.mtx.Unlock()
-
-	var targets []*Target
-	for _, p := range m.scrapePools {
-		p.mtx.RLock()
-		for _, tt := range p.targets {
-			targets = append(targets, tt)
-		}
-		p.mtx.RUnlock()
-	}
-
-	return targets
+// TargetsActive returns the active targets currently being scraped.
+func (m *Manager) TargetsActive() []*Target {
+	m.mtxTargets.Lock()
+	defer m.mtxTargets.Unlock()
+	return m.targetsActive
 }
 
-// DroppedTargets returns the targets dropped during relabelling.
-func (m *Manager) DroppedTargets() []*Target {
-	m.mtx.Lock()
-	defer m.mtx.Unlock()
-	var droppedTargets []*Target
-	for _, p := range m.scrapePools {
-		p.mtx.RLock()
-		droppedTargets = append(droppedTargets, p.droppedTargets...)
-		p.mtx.RUnlock()
-	}
-	return droppedTargets
+// TargetsDropped returns the dropped targets during relabelling.
+func (m *Manager) TargetsDropped() []*Target {
+	m.mtxTargets.Lock()
+	defer m.mtxTargets.Unlock()
+	return m.targetsDropped
+}
+
+func (m *Manager) targetsUpdate(active, dropped map[string][]*Target) {
+	m.mtxTargets.Lock()
+	defer m.mtxTargets.Unlock()
+
+	m.targetsAll = make(map[string][]*Target)
+	m.targetsActive = nil
+	m.targetsDropped = nil
+	for jobName, targets := range active {
+		m.targetsAll[jobName] = append(m.targetsAll[jobName], targets...)
+		m.targetsActive = append(m.targetsActive, targets...)
+	}
+	for jobName, targets := range dropped {
+		m.targetsAll[jobName] = append(m.targetsAll[jobName], targets...)
+		m.targetsDropped = append(m.targetsDropped, targets...)
+	}
 }
 
 func (m *Manager) reload(t map[string][]*targetgroup.Group) {
+	m.mtxScrape.Lock()
+	defer m.mtxScrape.Unlock()
+
+	tDropped := make(map[string][]*Target)
+	tActive := make(map[string][]*Target)
+
 	for tsetName, tgroup := range t {
-		scrapeConfig, ok := m.scrapeConfigs[tsetName]
-		if !ok {
-			level.Error(m.logger).Log("msg", "error reloading target set", "err", fmt.Sprintf("invalid config id:%v", tsetName))
-			continue
-		}
-
-		// Scrape pool doesn't exist so start a new one.
-		existing, ok := m.scrapePools[tsetName]
-		if !ok {
-			sp := newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName))
+		var sp *scrapePool
+		if existing, ok := m.scrapePools[tsetName]; !ok {
+			scrapeConfig, ok := m.scrapeConfigs[tsetName]
+			if !ok {
+				level.Error(m.logger).Log("msg", "error reloading target set", "err", fmt.Sprintf("invalid config id:%v", tsetName))
+				continue
+			}
+			sp = newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName))
 			m.scrapePools[tsetName] = sp
-			sp.Sync(tgroup)
-
 		} else {
-			existing.Sync(tgroup)
+			sp = existing
 		}
+		tActive[tsetName], tDropped[tsetName] = sp.Sync(tgroup)
 	}
+	m.targetsUpdate(tActive, tDropped)
 }
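
This division of labour is the heart of the fix: reload holds mtxScrape for the duration of the (potentially slow) pool syncs, collects the active and dropped targets that each Sync call returns, and targetsUpdate then swaps the cached slices while briefly holding mtxTargets. The UI-facing getters (TargetsAll, TargetsActive, TargetsDropped) take only mtxTargets, so an in-flight reload or ApplyConfig can no longer stall them.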

scrape/scrape.go

@@ -245,8 +245,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
 }
 
 // Sync converts target groups into actual scrape targets and synchronizes
-// the currently running scraper with the resulting set.
-func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
+// the currently running scraper with the resulting set and returns all scraped and dropped targets.
+func (sp *scrapePool) Sync(tgs []*targetgroup.Group) (tActive []*Target, tDropped []*Target) {
 	start := time.Now()
 
 	var all []*Target
@@ -273,6 +273,15 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
 		time.Since(start).Seconds(),
 	)
 	targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
+
+	sp.mtx.RLock()
+	for _, t := range sp.targets {
+		tActive = append(tActive, t)
+	}
+	tDropped = sp.droppedTargets
+	sp.mtx.RUnlock()
+
+	return tActive, tDropped
 }
 
 // sync takes a list of potentially duplicated targets, deduplicates them, starts
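
Note that Sync collects its return values under sp.mtx's read lock only after the synchronization has finished, handing the manager a consistent snapshot of the pool's active and dropped targets; the manager no longer reaches into sp.targets and sp.droppedTargets itself, as the removed TargetMap, Targets and DroppedTargets accessors did.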

web/api/v1/api.go

@@ -82,8 +82,8 @@ func (e *apiError) Error() string {
 }
 
 type targetRetriever interface {
-	Targets() []*scrape.Target
-	DroppedTargets() []*scrape.Target
+	TargetsActive() []*scrape.Target
+	TargetsDropped() []*scrape.Target
 }
 
 type alertmanagerRetriever interface {
@@ -452,11 +452,12 @@ type TargetDiscovery struct {
 }
 
 func (api *API) targets(r *http.Request) (interface{}, *apiError) {
-	targets := api.targetRetriever.Targets()
-	droppedTargets := api.targetRetriever.DroppedTargets()
-	res := &TargetDiscovery{ActiveTargets: make([]*Target, len(targets)), DroppedTargets: make([]*DroppedTarget, len(droppedTargets))}
-
-	for i, t := range targets {
+	tActive := api.targetRetriever.TargetsActive()
+	tDropped := api.targetRetriever.TargetsDropped()
+	res := &TargetDiscovery{ActiveTargets: make([]*Target, len(tActive)), DroppedTargets: make([]*DroppedTarget, len(tDropped))}
+
+	for i, t := range tActive {
 		lastErrStr := ""
 		lastErr := t.LastError()
 		if lastErr != nil {
@@ -473,12 +474,11 @@ func (api *API) targets(r *http.Request) (interface{}, *apiError) {
 		}
 	}
 
-	for i, t := range droppedTargets {
+	for i, t := range tDropped {
 		res.DroppedTargets[i] = &DroppedTarget{
 			DiscoveredLabels: t.DiscoveredLabels().Map(),
 		}
 	}
-
 	return res, nil
 }

web/api/v1/api_test.go

@@ -44,7 +44,7 @@ import (
 
 type testTargetRetriever struct{}
 
-func (t testTargetRetriever) Targets() []*scrape.Target {
+func (t testTargetRetriever) TargetsActive() []*scrape.Target {
 	return []*scrape.Target{
 		scrape.NewTarget(
 			labels.FromMap(map[string]string{
@@ -57,7 +57,7 @@ func (t testTargetRetriever) Targets() []*scrape.Target {
 		),
 	}
 }
-func (t testTargetRetriever) DroppedTargets() []*scrape.Target {
+func (t testTargetRetriever) TargetsDropped() []*scrape.Target {
 	return []*scrape.Target{
 		scrape.NewTarget(
 			nil,

web/web.go

@@ -404,7 +404,7 @@ func (h *Handler) Run(ctx context.Context) error {
 		h.options.QueryEngine,
 		h.options.Storage.Querier,
 		func() []*scrape.Target {
-			return h.options.ScrapeManager.Targets()
+			return h.options.ScrapeManager.TargetsActive()
 		},
 		func() []*url.URL {
 			return h.options.Notifier.Alertmanagers()
@@ -592,7 +592,7 @@ func (h *Handler) rules(w http.ResponseWriter, r *http.Request) {
 
 func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) {
 	var index []string
-	targets := h.scrapeManager.TargetMap()
+	targets := h.scrapeManager.TargetsAll()
 	for job := range targets {
 		index = append(index, job)
 	}
@@ -610,7 +610,7 @@ func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) {
 func (h *Handler) targets(w http.ResponseWriter, r *http.Request) {
 	// Bucket targets by job label
 	tps := map[string][]*scrape.Target{}
-	for _, t := range h.scrapeManager.Targets() {
+	for _, t := range h.scrapeManager.TargetsActive() {
 		job := t.Labels().Get(model.JobLabel)
 		tps[job] = append(tps[job], t)
 	}