From cc1a2a20619dcfede66117d3993ea1f7cd0e9509 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 1 Sep 2015 17:49:09 +0200 Subject: [PATCH 1/4] Remove attachment of global labels upon ingestion --- retrieval/targetmanager.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index cac55651a..194b1f49c 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -51,7 +51,6 @@ type TargetProvider interface { // target providers. type TargetManager struct { mtx sync.RWMutex - globalLabels model.LabelSet sampleAppender storage.SampleAppender running bool done chan struct{} @@ -356,7 +355,6 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool { tm.mtx.Lock() defer tm.mtx.Unlock() - tm.globalLabels = cfg.GlobalConfig.Labels tm.providers = providers return true } @@ -481,7 +479,6 @@ func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.Sc model.MetricsPathLabel: model.LabelValue(cfg.MetricsPath), model.JobLabel: model.LabelValue(cfg.JobName), }, - tm.globalLabels, } for _, lset := range labelsets { for ln, lv := range lset { From 9bbd9264e2e0eac3894fb7137c3a0bc099cb2a41 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 1 Sep 2015 18:47:48 +0200 Subject: [PATCH 2/4] Add global labels to federation --- cmd/prometheus/main.go | 6 ++++-- web/federate.go | 33 +++++++++++++++++++++++---------- web/web.go | 33 +++++++++++++++++++++------------ 3 files changed, 48 insertions(+), 24 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 93b8e3b13..ab7a482c9 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -93,7 +93,9 @@ func Main() int { webHandler := web.New(memStorage, queryEngine, ruleManager, status, &cfg.web) - if !reloadConfig(cfg.configFile, status, targetManager, ruleManager) { + reloadables := []Reloadable{status, targetManager, ruleManager, webHandler} + + if !reloadConfig(cfg.configFile, reloadables...) { return 1 } @@ -110,7 +112,7 @@ func Main() int { case <-hup: case <-webHandler.Reload(): } - reloadConfig(cfg.configFile, status, targetManager, ruleManager) + reloadConfig(cfg.configFile, reloadables...) } }() diff --git a/web/federate.go b/web/federate.go index 488f1f88e..0a637e7c8 100644 --- a/web/federate.go +++ b/web/federate.go @@ -19,20 +19,18 @@ import ( "github.com/golang/protobuf/proto" "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/storage/metric" - dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" + + dto "github.com/prometheus/client_model/go" ) -// Federation implements a web handler to serve scrape federation requests. -type Federation struct { - Storage local.Storage -} +func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { + h.mtx.RLock() + defer h.mtx.RUnlock() -func (fed *Federation) ServeHTTP(w http.ResponseWriter, req *http.Request) { req.ParseForm() metrics := map[model.Fingerprint]metric.Metric{} @@ -43,7 +41,7 @@ func (fed *Federation) ServeHTTP(w http.ResponseWriter, req *http.Request) { http.Error(w, err.Error(), http.StatusBadRequest) return } - for fp, met := range fed.Storage.MetricsForLabelMatchers(matchers...) { + for fp, met := range h.storage.MetricsForLabelMatchers(matchers...) { metrics[fp] = met } } @@ -63,7 +61,9 @@ func (fed *Federation) ServeHTTP(w http.ResponseWriter, req *http.Request) { } for fp, met := range metrics { - sp := fed.Storage.LastSamplePairForFingerprint(fp) + globalUsed := map[model.LabelName]struct{}{} + + sp := h.storage.LastSamplePairForFingerprint(fp) if sp == nil { continue } @@ -80,14 +80,27 @@ func (fed *Federation) ServeHTTP(w http.ResponseWriter, req *http.Request) { Name: proto.String(string(ln)), Value: proto.String(string(lv)), }) + if _, ok := h.globalLabels[ln]; ok { + globalUsed[ln] = struct{}{} + } } + + // Attach global labels if they do not exist yet. + for ln, lv := range h.globalLabels { + if _, ok := globalUsed[ln]; !ok { + protMetric.Label = append(protMetric.Label, &dto.LabelPair{ + Name: proto.String(string(ln)), + Value: proto.String(string(lv)), + }) + } + } + protMetric.TimestampMs = (*int64)(&sp.Timestamp) protMetric.Untyped.Value = (*float64)(&sp.Value) if err := enc.Encode(protMetricFam); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return - } } } diff --git a/web/web.go b/web/web.go index 9f2884016..9f4badb17 100644 --- a/web/web.go +++ b/web/web.go @@ -54,10 +54,10 @@ var localhostRepresentations = []string{"127.0.0.1", "localhost"} type Handler struct { ruleManager *rules.Manager queryEngine *promql.Engine + storage local.Storage - apiV1 *v1.API - apiLegacy *legacy.API - federation *Federation + apiV1 *v1.API + apiLegacy *legacy.API router *route.Router listenErrCh chan error @@ -66,7 +66,19 @@ type Handler struct { options *Options statusInfo *PrometheusStatus - muAlerts sync.Mutex + globalLabels model.LabelSet + mtx sync.RWMutex +} + +// ApplyConfig updates the status state as the new config requires. +// Returns true on success. +func (h *Handler) ApplyConfig(conf *config.Config) bool { + h.mtx.Lock() + defer h.mtx.Unlock() + + h.globalLabels = conf.GlobalConfig.Labels + + return true } // PrometheusStatus contains various information about the status @@ -89,8 +101,10 @@ type PrometheusStatus struct { // Returns true on success. func (s *PrometheusStatus) ApplyConfig(conf *config.Config) bool { s.mu.Lock() + defer s.mu.Unlock() + s.Config = conf.String() - s.mu.Unlock() + return true } @@ -120,6 +134,7 @@ func New(st local.Storage, qe *promql.Engine, rm *rules.Manager, status *Prometh ruleManager: rm, queryEngine: qe, + storage: st, apiV1: &v1.API{ QueryEngine: qe, @@ -130,9 +145,6 @@ func New(st local.Storage, qe *promql.Engine, rm *rules.Manager, status *Prometh Storage: st, Now: model.Now, }, - federation: &Federation{ - Storage: st, - }, } if o.ExternalURL.Path != "" { @@ -153,7 +165,7 @@ func New(st local.Storage, qe *promql.Engine, rm *rules.Manager, status *Prometh router.Get("/heap", instrf("heap", dumpHeap)) - router.Get("/federate", instrh("federate", h.federation)) + router.Get("/federate", instrf("federate", h.federation)) router.Get(o.MetricsPath, prometheus.Handler().ServeHTTP) h.apiLegacy.Register(router.WithPrefix("/api")) @@ -205,9 +217,6 @@ func (h *Handler) Run() { } func (h *Handler) alerts(w http.ResponseWriter, r *http.Request) { - h.muAlerts.Lock() - defer h.muAlerts.Unlock() - alerts := h.ruleManager.AlertingRules() alertsSorter := byAlertStateSorter{alerts: alerts} sort.Sort(alertsSorter) From 5fed076a76596170cd68694b56099abaf2d7600d Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 1 Sep 2015 22:17:02 +0200 Subject: [PATCH 3/4] Attach global labels to outgoing alerts. --- cmd/prometheus/main.go | 2 +- notification/notification.go | 24 +++++++++++++++++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index ab7a482c9..7acd49cad 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -93,7 +93,7 @@ func Main() int { webHandler := web.New(memStorage, queryEngine, ruleManager, status, &cfg.web) - reloadables := []Reloadable{status, targetManager, ruleManager, webHandler} + reloadables := []Reloadable{status, targetManager, ruleManager, webHandler, notificationHandler} if !reloadConfig(cfg.configFile, reloadables...) { return 1 diff --git a/notification/notification.go b/notification/notification.go index bea0ee570..a43d05060 100644 --- a/notification/notification.go +++ b/notification/notification.go @@ -20,12 +20,14 @@ import ( "io/ioutil" "net/http" "strings" + "sync" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/log" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/util/httputil" ) @@ -86,7 +88,9 @@ type NotificationHandler struct { notificationsQueueLength prometheus.Gauge notificationsQueueCapacity prometheus.Metric - stopped chan struct{} + globalLabels model.LabelSet + mtx sync.RWMutex + stopped chan struct{} } // NotificationHandlerOptions are the configurable parameters of a NotificationHandler. @@ -141,10 +145,28 @@ func NewNotificationHandler(o *NotificationHandlerOptions) *NotificationHandler } } +// ApplyConfig updates the status state as the new config requires. +// Returns true on success. +func (n *NotificationHandler) ApplyConfig(conf *config.Config) bool { + n.mtx.Lock() + defer n.mtx.Unlock() + + n.globalLabels = conf.GlobalConfig.Labels + return true +} + // Send a list of notifications to the configured alert manager. func (n *NotificationHandler) sendNotifications(reqs NotificationReqs) error { + n.mtx.RLock() + defer n.mtx.RUnlock() + alerts := make([]map[string]interface{}, 0, len(reqs)) for _, req := range reqs { + for ln, lv := range n.globalLabels { + if _, ok := req.Labels[ln]; !ok { + req.Labels[ln] = lv + } + } alerts = append(alerts, map[string]interface{}{ "summary": req.Summary, "description": req.Description, From 8fa719f778a68c2ca2d9be810a9bbcbe85e2a227 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 3 Sep 2015 13:44:03 +0200 Subject: [PATCH 4/4] Attach global labels to remote storage samples --- cmd/prometheus/main.go | 5 ++++- storage/remote/remote.go | 31 +++++++++++++++++++++++++++++-- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 7acd49cad..5e17405e4 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -57,6 +57,8 @@ func Main() int { return 0 } + var reloadables []Reloadable + var ( memStorage = local.NewMemorySeriesStorage(&cfg.storage) remoteStorage = remote.New(&cfg.remote) @@ -64,6 +66,7 @@ func Main() int { ) if remoteStorage != nil { sampleAppender = append(sampleAppender, remoteStorage) + reloadables = append(reloadables, remoteStorage) } var ( @@ -93,7 +96,7 @@ func Main() int { webHandler := web.New(memStorage, queryEngine, ruleManager, status, &cfg.web) - reloadables := []Reloadable{status, targetManager, ruleManager, webHandler, notificationHandler} + reloadables = append(reloadables, status, targetManager, ruleManager, webHandler, notificationHandler) if !reloadConfig(cfg.configFile, reloadables...) { return 1 diff --git a/storage/remote/remote.go b/storage/remote/remote.go index 3e8e25c56..aab7f34d3 100644 --- a/storage/remote/remote.go +++ b/storage/remote/remote.go @@ -14,18 +14,32 @@ package remote import ( + "sync" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/storage/remote/influxdb" "github.com/prometheus/prometheus/storage/remote/opentsdb" ) // Storage collects multiple remote storage queues. type Storage struct { - queues []*StorageQueueManager + queues []*StorageQueueManager + globalLabels model.LabelSet + mtx sync.RWMutex +} + +// ApplyConfig updates the status state as the new config requires. +// Returns true on success. +func (s *Storage) ApplyConfig(conf *config.Config) bool { + s.mtx.Lock() + defer s.mtx.Unlock() + + s.globalLabels = conf.GlobalConfig.Labels + return true } // New returns a new remote Storage. @@ -70,8 +84,21 @@ func (s *Storage) Stop() { // Append implements storage.SampleAppender. func (s *Storage) Append(smpl *model.Sample) { + s.mtx.RLock() + + var snew model.Sample + snew = *smpl + snew.Metric = smpl.Metric.Clone() + + for ln, lv := range s.globalLabels { + if _, ok := smpl.Metric[ln]; !ok { + snew.Metric[ln] = lv + } + } + s.mtx.RUnlock() + for _, q := range s.queues { - q.Append(smpl) + q.Append(&snew) } }