Protect sp.loops from concurrent access. ()

Manager.reload takes the mutex that would make it safe, however
releases it before the goroutines spawned are finished with it.
Thus more explicit locking of scrapePool.Sync/stop/reload is needed.

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
Brian Brazil 2020-11-12 16:06:25 +00:00 committed by Frederic Branczyk
parent 00f16d1ac3
commit 983a8eae4f
No known key found for this signature in database
GPG key ID: 576DA6AF8CB9027F

View file

@ -191,23 +191,20 @@ func init() {
type scrapePool struct {
appendable storage.Appendable
logger log.Logger
cancel context.CancelFunc
// mtx must not be taken after targetMtx.
mtx sync.Mutex
config *config.ScrapeConfig
client *http.Client
loops map[uint64]loop
targetLimitHit bool // Internal state to speed up the target_limit checks.
// targetMtx protects activeTargets and droppedTargets from concurrent reads
// and writes. Only one of Sync/stop/reload may be called at once due to
// manager.mtxScrape so we only need to protect from concurrent reads from
// the ActiveTargets and DroppedTargets methods. This allows those two
// methods to always complete without having to wait on scrape loops to gracefull stop.
targetMtx sync.Mutex
config *config.ScrapeConfig
client *http.Client
// Targets and loops must always be synchronized to have the same
// activeTargets and loops must always be synchronized to have the same
// set of hashes.
activeTargets map[uint64]*Target
droppedTargets []*Target
loops map[uint64]loop
cancel context.CancelFunc
targetLimitHit bool // Internal state to speed up the target_limit checks.
// Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(scrapeLoopOptions) loop
@ -297,6 +294,8 @@ func (sp *scrapePool) DroppedTargets() []*Target {
// stop terminates all scrape loops and returns after they all terminated.
func (sp *scrapePool) stop() {
sp.mtx.Lock()
defer sp.mtx.Unlock()
sp.cancel()
var wg sync.WaitGroup
@ -331,6 +330,8 @@ func (sp *scrapePool) stop() {
// but all scrape loops are restarted with the new scrape configuration.
// This method returns after all scrape loops that were stopped have stopped scraping.
func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
sp.mtx.Lock()
defer sp.mtx.Unlock()
targetScrapePoolReloads.Inc()
start := time.Now()
@ -407,6 +408,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
sp.mtx.Lock()
defer sp.mtx.Unlock()
start := time.Now()
sp.targetMtx.Lock()