Mirror of https://github.com/prometheus/prometheus.git
Implement config reloading on SIGHUP.
With this commit, sending SIGHUP to the Prometheus process will reload and apply the configuration file. The different components attempt to handle failing changes gracefully.
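In outline, the commit gives the prometheus struct a reloadConfig method that loads the configuration file and, on failure, logs the error and keeps the previously applied configuration; Serve() then installs a SIGHUP handler that calls it and dispatches the parsed config to each component's ApplyConfig method, as the diff below shows. The following is a minimal, self-contained sketch of that pattern; loadConfig, applyConfig, and the prometheus.yml path are placeholders assumed for illustration, not the Prometheus APIs:

// A minimal sketch of the reload-on-SIGHUP pattern introduced here.
package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"
)

// loadConfig is a placeholder loader; Prometheus uses config.LoadFromFile.
func loadConfig(path string) (string, error) {
	b, err := os.ReadFile(path)
	return string(b), err
}

// applyConfig is a placeholder for handing the parsed configuration
// to each component's ApplyConfig method.
func applyConfig(conf string) {
	log.Printf("applied configuration (%d bytes)", len(conf))
}

// reloadConfig loads the file and applies it; on failure it logs the
// error and keeps the previous configuration, mirroring the commit's
// graceful error handling.
func reloadConfig(path string) {
	conf, err := loadConfig(path)
	if err != nil {
		log.Printf("couldn't load configuration (%s): %v", path, err)
		return
	}
	applyConfig(conf)
}

func main() {
	const path = "prometheus.yml" // hypothetical config path

	reloadConfig(path) // initial load at startup

	// Reload the configuration whenever SIGHUP arrives.
	hup := make(chan os.Signal, 1)
	signal.Notify(hup, syscall.SIGHUP)
	go func() {
		for range hup {
			reloadConfig(path)
		}
	}()

	// Exit gracefully on SIGTERM or interrupt.
	term := make(chan os.Signal, 1)
	signal.Notify(term, os.Interrupt, syscall.SIGTERM)
	<-term
	close(hup)
}

Sending SIGHUP (e.g. kill -HUP <pid>) then triggers a reload, while a broken configuration file only logs an error instead of terminating the process.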
commit bb540fd9fd
parent 3b0777ff84
main.go | 53
@@ -92,13 +92,6 @@ type prometheus struct {
 // NewPrometheus creates a new prometheus object based on flag values.
 // Call Serve() to start serving and Close() for clean shutdown.
 func NewPrometheus() *prometheus {
-	conf, err := config.LoadFromFile(*configFile)
-	if err != nil {
-		glog.Errorf("Couldn't load configuration (-config.file=%s): %v", *configFile, err)
-		glog.Errorf("Note: The configuration format has changed with version 0.14, please check the documentation.")
-		os.Exit(2)
-	}
-
 	notificationHandler := notification.NewNotificationHandler(*alertmanagerURL, *notificationQueueCapacity)

 	var syncStrategy local.SyncStrategy
@@ -155,26 +148,17 @@ func NewPrometheus() *prometheus {
 		sampleAppender = fanout
 	}

-	targetManager, err := retrieval.NewTargetManager(conf, sampleAppender)
-	if err != nil {
-		glog.Errorf("Error creating target manager: %s", err)
-		os.Exit(1)
-	}
+	targetManager := retrieval.NewTargetManager(sampleAppender)

 	queryEngine := promql.NewEngine(memStorage)

 	ruleManager := rules.NewManager(&rules.ManagerOptions{
 		SampleAppender:      sampleAppender,
 		NotificationHandler: notificationHandler,
-		EvaluationInterval:  time.Duration(conf.GlobalConfig.EvaluationInterval),
 		QueryEngine:         queryEngine,
 		PrometheusURL:       web.MustBuildServerURL(*pathPrefix),
 		PathPrefix:          *pathPrefix,
 	})
-	if err := ruleManager.LoadRuleFiles(conf.RuleFiles...); err != nil {
-		glog.Errorf("Error loading rule files: %s", err)
-		os.Exit(1)
-	}

 	flags := map[string]string{}
 	flag.VisitAll(func(f *flag.Flag) {
@@ -182,7 +166,6 @@ func NewPrometheus() *prometheus {
 	})
 	prometheusStatus := &web.PrometheusStatusHandler{
 		BuildInfo:   BuildInfo,
-		Config:      conf.String(),
 		RuleManager: ruleManager,
 		TargetPools: targetManager.Pools,
 		Flags:       flags,
@@ -229,9 +212,27 @@ func NewPrometheus() *prometheus {
 		webService: webService,
 	}
 	webService.QuitChan = make(chan struct{})
+
+	p.reloadConfig()
+
 	return p
 }

+func (p *prometheus) reloadConfig() {
+	glog.Infof("Loading configuration file %s", *configFile)
+
+	conf, err := config.LoadFromFile(*configFile)
+	if err != nil {
+		glog.Errorf("Couldn't load configuration (-config.file=%s): %v", *configFile, err)
+		glog.Errorf("Note: The configuration format has changed with version 0.14, please check the documentation.")
+		return
+	}
+
+	p.webService.StatusHandler.ApplyConfig(conf)
+	p.targetManager.ApplyConfig(conf)
+	p.ruleManager.ApplyConfig(conf)
+}
+
 // Serve starts the Prometheus server. It returns after the server has been shut
 // down. The method installs an interrupt handler, allowing to trigger a
 // shutdown by sending SIGTERM to the process.
@@ -252,15 +253,25 @@ func (p *prometheus) Serve() {
 		}
 	}()

-	notifier := make(chan os.Signal)
-	signal.Notify(notifier, os.Interrupt, syscall.SIGTERM)
+	hup := make(chan os.Signal)
+	signal.Notify(hup, syscall.SIGHUP)
+	go func() {
+		for range hup {
+			p.reloadConfig()
+		}
+	}()
+
+	term := make(chan os.Signal)
+	signal.Notify(term, os.Interrupt, syscall.SIGTERM)
 	select {
-	case <-notifier:
+	case <-term:
 		glog.Warning("Received SIGTERM, exiting gracefully...")
 	case <-p.webService.QuitChan:
 		glog.Warning("Received termination request via web service, exiting gracefully...")
 	}

+	close(hup)
+
 	p.targetManager.Stop()
 	p.ruleManager.Stop()
 	p.queryEngine.Stop()
@@ -285,6 +285,7 @@ func (t *target) RunScraper(sampleAppender storage.SampleAppender) {
 			// On changed scrape interval the new interval becomes effective
 			// after the next scrape.
 			if lastScrapeInterval != t.scrapeInterval {
+				ticker.Stop()
 				ticker = time.NewTicker(t.scrapeInterval)
 				lastScrapeInterval = t.scrapeInterval
 			}
@@ -62,16 +62,13 @@ type TargetManager struct {
 	providers map[*config.ScrapeConfig][]TargetProvider
 }

-// NewTargetManager creates a new TargetManager based on the given config.
-func NewTargetManager(cfg *config.Config, sampleAppender storage.SampleAppender) (*TargetManager, error) {
+// NewTargetManager creates a new TargetManager.
+func NewTargetManager(sampleAppender storage.SampleAppender) *TargetManager {
 	tm := &TargetManager{
 		sampleAppender: sampleAppender,
 		targets:        make(map[string][]Target),
 	}
-	if err := tm.applyConfig(cfg); err != nil {
-		return nil, err
-	}
-	return tm, nil
+	return tm
 }

 // Run starts background processing to handle target updates.
@@ -129,19 +126,17 @@ func fullSource(cfg *config.ScrapeConfig, src string) string {

 // Stop all background processing.
 func (tm *TargetManager) Stop() {
-	tm.stop(true)
+	tm.m.Lock()
+	defer tm.m.Unlock()
+
+	if tm.running {
+		tm.stop(true)
+	}
 }

 // stop background processing of the target manager. If removeTargets is true,
 // existing targets will be stopped and removed.
 func (tm *TargetManager) stop(removeTargets bool) {
-	tm.m.Lock()
-	defer tm.m.Unlock()
-
-	if !tm.running {
-		return
-	}
-
 	glog.Info("Stopping target manager...")
 	defer glog.Info("Target manager stopped.")

@@ -273,35 +268,23 @@ func (tm *TargetManager) Pools() map[string][]Target {

 // ApplyConfig resets the manager's target providers and job configurations as defined
 // by the new cfg. The state of targets that are valid in the new configuration remains unchanged.
-func (tm *TargetManager) ApplyConfig(cfg *config.Config) error {
-	tm.stop(false)
-	// Even if updating the config failed, we want to continue rather than stop scraping anything.
-	defer tm.Run()
-
-	if err := tm.applyConfig(cfg); err != nil {
-		glog.Warningf("Error updating config, changes not applied: %s", err)
-		return err
-	}
-	return nil
-}
-
-func (tm *TargetManager) applyConfig(cfg *config.Config) error {
-	// Only apply changes if everything was successful.
-	providers := map[*config.ScrapeConfig][]TargetProvider{}
-
-	for _, scfg := range cfg.ScrapeConfigs {
-		provs, err := ProvidersFromConfig(scfg)
-		if err != nil {
-			return err
-		}
-		providers[scfg] = provs
-	}
+func (tm *TargetManager) ApplyConfig(cfg *config.Config) {
 	tm.m.Lock()
 	defer tm.m.Unlock()

+	if tm.running {
+		tm.stop(false)
+		// Even if updating the config failed, we want to continue rather than stop scraping anything.
+		defer tm.Run()
+	}
+	providers := map[*config.ScrapeConfig][]TargetProvider{}
+
+	for _, scfg := range cfg.ScrapeConfigs {
+		providers[scfg] = ProvidersFromConfig(scfg)
+	}
+
 	tm.globalLabels = cfg.GlobalConfig.Labels
 	tm.providers = providers
-	return nil
 }

 // targetsFromGroup builds targets based on the given TargetGroup and config.
@@ -335,7 +318,7 @@ func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.Sc

 		labels, err := Relabel(labels, cfg.RelabelConfigs...)
 		if err != nil {
-			return nil, fmt.Errorf("error while relabelling instance %d in target group %s: %s", i, tg, err)
+			return nil, fmt.Errorf("error while relabeling instance %d in target group %s: %s", i, tg, err)
 		}
 		// Check if the target was dropped.
 		if labels == nil {
@@ -357,7 +340,7 @@ func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.Sc
 }

 // ProvidersFromConfig returns all TargetProviders configured in cfg.
-func ProvidersFromConfig(cfg *config.ScrapeConfig) ([]TargetProvider, error) {
+func ProvidersFromConfig(cfg *config.ScrapeConfig) []TargetProvider {
 	var providers []TargetProvider

 	for _, dnscfg := range cfg.DNSSDConfigs {
@@ -367,7 +350,7 @@ func ProvidersFromConfig(cfg *config.ScrapeConfig) ([]TargetProvider, error) {
 	if len(cfg.TargetGroups) > 0 {
 		providers = append(providers, NewStaticProvider(cfg.TargetGroups))
 	}
-	return providers, nil
+	return providers
 }

 // StaticProvider holds a list of target groups that never change.
@@ -277,19 +277,15 @@ func TestTargetManagerConfigUpdate(t *testing.T) {
 	}
 	conf := &config.Config{DefaultedConfig: config.DefaultConfig}

-	targetManager, err := NewTargetManager(conf, nopAppender{})
-	if err != nil {
-		t.Fatal(err)
-	}
+	targetManager := NewTargetManager(nopAppender{})
+	targetManager.ApplyConfig(conf)
+
 	targetManager.Run()
 	defer targetManager.Stop()

 	for i, step := range sequence {
 		conf.ScrapeConfigs = step.scrapeConfigs
-		err := targetManager.ApplyConfig(conf)
-		if err != nil {
-			t.Fatal(err)
-		}
+		targetManager.ApplyConfig(conf)

 		<-time.After(1 * time.Millisecond)

@@ -24,6 +24,7 @@ import (

 	clientmodel "github.com/prometheus/client_golang/model"

+	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/notification"
 	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/storage"
@@ -120,7 +121,11 @@ func NewManager(o *ManagerOptions) *Manager {
 func (m *Manager) Run() {
 	defer glog.Info("Rule manager stopped.")

-	ticker := time.NewTicker(m.interval)
+	m.Lock()
+	lastInterval := m.interval
+	m.Unlock()
+
+	ticker := time.NewTicker(lastInterval)
 	defer ticker.Stop()

 	for {
@@ -137,6 +142,14 @@ func (m *Manager) Run() {
 			start := time.Now()
 			m.runIteration()
 			iterationDuration.Observe(float64(time.Since(start) / time.Millisecond))
+
+			m.Lock()
+			if lastInterval != m.interval {
+				ticker.Stop()
+				ticker = time.NewTicker(m.interval)
+				lastInterval = m.interval
+			}
+			m.Unlock()
 		case <-m.done:
 			return
 		}
@@ -255,11 +268,27 @@ func (m *Manager) runIteration() {
 	wg.Wait()
 }

-// LoadRuleFiles loads alerting and recording rules from the given files.
-func (m *Manager) LoadRuleFiles(filenames ...string) error {
+// ApplyConfig updates the rule manager's state as the config requires. If
+// loading the new rules failed the old rule set is restored.
+func (m *Manager) ApplyConfig(conf *config.Config) {
 	m.Lock()
 	defer m.Unlock()

+	m.interval = time.Duration(conf.GlobalConfig.EvaluationInterval)
+
+	rulesSnapshot := make([]Rule, len(m.rules))
+	copy(rulesSnapshot, m.rules)
+	m.rules = m.rules[:0]
+
+	if err := m.loadRuleFiles(conf.RuleFiles...); err != nil {
+		// If loading the new rules failed, restore the old rule set.
+		m.rules = rulesSnapshot
+		glog.Errorf("Error loading rules, previous rule set restored: %s", err)
+	}
+}
+
+// loadRuleFiles loads alerting and recording rules from the given files.
+func (m *Manager) loadRuleFiles(filenames ...string) error {
 	for _, fn := range filenames {
 		content, err := ioutil.ReadFile(fn)
 		if err != nil {
@@ -18,6 +18,7 @@ import (
 	"sync"
 	"time"

+	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/retrieval"
 	"github.com/prometheus/prometheus/rules"
 )
@@ -47,5 +48,14 @@ func (h *PrometheusStatusHandler) TargetStateToClass() map[retrieval.TargetState
 }

 func (h *PrometheusStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	h.mu.RLock()
 	executeTemplate(w, "status", h, h.PathPrefix)
+	h.mu.RUnlock()
+}
+
+// ApplyConfig updates the status handler's state as the new config requires.
+func (h *PrometheusStatusHandler) ApplyConfig(conf *config.Config) {
+	h.mu.Lock()
+	h.Config = conf.String()
+	h.mu.Unlock()
 }