mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Attach global labels to remote storage samples
This commit is contained in:
parent
5fed076a76
commit
8fa719f778
|
@ -57,6 +57,8 @@ func Main() int {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var reloadables []Reloadable
|
||||||
|
|
||||||
var (
|
var (
|
||||||
memStorage = local.NewMemorySeriesStorage(&cfg.storage)
|
memStorage = local.NewMemorySeriesStorage(&cfg.storage)
|
||||||
remoteStorage = remote.New(&cfg.remote)
|
remoteStorage = remote.New(&cfg.remote)
|
||||||
|
@ -64,6 +66,7 @@ func Main() int {
|
||||||
)
|
)
|
||||||
if remoteStorage != nil {
|
if remoteStorage != nil {
|
||||||
sampleAppender = append(sampleAppender, remoteStorage)
|
sampleAppender = append(sampleAppender, remoteStorage)
|
||||||
|
reloadables = append(reloadables, remoteStorage)
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -93,7 +96,7 @@ func Main() int {
|
||||||
|
|
||||||
webHandler := web.New(memStorage, queryEngine, ruleManager, status, &cfg.web)
|
webHandler := web.New(memStorage, queryEngine, ruleManager, status, &cfg.web)
|
||||||
|
|
||||||
reloadables := []Reloadable{status, targetManager, ruleManager, webHandler, notificationHandler}
|
reloadables = append(reloadables, status, targetManager, ruleManager, webHandler, notificationHandler)
|
||||||
|
|
||||||
if !reloadConfig(cfg.configFile, reloadables...) {
|
if !reloadConfig(cfg.configFile, reloadables...) {
|
||||||
return 1
|
return 1
|
||||||
|
|
|
@ -14,18 +14,32 @@
|
||||||
package remote
|
package remote
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/storage/remote/influxdb"
|
"github.com/prometheus/prometheus/storage/remote/influxdb"
|
||||||
"github.com/prometheus/prometheus/storage/remote/opentsdb"
|
"github.com/prometheus/prometheus/storage/remote/opentsdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Storage collects multiple remote storage queues.
|
// Storage collects multiple remote storage queues.
|
||||||
type Storage struct {
|
type Storage struct {
|
||||||
queues []*StorageQueueManager
|
queues []*StorageQueueManager
|
||||||
|
globalLabels model.LabelSet
|
||||||
|
mtx sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// ApplyConfig updates the status state as the new config requires.
|
||||||
|
// Returns true on success.
|
||||||
|
func (s *Storage) ApplyConfig(conf *config.Config) bool {
|
||||||
|
s.mtx.Lock()
|
||||||
|
defer s.mtx.Unlock()
|
||||||
|
|
||||||
|
s.globalLabels = conf.GlobalConfig.Labels
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a new remote Storage.
|
// New returns a new remote Storage.
|
||||||
|
@ -70,8 +84,21 @@ func (s *Storage) Stop() {
|
||||||
|
|
||||||
// Append implements storage.SampleAppender.
|
// Append implements storage.SampleAppender.
|
||||||
func (s *Storage) Append(smpl *model.Sample) {
|
func (s *Storage) Append(smpl *model.Sample) {
|
||||||
|
s.mtx.RLock()
|
||||||
|
|
||||||
|
var snew model.Sample
|
||||||
|
snew = *smpl
|
||||||
|
snew.Metric = smpl.Metric.Clone()
|
||||||
|
|
||||||
|
for ln, lv := range s.globalLabels {
|
||||||
|
if _, ok := smpl.Metric[ln]; !ok {
|
||||||
|
snew.Metric[ln] = lv
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.mtx.RUnlock()
|
||||||
|
|
||||||
for _, q := range s.queues {
|
for _, q := range s.queues {
|
||||||
q.Append(smpl)
|
q.Append(&snew)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue