diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 507dab878..34326c13e 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -23,6 +23,7 @@ import ( "github.com/prometheus/common/model" "golang.org/x/net/context" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/local" ) @@ -69,106 +70,169 @@ func init() { // scrapePool manages scrapes for sets of targets. type scrapePool struct { appender storage.SampleAppender + config *config.ScrapeConfig - ctx context.Context + ctx context.Context + + // Targets and loops must always be synchronized to have the same + // set of fingerprints. mtx sync.RWMutex - tgroups map[string]map[model.Fingerprint]*Target + targets map[model.Fingerprint]*Target + loops map[model.Fingerprint]loop - targets map[model.Fingerprint]loop + // Constructor for new scrape loops. This is settable for testing convenience. + newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop } -func newScrapePool(app storage.SampleAppender) *scrapePool { +func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { return &scrapePool{ appender: app, - tgroups: map[string]map[model.Fingerprint]*Target{}, + config: cfg, + targets: map[model.Fingerprint]*Target{}, + loops: map[model.Fingerprint]loop{}, + newLoop: newScrapeLoop, } } +// stop terminates all scrape loops and returns after they all terminated. func (sp *scrapePool) stop() { var wg sync.WaitGroup - sp.mtx.RLock() + sp.mtx.Lock() + defer sp.mtx.Unlock() - for _, tgroup := range sp.tgroups { - for _, t := range tgroup { - wg.Add(1) + for fp, l := range sp.loops { + wg.Add(1) - go func(t *Target) { - t.scrapeLoop.stop() - wg.Done() - }(t) - } + go func(l loop) { + l.stop() + wg.Done() + }(l) + + delete(sp.loops, fp) + delete(sp.targets, fp) } - sp.mtx.RUnlock() wg.Wait() } -func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) { +// reload the scrape pool with the given scrape configuration. The target state is preserved +// but all scrape loops are restarted with the new scrape configuration. +// This method returns after all scrape loops that were stopped have fully terminated. +func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { sp.mtx.Lock() + defer sp.mtx.Unlock() + + sp.config = cfg var ( - wg sync.WaitGroup - newTgroups = map[string]map[model.Fingerprint]*Target{} + wg sync.WaitGroup + interval = time.Duration(sp.config.ScrapeInterval) + timeout = time.Duration(sp.config.ScrapeTimeout) ) - for source, targets := range tgroups { + for fp, oldLoop := range sp.loops { var ( - prevTargets = sp.tgroups[source] - newTargets = map[model.Fingerprint]*Target{} + t = sp.targets[fp] + newLoop = sp.newLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t)) ) - newTgroups[source] = newTargets + wg.Add(1) - for fp, tnew := range targets { - // If the same target existed before, we let it run and replace - // the new one with it. - if told, ok := prevTargets[fp]; ok { - newTargets[fp] = told - } else { - newTargets[fp] = tnew + go func(oldLoop, newLoop loop) { + oldLoop.stop() + wg.Done() - tnew.scrapeLoop = newScrapeLoop(sp.ctx, tnew, tnew.wrapAppender(sp.appender), tnew.wrapReportingAppender(sp.appender)) - go tnew.scrapeLoop.run(tnew.interval(), tnew.timeout(), nil) - } - } - for fp, told := range prevTargets { - // A previous target is no longer in the group. - if _, ok := targets[fp]; !ok { - wg.Add(1) + go newLoop.run(interval, timeout, nil) + }(oldLoop, newLoop) - go func(told *Target) { - told.scrapeLoop.stop() - wg.Done() - }(told) - } + sp.loops[fp] = newLoop + } + + wg.Wait() +} + +// sync takes a list of potentially duplicated targets, deduplicates them, starts +// scrape loops for new targets, and stops scrape loops for disappeared targets. +// It returns after all stopped scrape loops terminated. +func (sp *scrapePool) sync(targets []*Target) { + sp.mtx.Lock() + defer sp.mtx.Unlock() + + var ( + fingerprints = map[model.Fingerprint]struct{}{} + interval = time.Duration(sp.config.ScrapeInterval) + timeout = time.Duration(sp.config.ScrapeTimeout) + ) + + for _, t := range targets { + fp := t.fingerprint() + fingerprints[fp] = struct{}{} + + if _, ok := sp.targets[fp]; !ok { + l := sp.newLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t)) + + sp.targets[fp] = t + sp.loops[fp] = l + + go l.run(interval, timeout, nil) } } - // Stop scrapers for target groups that disappeared completely. - for source, targets := range sp.tgroups { - if _, ok := tgroups[source]; ok { - continue - } - for _, told := range targets { + var wg sync.WaitGroup + + // Stop and remove old targets and scraper loops. + for fp := range sp.targets { + if _, ok := fingerprints[fp]; !ok { wg.Add(1) - - go func(told *Target) { - told.scrapeLoop.stop() + go func(l loop) { + l.stop() wg.Done() - }(told) + }(sp.loops[fp]) + + delete(sp.loops, fp) + delete(sp.targets, fp) } } - sp.tgroups = newTgroups - // Wait for all potentially stopped scrapers to terminate. // This covers the case of flapping targets. If the server is under high load, a new scraper // may be active and tries to insert. The old scraper that didn't terminate yet could still // be inserting a previous sample set. wg.Wait() +} - // TODO(fabxc): maybe this can be released earlier with subsequent refactoring. - sp.mtx.Unlock() +// sampleAppender returns an appender for ingested samples from the target. +func (sp *scrapePool) sampleAppender(target *Target) storage.SampleAppender { + app := sp.appender + // The relabelAppender has to be inside the label-modifying appenders + // so the relabeling rules are applied to the correct label set. + if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 { + app = relabelAppender{ + SampleAppender: app, + relabelings: mrc, + } + } + + if sp.config.HonorLabels { + app = honorLabelsAppender{ + SampleAppender: app, + labels: target.Labels(), + } + } else { + app = ruleLabelsAppender{ + SampleAppender: app, + labels: target.Labels(), + } + } + return app +} + +// reportAppender returns an appender for reporting samples for the target. +func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender { + return ruleLabelsAppender{ + SampleAppender: sp.appender, + labels: target.Labels(), + } } // A scraper retrieves samples and accepts a status report at the end. @@ -178,6 +242,7 @@ type scraper interface { offset(interval time.Duration) time.Duration } +// A loop can run and be stopped again. It must not be reused after it was stopped. type loop interface { run(interval, timeout time.Duration, errc chan<- error) stop() @@ -190,12 +255,11 @@ type scrapeLoop struct { reportAppender storage.SampleAppender done chan struct{} - mtx sync.RWMutex ctx context.Context cancel func() } -func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.SampleAppender) *scrapeLoop { +func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.SampleAppender) loop { sl := &scrapeLoop{ scraper: sc, appender: app, @@ -264,10 +328,7 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) { } func (sl *scrapeLoop) stop() { - sl.mtx.RLock() sl.cancel() - sl.mtx.RUnlock() - <-sl.done } diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index 142bc13af..f40bf9b9a 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -14,15 +14,333 @@ package retrieval import ( + "fmt" + "reflect" + "sync" "testing" "time" "github.com/prometheus/common/model" "golang.org/x/net/context" - // "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/storage" ) +func TestNewScrapePool(t *testing.T) { + var ( + app = &nopAppender{} + cfg = &config.ScrapeConfig{} + sp = newScrapePool(cfg, app) + ) + + if a, ok := sp.appender.(*nopAppender); !ok || a != app { + t.Fatalf("Wrong sample appender") + } + if sp.config != cfg { + t.Fatalf("Wrong scrape config") + } + if sp.newLoop == nil { + t.Fatalf("newLoop function not initialized") + } +} + +type testLoop struct { + startFunc func(interval, timeout time.Duration, errc chan<- error) + stopFunc func() +} + +func (l *testLoop) run(interval, timeout time.Duration, errc chan<- error) { + l.startFunc(interval, timeout, errc) +} + +func (l *testLoop) stop() { + l.stopFunc() +} + +func TestScrapePoolStop(t *testing.T) { + sp := &scrapePool{ + targets: map[model.Fingerprint]*Target{}, + loops: map[model.Fingerprint]loop{}, + } + var mtx sync.Mutex + stopped := map[model.Fingerprint]bool{} + numTargets := 20 + + // Stopping the scrape pool must call stop() on all scrape loops, + // clean them and the respective targets up. It must wait until each loop's + // stop function returned before returning itself. + + for i := 0; i < numTargets; i++ { + t := &Target{ + labels: model.LabelSet{ + model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)), + }, + } + l := &testLoop{} + l.stopFunc = func() { + time.Sleep(time.Duration(i*20) * time.Millisecond) + + mtx.Lock() + stopped[t.fingerprint()] = true + mtx.Unlock() + } + + sp.targets[t.fingerprint()] = t + sp.loops[t.fingerprint()] = l + } + + done := make(chan struct{}) + stopTime := time.Now() + + go func() { + sp.stop() + close(done) + }() + + select { + case <-time.After(5 * time.Second): + t.Fatalf("scrapeLoop.stop() did not return as expected") + case <-done: + // This should have taken at least as long as the last target slept. + if time.Since(stopTime) < time.Duration(numTargets*20)*time.Millisecond { + t.Fatalf("scrapeLoop.stop() exited before all targets stopped") + } + } + + mtx.Lock() + if len(stopped) != numTargets { + t.Fatalf("Expected 20 stopped loops, got %d", len(stopped)) + } + mtx.Unlock() + + if len(sp.targets) > 0 { + t.Fatalf("Targets were not cleared on stopping: %d left", len(sp.targets)) + } + if len(sp.loops) > 0 { + t.Fatalf("Loops were not cleared on stopping: %d left", len(sp.loops)) + } +} + +func TestScrapePoolReload(t *testing.T) { + var mtx sync.Mutex + numTargets := 20 + + stopped := map[model.Fingerprint]bool{} + + reloadCfg := &config.ScrapeConfig{ + ScrapeInterval: model.Duration(3 * time.Second), + ScrapeTimeout: model.Duration(2 * time.Second), + } + // On starting to run, new loops created on reload check whether their preceeding + // equivalents have been stopped. + newLoop := func(ctx context.Context, s scraper, app, reportApp storage.SampleAppender) loop { + l := &testLoop{} + l.startFunc = func(interval, timeout time.Duration, errc chan<- error) { + if interval != 3*time.Second { + t.Errorf("Expected scrape interval %d but got %d", 3*time.Second, interval) + } + if timeout != 2*time.Second { + t.Errorf("Expected scrape timeout %d but got %d", 2*time.Second, timeout) + } + mtx.Lock() + if !stopped[s.(*Target).fingerprint()] { + t.Errorf("Scrape loop for %v not stopped yet", s.(*Target)) + } + mtx.Unlock() + } + return l + } + sp := &scrapePool{ + targets: map[model.Fingerprint]*Target{}, + loops: map[model.Fingerprint]loop{}, + newLoop: newLoop, + } + + // Reloading a scrape pool with a new scrape configuration must stop all scrape + // loops and start new ones. A new loop must not be started before the preceeding + // one terminated. + + for i := 0; i < numTargets; i++ { + t := &Target{ + labels: model.LabelSet{ + model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)), + }, + } + l := &testLoop{} + l.stopFunc = func() { + time.Sleep(time.Duration(i*20) * time.Millisecond) + + mtx.Lock() + stopped[t.fingerprint()] = true + mtx.Unlock() + } + + sp.targets[t.fingerprint()] = t + sp.loops[t.fingerprint()] = l + } + done := make(chan struct{}) + + beforeTargets := map[model.Fingerprint]*Target{} + for fp, t := range sp.targets { + beforeTargets[fp] = t + } + + reloadTime := time.Now() + + go func() { + sp.reload(reloadCfg) + close(done) + }() + + select { + case <-time.After(5 * time.Second): + t.Fatalf("scrapeLoop.reload() did not return as expected") + case <-done: + // This should have taken at least as long as the last target slept. + if time.Since(reloadTime) < time.Duration(numTargets*20)*time.Millisecond { + t.Fatalf("scrapeLoop.stop() exited before all targets stopped") + } + } + + mtx.Lock() + if len(stopped) != numTargets { + t.Fatalf("Expected 20 stopped loops, got %d", stopped) + } + mtx.Unlock() + + if !reflect.DeepEqual(sp.targets, beforeTargets) { + t.Fatalf("Reloading affected target states unexpectedly") + } + if len(sp.loops) != numTargets { + t.Fatalf("Expected %d loops after reload but got %d", numTargets, len(sp.loops)) + } +} + +func TestScrapePoolReportAppender(t *testing.T) { + cfg := &config.ScrapeConfig{ + MetricRelabelConfigs: []*config.RelabelConfig{ + {}, {}, {}, + }, + } + target := newTestTarget("example.com:80", 10*time.Millisecond, nil) + app := &nopAppender{} + + sp := newScrapePool(cfg, app) + + cfg.HonorLabels = false + wrapped := sp.reportAppender(target) + + rl, ok := wrapped.(ruleLabelsAppender) + if !ok { + t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) + } + if rl.SampleAppender != app { + t.Fatalf("Expected base appender but got %T", rl.SampleAppender) + } + + cfg.HonorLabels = true + wrapped = sp.reportAppender(target) + + hl, ok := wrapped.(ruleLabelsAppender) + if !ok { + t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) + } + if hl.SampleAppender != app { + t.Fatalf("Expected base appender but got %T", hl.SampleAppender) + } +} + +func TestScrapePoolSampleAppender(t *testing.T) { + cfg := &config.ScrapeConfig{ + MetricRelabelConfigs: []*config.RelabelConfig{ + {}, {}, {}, + }, + } + + target := newTestTarget("example.com:80", 10*time.Millisecond, nil) + app := &nopAppender{} + + sp := newScrapePool(cfg, app) + + cfg.HonorLabels = false + wrapped := sp.sampleAppender(target) + + rl, ok := wrapped.(ruleLabelsAppender) + if !ok { + t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) + } + re, ok := rl.SampleAppender.(relabelAppender) + if !ok { + t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender) + } + if re.SampleAppender != app { + t.Fatalf("Expected base appender but got %T", re.SampleAppender) + } + + cfg.HonorLabels = true + wrapped = sp.sampleAppender(target) + + hl, ok := wrapped.(honorLabelsAppender) + if !ok { + t.Fatalf("Expected honorLabelsAppender but got %T", wrapped) + } + re, ok = hl.SampleAppender.(relabelAppender) + if !ok { + t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender) + } + if re.SampleAppender != app { + t.Fatalf("Expected base appender but got %T", re.SampleAppender) + } +} + +func TestScrapeLoopStop(t *testing.T) { + scraper := &testScraper{} + sl := newScrapeLoop(context.Background(), scraper, nil, nil) + + // The scrape pool synchronizes on stopping scrape loops. However, new scrape + // loops are syarted asynchronously. Thus it's possible, that a loop is stopped + // again before having started properly. + // Stopping not-yet-started loops must block until the run method was called and exited. + // The run method must exit immediately. + + stopDone := make(chan struct{}) + go func() { + sl.stop() + close(stopDone) + }() + + select { + case <-stopDone: + t.Fatalf("Stopping terminated before run exited successfully") + case <-time.After(500 * time.Millisecond): + } + + // Running the scrape loop must exit before calling the scraper even once. + scraper.scrapeFunc = func(context.Context, time.Time) (model.Samples, error) { + t.Fatalf("scraper was called for terminated scrape loop") + return nil, nil + } + + runDone := make(chan struct{}) + go func() { + sl.run(0, 0, nil) + close(runDone) + }() + + select { + case <-runDone: + case <-time.After(1 * time.Second): + t.Fatalf("Running terminated scrape loop did not exit") + } + + select { + case <-stopDone: + case <-time.After(1 * time.Second): + t.Fatalf("Stopping did not terminate after running exited") + } +} + func TestScrapeLoopRun(t *testing.T) { var ( signal = make(chan struct{}) diff --git a/retrieval/target.go b/retrieval/target.go index d341d98df..81d3e1bd8 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -121,13 +121,12 @@ type Target struct { // The status object for the target. It is only set once on initialization. status *TargetStatus - scrapeLoop *scrapeLoop + scrapeLoop *scrapeLoop + scrapeConfig *config.ScrapeConfig // Mutex protects the members below. sync.RWMutex - scrapeConfig *config.ScrapeConfig - // Labels before any processing. metaLabels model.LabelSet // Any labels that are added to this target and its metrics. @@ -230,20 +229,6 @@ func (t *Target) offset(interval time.Duration) time.Duration { return time.Duration(next) } -func (t *Target) interval() time.Duration { - t.RLock() - defer t.RUnlock() - - return time.Duration(t.scrapeConfig.ScrapeInterval) -} - -func (t *Target) timeout() time.Duration { - t.RLock() - defer t.RUnlock() - - return time.Duration(t.scrapeConfig.ScrapeTimeout) -} - func (t *Target) scheme() string { t.RLock() defer t.RUnlock() @@ -265,42 +250,6 @@ func (t *Target) path() string { return string(t.labels[model.MetricsPathLabel]) } -// wrapAppender wraps a SampleAppender for samples ingested from the target. -// RLock must be acquired by the caller. -func (t *Target) wrapAppender(app storage.SampleAppender) storage.SampleAppender { - // The relabelAppender has to be inside the label-modifying appenders - // so the relabeling rules are applied to the correct label set. - if mrc := t.scrapeConfig.MetricRelabelConfigs; len(mrc) > 0 { - app = relabelAppender{ - SampleAppender: app, - relabelings: mrc, - } - } - - if t.scrapeConfig.HonorLabels { - app = honorLabelsAppender{ - SampleAppender: app, - labels: t.unlockedLabels(), - } - } else { - app = ruleLabelsAppender{ - SampleAppender: app, - labels: t.unlockedLabels(), - } - } - return app -} - -// wrapReportingAppender wraps an appender for target status report samples. -// It ignores any relabeling rules set for the target. -// RLock must not be acquired by the caller. -func (t *Target) wrapReportingAppender(app storage.SampleAppender) storage.SampleAppender { - return ruleLabelsAppender{ - SampleAppender: app, - labels: t.Labels(), - } -} - // URL returns a copy of the target's URL. func (t *Target) URL() *url.URL { t.RLock() diff --git a/retrieval/target_test.go b/retrieval/target_test.go index 54a89b229..a6c77d232 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -92,82 +92,6 @@ func TestTargetOffset(t *testing.T) { } } -func TestTargetWrapReportingAppender(t *testing.T) { - cfg := &config.ScrapeConfig{ - MetricRelabelConfigs: []*config.RelabelConfig{ - {}, {}, {}, - }, - } - - target := newTestTarget("example.com:80", 10*time.Millisecond, nil) - target.scrapeConfig = cfg - app := &nopAppender{} - - cfg.HonorLabels = false - wrapped := target.wrapReportingAppender(app) - - rl, ok := wrapped.(ruleLabelsAppender) - if !ok { - t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) - } - if rl.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", rl.SampleAppender) - } - - cfg.HonorLabels = true - wrapped = target.wrapReportingAppender(app) - - hl, ok := wrapped.(ruleLabelsAppender) - if !ok { - t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) - } - if hl.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", hl.SampleAppender) - } -} - -func TestTargetWrapAppender(t *testing.T) { - cfg := &config.ScrapeConfig{ - MetricRelabelConfigs: []*config.RelabelConfig{ - {}, {}, {}, - }, - } - - target := newTestTarget("example.com:80", 10*time.Millisecond, nil) - target.scrapeConfig = cfg - app := &nopAppender{} - - cfg.HonorLabels = false - wrapped := target.wrapAppender(app) - - rl, ok := wrapped.(ruleLabelsAppender) - if !ok { - t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) - } - re, ok := rl.SampleAppender.(relabelAppender) - if !ok { - t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender) - } - if re.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", re.SampleAppender) - } - - cfg.HonorLabels = true - wrapped = target.wrapAppender(app) - - hl, ok := wrapped.(honorLabelsAppender) - if !ok { - t.Fatalf("Expected honorLabelsAppender but got %T", wrapped) - } - re, ok = hl.SampleAppender.(relabelAppender) - if !ok { - t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender) - } - if re.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", re.SampleAppender) - } -} - func TestTargetScrape404(t *testing.T) { server := httptest.NewServer( http.HandlerFunc( diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index bc750f4b2..d0f0960a1 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -117,6 +117,8 @@ func (tm *TargetManager) reload() { ts.runScraping(tm.ctx) tm.wg.Done() }(ts) + } else { + ts.reload(scfg) } ts.runProviders(tm.ctx, providersFromConfig(scfg)) } @@ -140,12 +142,14 @@ func (tm *TargetManager) Pools() map[string][]*Target { // TODO(fabxc): this is just a hack to maintain compatibility for now. for _, ps := range tm.targetSets { - for _, ts := range ps.scrapePool.tgroups { - for _, t := range ts { - job := string(t.Labels()[model.JobLabel]) - pools[job] = append(pools[job], t) - } + ps.scrapePool.mtx.RLock() + + for _, t := range ps.scrapePool.targets { + job := string(t.Labels()[model.JobLabel]) + pools[job] = append(pools[job], t) } + + ps.scrapePool.mtx.RUnlock() } return pools } @@ -166,10 +170,12 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool { } // targetSet holds several TargetProviders for which the same scrape configuration -// is used. It runs the target providers and starts and stops scrapers as it -// receives target updates. +// is used. It maintains target groups from all given providers and sync them +// to a scrape pool. type targetSet struct { - mtx sync.RWMutex + mtx sync.RWMutex + + // Sets of targets by a source string that is unique across target providers. tgroups map[string]map[model.Fingerprint]*Target providers map[string]TargetProvider @@ -184,7 +190,7 @@ type targetSet struct { func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet { ts := &targetSet{ tgroups: map[string]map[model.Fingerprint]*Target{}, - scrapePool: newScrapePool(app), + scrapePool: newScrapePool(cfg, app), syncCh: make(chan struct{}, 1), config: cfg, } @@ -203,6 +209,14 @@ func (ts *targetSet) cancel() { } } +func (ts *targetSet) reload(cfg *config.ScrapeConfig) { + ts.mtx.Lock() + ts.config = cfg + ts.mtx.Unlock() + + ts.scrapePool.reload(cfg) +} + func (ts *targetSet) runScraping(ctx context.Context) { ctx, ts.cancelScraping = context.WithCancel(ctx) @@ -221,7 +235,9 @@ Loop: case <-ctx.Done(): break Loop case <-ts.syncCh: + ts.mtx.RLock() ts.sync() + ts.mtx.RUnlock() } } @@ -231,9 +247,13 @@ Loop: } func (ts *targetSet) sync() { - // TODO(fabxc): temporary simple version. For a deduplicating scrape pool we will - // submit a list of all targets. - ts.scrapePool.sync(ts.tgroups) + targets := []*Target{} + for _, tgroup := range ts.tgroups { + for _, t := range tgroup { + targets = append(targets, t) + } + } + ts.scrapePool.sync(targets) } func (ts *targetSet) runProviders(ctx context.Context, providers map[string]TargetProvider) { @@ -298,8 +318,9 @@ func (ts *targetSet) runProviders(ctx context.Context, providers map[string]Targ go prov.Run(ctx, updates) } + // We wait for a full initial set of target groups before releasing the mutex + // to ensure the initial sync is complete and there are no races with subsequent updates. wg.Wait() - ts.sync() }