Merge pull request #3698 from krasi-georgiev/scrape-config-reload-fix

Scraper config reload bugfix
Goutham Veeramachaneni 2018-01-18 19:00:28 +05:30 committed by GitHub
commit ab7c54e2c8
4 changed files with 120 additions and 90 deletions

cmd/prometheus/main.go

@@ -28,6 +28,7 @@ import (
 	"path/filepath"
 	"runtime"
 	"strings"
+	"sync"
 	"syscall"
 	"time"
@@ -286,7 +287,10 @@ func main() {
 	reloaders := []func(cfg *config.Config) error{
 		remoteStorage.ApplyConfig,
 		webHandler.ApplyConfig,
+		// The Scrape and notifier managers need to reload before the Discovery manager as
+		// they need to read the most updated config when receiving the new targets list.
 		notifier.ApplyConfig,
+		scrapeManager.ApplyConfig,
 		func(cfg *config.Config) error {
 			c := make(map[string]sd_config.ServiceDiscoveryConfig)
 			for _, v := range cfg.ScrapeConfigs {
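
The new comment above encodes an ordering contract worth spelling out: the reloaders run sequentially in slice order, so placing scrapeManager.ApplyConfig before the discovery reloader guarantees the scrape manager already holds the new job configs by the time the discovery manager pushes a targets list derived from them. A minimal sketch of that mechanic (toy types, not the real config package):

    package main

    import "fmt"

    // toyConfig is a hypothetical stand-in for Prometheus' *config.Config.
    type toyConfig struct{ jobs []string }

    // applyAll runs each reloader in slice order and stops at the first error,
    // so slice order encodes the dependency: config consumers (scrape, notifier)
    // must be updated before the discovery reloaders that trigger new targets.
    func applyAll(cfg *toyConfig, reloaders []func(*toyConfig) error) error {
        for _, apply := range reloaders {
            if err := apply(cfg); err != nil {
                return err
            }
        }
        return nil
    }

    func main() {
        reloaders := []func(*toyConfig) error{
            func(c *toyConfig) error { fmt.Println("scrape manager sees jobs:", c.jobs); return nil },
            func(c *toyConfig) error { fmt.Println("discovery manager restarts providers"); return nil },
        }
        _ = applyAll(&toyConfig{jobs: []string{"node"}}, reloaders)
    }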
@@ -306,7 +310,6 @@ func main() {
 			}
 			return discoveryManagerNotify.ApplyConfig(c)
 		},
-		scrapeManager.ApplyConfig,
 		func(cfg *config.Config) error {
 			// Get all rule files matching the configuration paths.
 			var files []string
@@ -328,8 +331,22 @@ func main() {
 	// Start all components while we wait for TSDB to open but only load
 	// initial config and mark ourselves as ready after it completed.
 	dbOpen := make(chan struct{})
-	// Wait until the server is ready to handle reloading
-	reloadReady := make(chan struct{})
+	// sync.Once is used to make sure we can close the channel at different execution stages (SIGTERM or when the config is loaded).
+	type closeOnce struct {
+		C     chan struct{}
+		once  sync.Once
+		Close func()
+	}
+	// Wait until the server is ready to handle reloading.
+	reloadReady := &closeOnce{
+		C: make(chan struct{}),
+	}
+	reloadReady.Close = func() {
+		reloadReady.once.Do(func() {
+			close(reloadReady.C)
+		})
+	}

 	var g group.Group
 	{
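
The closeOnce type above is a small but load-bearing pattern: close(ch) panics on a second call, and here several execution stages (SIGTERM, shutdown via the web handler, initial config load) may all try to release the same gate. A self-contained sketch of the pattern, written as a method rather than the PR's Close func field:

    package main

    import (
        "fmt"
        "sync"
    )

    // closeOnce wraps a channel whose Close may be called from several
    // shutdown paths without panicking on a double close.
    type closeOnce struct {
        C    chan struct{}
        once sync.Once
    }

    func (c *closeOnce) Close() {
        c.once.Do(func() { close(c.C) })
    }

    func main() {
        reloadReady := &closeOnce{C: make(chan struct{})}

        var wg sync.WaitGroup
        for i := 0; i < 2; i++ { // two competing shutdown paths both release the gate
            wg.Add(1)
            go func() {
                defer wg.Done()
                reloadReady.Close() // only the first call closes; the rest are no-ops
            }()
        }
        wg.Wait()

        <-reloadReady.C // receives immediately once the channel is closed
        fmt.Println("released exactly once")
    }

Receiving from a closed channel returns immediately, which is what lets every goroutine blocked on reloadReady.C proceed at once.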
@@ -338,12 +355,16 @@ func main() {
 		cancel := make(chan struct{})
 		g.Add(
 			func() error {
+				// Don't forget to release the reloadReady channel so that waiting blocks can exit normally.
 				select {
 				case <-term:
 					level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...")
+					reloadReady.Close()
 				case <-webHandler.Quit():
 					level.Warn(logger).Log("msg", "Received termination request via web service, exiting gracefully...")
 				case <-cancel:
+					reloadReady.Close()
 					break
 				}
 				return nil
@@ -384,6 +405,15 @@ func main() {
 	{
 		g.Add(
 			func() error {
+				// When the scrape manager receives a new targets list
+				// it needs to read a valid config for each job.
+				// It depends on the config being in sync with the discovery manager so
+				// we wait until the config is fully loaded.
+				select {
+				case <-reloadReady.C:
+					break
+				}
+
 				err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
 				level.Info(logger).Log("msg", "Scrape manager stopped")
 				return err
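
The single-case select above is equivalent to a plain blocking receive on reloadReady.C; the notifier actor near the end of this file is given the identical gate. A reduced sketch of the pattern with hypothetical names (runGated is not part of the PR):

    package main

    import "fmt"

    // runGated models the actor above: block until the initial config load
    // releases the gate, then start consuming discovery updates.
    func runGated(ready <-chan struct{}, updates <-chan []string) {
        <-ready // same effect as: select { case <-ready: }
        for tgs := range updates {
            fmt.Println("syncing targets:", tgs)
        }
    }

    func main() {
        ready := make(chan struct{})
        updates := make(chan []string, 1)
        updates <- []string{"10.0.0.1:9100"}
        close(updates)

        close(ready) // config loaded: release the gate
        runGated(ready, updates)
    }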
@@ -405,11 +435,8 @@ func main() {
 		g.Add(
 			func() error {
 				select {
-				case <-reloadReady:
+				case <-reloadReady.C:
 					break
-				// In case a shutdown is initiated before the reloadReady is released.
-				case <-cancel:
-					return nil
 				}

 				for {
@@ -445,6 +472,7 @@ func main() {
 					break
 				// In case a shutdown is initiated before the dbOpen is released
 				case <-cancel:
+					reloadReady.Close()
 					return nil
 				}
@@ -452,9 +480,10 @@ func main() {
 					return fmt.Errorf("Error loading config %s", err)
 				}

-				close(reloadReady)
+				reloadReady.Close()
+
 				webHandler.Ready()
-				level.Info(logger).Log("msg", "Server is ready to receive requests.")
+				level.Info(logger).Log("msg", "Server is ready to receive web requests.")
 				<-cancel
 				return nil
 			},
@@ -529,7 +558,16 @@ func main() {
 		// so keep this interrupt after the ruleManager.Stop().
 		g.Add(
 			func() error {
+				// When the notifier manager receives a new targets list
+				// it needs to read a valid config for each job.
+				// It depends on the config being in sync with the discovery manager
+				// so we wait until the config is fully loaded.
+				select {
+				case <-reloadReady.C:
+					break
+				}
+
 				notifier.Run(discoveryManagerNotify.SyncCh())
+				level.Info(logger).Log("msg", "Notifier manager stopped")
 				return nil
 			},
 			func(err error) {

discovery/manager.go

@@ -16,6 +16,7 @@ package discovery
 import (
 	"context"
 	"fmt"
+	"sync"

 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
@@ -61,10 +62,10 @@ type poolKey struct {
 func NewManager(logger log.Logger) *Manager {
 	return &Manager{
 		logger:         logger,
-		actionCh:       make(chan func(context.Context)),
 		syncCh:         make(chan map[string][]*targetgroup.Group),
 		targets:        make(map[poolKey]map[string]*targetgroup.Group),
 		discoverCancel: []context.CancelFunc{},
+		ctx:            context.Background(),
 	}
 }
@@ -72,7 +73,8 @@ func NewManager(logger log.Logger) *Manager {
 // Targets are grouped by the target set name.
 type Manager struct {
 	logger         log.Logger
-	actionCh       chan func(context.Context)
+	mtx            sync.RWMutex
+	ctx            context.Context
 	discoverCancel []context.CancelFunc
 	// Some Discoverers(eg. k8s) send only the updates for a given target group
 	// so we use map[tg.Source]*targetgroup.Group to know which group to update.
@@ -83,10 +85,9 @@ type Manager struct {
 // Run starts the background processing
 func (m *Manager) Run(ctx context.Context) error {
+	m.ctx = ctx
 	for {
 		select {
-		case f := <-m.actionCh:
-			f(ctx)
 		case <-ctx.Done():
 			m.cancelDiscoverers()
 			return ctx.Err()
@@ -101,18 +102,17 @@ func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
 // ApplyConfig removes all running discovery providers and starts new ones using the provided config.
 func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error {
-	err := make(chan error)
-	m.actionCh <- func(ctx context.Context) {
-		m.cancelDiscoverers()
-		for name, scfg := range cfg {
-			for provName, prov := range m.providersFromConfig(scfg) {
-				m.startProvider(ctx, poolKey{setName: name, provider: provName}, prov)
-			}
-		}
-		close(err)
-	}
-	return <-err
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	m.cancelDiscoverers()
+	for name, scfg := range cfg {
+		for provName, prov := range m.providersFromConfig(scfg) {
+			m.startProvider(m.ctx, poolKey{setName: name, provider: provName}, prov)
+		}
+	}
+	return nil
 }

 func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Discoverer) {
@@ -151,39 +151,32 @@ func (m *Manager) cancelDiscoverers() {
 }

 func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) {
-	done := make(chan struct{})
-
-	m.actionCh <- func(ctx context.Context) {
-		for _, tg := range tgs {
-			if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics.
-				if _, ok := m.targets[poolKey]; !ok {
-					m.targets[poolKey] = make(map[string]*targetgroup.Group)
-				}
-				m.targets[poolKey][tg.Source] = tg
-			}
-		}
-		close(done)
-	}
-	<-done
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	for _, tg := range tgs {
+		if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics.
+			if _, ok := m.targets[poolKey]; !ok {
+				m.targets[poolKey] = make(map[string]*targetgroup.Group)
+			}
+			m.targets[poolKey][tg.Source] = tg
+		}
+	}
 }

 func (m *Manager) allGroups() map[string][]*targetgroup.Group {
-	tSets := make(chan map[string][]*targetgroup.Group)
-
-	m.actionCh <- func(ctx context.Context) {
-		tSetsAll := map[string][]*targetgroup.Group{}
-		for pkey, tsets := range m.targets {
-			for _, tg := range tsets {
-				// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
-				// to signal that it needs to stop all scrape loops for this target set.
-				tSetsAll[pkey.setName] = append(tSetsAll[pkey.setName], tg)
-			}
-		}
-		tSets <- tSetsAll
-	}
-	return <-tSets
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	tSets := map[string][]*targetgroup.Group{}
+	for pkey, tsets := range m.targets {
+		for _, tg := range tsets {
+			// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
+			// to signal that it needs to stop all scrape loops for this target set.
+			tSets[pkey.setName] = append(tSets[pkey.setName], tg)
+		}
+	}
+	return tSets
 }

 func (m *Manager) providersFromConfig(cfg sd_config.ServiceDiscoveryConfig) map[string]Discoverer {
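
The shape of the change in this file: state that was previously mutated only inside Run's event loop, with every caller handing a closure over actionCh, is now guarded by mtx, so ApplyConfig and updateGroup execute on the caller's goroutine and cannot block waiting for a Run loop to drain the channel. A reduced sketch under that reading, with the Manager's fields simplified to strings:

    package main

    import (
        "fmt"
        "sync"
    )

    // manager is a stripped-down stand-in for the discovery Manager: a mutex
    // guards the shared targets map instead of funneling every access through
    // a single goroutine.
    type manager struct {
        mtx     sync.RWMutex
        targets map[string][]string // poolKey -> groups, simplified to strings
    }

    func (m *manager) updateGroup(pool string, tgs []string) {
        m.mtx.Lock()
        defer m.mtx.Unlock()
        m.targets[pool] = append(m.targets[pool], tgs...)
    }

    func (m *manager) allGroups() map[string][]string {
        m.mtx.RLock()
        defer m.mtx.RUnlock()
        out := make(map[string][]string, len(m.targets))
        for k, v := range m.targets {
            out[k] = append([]string(nil), v...) // copy so callers can't race on the slice
        }
        return out
    }

    func main() {
        m := &manager{targets: map[string][]string{}}
        m.updateGroup("job1", []string{"10.0.0.1:9100"})
        fmt.Println(m.allGroups())
    }

The sketch uses the RWMutex's read lock for the read path; the PR itself takes the write lock everywhere, which is simpler and equally correct.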

notifier/notifier.go

@@ -490,7 +490,7 @@ func (n *Notifier) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error {
 // Stop shuts down the notification handler.
 func (n *Notifier) Stop() {
-	level.Info(n.logger).Log("msg", "Stopping notification handler...")
+	level.Info(n.logger).Log("msg", "Stopping notification manager...")
 	n.cancel()
 }

retrieval/manager.go

@@ -15,6 +15,8 @@ package retrieval
 import (
 	"fmt"
+	"reflect"
+	"sync"

 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
@@ -35,7 +37,6 @@ func NewScrapeManager(logger log.Logger, app Appendable) *ScrapeManager {
 	return &ScrapeManager{
 		append:        app,
 		logger:        logger,
-		actionCh:      make(chan func()),
 		scrapeConfigs: make(map[string]*config.ScrapeConfig),
 		scrapePools:   make(map[string]*scrapePool),
 		graceShut:     make(chan struct{}),
@@ -49,7 +50,7 @@ type ScrapeManager struct {
 	append        Appendable
 	scrapeConfigs map[string]*config.ScrapeConfig
 	scrapePools   map[string]*scrapePool
-	actionCh      chan func()
+	mtx           sync.RWMutex
 	graceShut     chan struct{}
 }
@@ -59,8 +60,6 @@ func (m *ScrapeManager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
 	for {
 		select {
-		case f := <-m.actionCh:
-			f()
 		case ts := <-tsets:
 			m.reload(ts)
 		case <-m.graceShut:
@@ -79,52 +78,60 @@ func (m *ScrapeManager) Stop() {
 // ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
 func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error {
-	done := make(chan struct{})
-	m.actionCh <- func() {
-		c := make(map[string]*config.ScrapeConfig)
-		for _, scfg := range cfg.ScrapeConfigs {
-			c[scfg.JobName] = scfg
-		}
-		m.scrapeConfigs = c
-		close(done)
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	c := make(map[string]*config.ScrapeConfig)
+	for _, scfg := range cfg.ScrapeConfigs {
+		c[scfg.JobName] = scfg
 	}
-	<-done
+	m.scrapeConfigs = c
+
+	// Cleanup and reload pool if config has changed.
+	for name, sp := range m.scrapePools {
+		if cfg, ok := m.scrapeConfigs[name]; !ok {
+			sp.stop()
+			delete(m.scrapePools, name)
+		} else if !reflect.DeepEqual(sp.config, cfg) {
+			sp.reload(cfg)
+		}
+	}
+
 	return nil
 }

 // TargetMap returns map of active and dropped targets and their corresponding scrape config job name.
 func (m *ScrapeManager) TargetMap() map[string][]*Target {
-	targetsMap := make(chan map[string][]*Target)
-	m.actionCh <- func() {
-		targets := make(map[string][]*Target)
-		for jobName, sp := range m.scrapePools {
-			sp.mtx.RLock()
-			for _, t := range sp.targets {
-				targets[jobName] = append(targets[jobName], t)
-			}
-			targets[jobName] = append(targets[jobName], sp.droppedTargets...)
-			sp.mtx.RUnlock()
-		}
-		targetsMap <- targets
-	}
-	return <-targetsMap
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	targets := make(map[string][]*Target)
+	for jobName, sp := range m.scrapePools {
+		sp.mtx.RLock()
+		for _, t := range sp.targets {
+			targets[jobName] = append(targets[jobName], t)
+		}
+		targets[jobName] = append(targets[jobName], sp.droppedTargets...)
+		sp.mtx.RUnlock()
+	}
+
+	return targets
 }

 // Targets returns the targets currently being scraped.
 func (m *ScrapeManager) Targets() []*Target {
-	targets := make(chan []*Target)
-	m.actionCh <- func() {
-		var t []*Target
-		for _, p := range m.scrapePools {
-			p.mtx.RLock()
-			for _, tt := range p.targets {
-				t = append(t, tt)
-			}
-			p.mtx.RUnlock()
-		}
-		targets <- t
-	}
-	return <-targets
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	var targets []*Target
+	for _, p := range m.scrapePools {
+		p.mtx.RLock()
+		for _, tt := range p.targets {
+			targets = append(targets, tt)
+		}
+		p.mtx.RUnlock()
+	}
+
+	return targets
 }

 func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) {
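
ApplyConfig now owns the scrape pool lifecycle: jobs that vanished are stopped, and pools whose config differs (detected with reflect.DeepEqual) are reloaded in place rather than torn down, so a config reload takes effect without restarting unchanged scrape loops; the now-redundant cleanup pass in reload() is deleted in the final hunk below. A hedged sketch of that decision logic, with toy stand-ins for config.ScrapeConfig and scrapePool:

    package main

    import (
        "fmt"
        "reflect"
    )

    // Toy stand-ins; the real types carry far more fields.
    type scrapeConfig struct {
        JobName        string
        ScrapeInterval string
    }

    type scrapePool struct{ config *scrapeConfig }

    func (sp *scrapePool) stop() { fmt.Println("stopped", sp.config.JobName) }
    func (sp *scrapePool) reload(cfg *scrapeConfig) {
        sp.config = cfg
        fmt.Println("reloaded", cfg.JobName)
    }

    // syncPools applies the same three-way decision as the new ApplyConfig:
    // job gone -> stop the pool; config changed -> reload it; unchanged -> leave it.
    func syncPools(pools map[string]*scrapePool, cfgs map[string]*scrapeConfig) {
        for name, sp := range pools {
            if cfg, ok := cfgs[name]; !ok {
                sp.stop()
                delete(pools, name)
            } else if !reflect.DeepEqual(sp.config, cfg) {
                sp.reload(cfg)
            }
        }
    }

    func main() {
        pools := map[string]*scrapePool{
            "node": {config: &scrapeConfig{JobName: "node", ScrapeInterval: "15s"}},
            "gone": {config: &scrapeConfig{JobName: "gone", ScrapeInterval: "30s"}},
        }
        syncPools(pools, map[string]*scrapeConfig{
            "node": {JobName: "node", ScrapeInterval: "30s"}, // changed -> reload
        })
    }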
@@ -146,12 +153,4 @@ func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) {
 			existing.Sync(tgroup)
 		}
 	}
-
-	// Cleanup - check the config and cancel the scrape loops if it don't exist in the scrape config.
-	for name, sp := range m.scrapePools {
-		if _, ok := m.scrapeConfigs[name]; !ok {
-			sp.stop()
-			delete(m.scrapePools, name)
-		}
-	}
 }