diff --git a/config/config.go b/config/config.go index ae00463299..93d4c67e41 100644 --- a/config/config.go +++ b/config/config.go @@ -204,7 +204,7 @@ type Config struct { RuleFiles []string `yaml:"rule_files,omitempty"` ScrapeConfigs []*ScrapeConfig `yaml:"scrape_configs,omitempty"` - RemoteWriteConfig RemoteWriteConfig `yaml:"remote_write,omitempty"` + RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` diff --git a/config/config_test.go b/config/config_test.go index a8c1f32e74..4be6989b7b 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -26,6 +26,14 @@ import ( "gopkg.in/yaml.v2" ) +func mustParseURL(u string) *URL { + parsed, err := url.Parse(u) + if err != nil { + panic(err) + } + return &URL{URL: parsed} +} + var expectedConf = &Config{ GlobalConfig: GlobalConfig{ ScrapeInterval: model.Duration(15 * time.Second), @@ -44,17 +52,24 @@ var expectedConf = &Config{ "testdata/my/*.rules", }, - RemoteWriteConfig: RemoteWriteConfig{ - RemoteTimeout: model.Duration(30 * time.Second), - WriteRelabelConfigs: []*RelabelConfig{ - { - SourceLabels: model.LabelNames{"__name__"}, - Separator: ";", - Regex: MustNewRegexp("expensive.*"), - Replacement: "$1", - Action: RelabelDrop, + RemoteWriteConfigs: []*RemoteWriteConfig{ + { + URL: mustParseURL("http://remote1/push"), + RemoteTimeout: model.Duration(30 * time.Second), + WriteRelabelConfigs: []*RelabelConfig{ + { + SourceLabels: model.LabelNames{"__name__"}, + Separator: ";", + Regex: MustNewRegexp("expensive.*"), + Replacement: "$1", + Action: RelabelDrop, + }, }, }, + { + URL: mustParseURL("http://remote2/push"), + RemoteTimeout: model.Duration(30 * time.Second), + }, }, ScrapeConfigs: []*ScrapeConfig{ diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml index a3aca858a4..7fc6161138 100644 --- a/config/testdata/conf.good.yml +++ b/config/testdata/conf.good.yml @@ -14,10 +14,12 @@ rule_files: - "my/*.rules" remote_write: - write_relabel_configs: - - source_labels: [__name__] - regex: expensive.* - action: drop + - url: http://remote1/push + write_relabel_configs: + - source_labels: [__name__] + regex: expensive.* + action: drop + - url: http://remote2/push scrape_configs: - job_name: prometheus diff --git a/storage/remote/client.go b/storage/remote/client.go index 771ecb696c..e5d97feda1 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -31,13 +31,14 @@ import ( // Client allows sending batches of Prometheus samples to an HTTP endpoint. type Client struct { + index int // Used to differentiate metrics. url config.URL client *http.Client timeout time.Duration } // NewClient creates a new Client. -func NewClient(conf config.RemoteWriteConfig) (*Client, error) { +func NewClient(index int, conf *config.RemoteWriteConfig) (*Client, error) { tlsConfig, err := httputil.NewTLSConfig(conf.TLSConfig) if err != nil { return nil, err @@ -55,6 +56,7 @@ func NewClient(conf config.RemoteWriteConfig) (*Client, error) { } return &Client{ + index: index, url: *conf.URL, client: httputil.NewClient(rt), timeout: time.Duration(conf.RemoteTimeout), @@ -114,7 +116,7 @@ func (c *Client) Store(samples model.Samples) error { return nil } -// Name identifies the client as a generic client. +// Name identifies the client. func (c Client) Name() string { - return "generic" + return fmt.Sprintf("%d:%s", c.index, c.url) } diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 87fa5f01cb..b7196f156c 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -20,6 +20,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/relabel" ) // String constants for instrumentation. @@ -27,6 +29,12 @@ const ( namespace = "prometheus" subsystem = "remote_storage" queue = "queue" + + defaultShards = 10 + defaultMaxSamplesPerSend = 100 + // The queue capacity is per shard. + defaultQueueCapacity = 100 * 1024 / defaultShards + defaultBatchSendDeadline = 5 * time.Second ) var ( @@ -105,25 +113,21 @@ type StorageClient interface { Name() string } +// StorageQueueManagerConfig configures a storage queue. type StorageQueueManagerConfig struct { QueueCapacity int // Number of samples to buffer per shard before we start dropping them. Shards int // Number of shards, i.e. amount of concurrency. MaxSamplesPerSend int // Maximum number of samples per send. BatchSendDeadline time.Duration // Maximum time sample will wait in buffer. -} - -var defaultConfig = StorageQueueManagerConfig{ - QueueCapacity: 100 * 1024 / 10, - Shards: 10, - MaxSamplesPerSend: 100, - BatchSendDeadline: 5 * time.Second, + ExternalLabels model.LabelSet + RelabelConfigs []*config.RelabelConfig + Client StorageClient } // StorageQueueManager manages a queue of samples to be sent to the Storage // indicated by the provided StorageClient. type StorageQueueManager struct { cfg StorageQueueManagerConfig - tsdb StorageClient shards []chan *model.Sample wg sync.WaitGroup done chan struct{} @@ -131,9 +135,18 @@ type StorageQueueManager struct { } // NewStorageQueueManager builds a new StorageQueueManager. -func NewStorageQueueManager(tsdb StorageClient, cfg *StorageQueueManagerConfig) *StorageQueueManager { - if cfg == nil { - cfg = &defaultConfig +func NewStorageQueueManager(cfg StorageQueueManagerConfig) *StorageQueueManager { + if cfg.QueueCapacity == 0 { + cfg.QueueCapacity = defaultQueueCapacity + } + if cfg.Shards == 0 { + cfg.Shards = defaultShards + } + if cfg.MaxSamplesPerSend == 0 { + cfg.MaxSamplesPerSend = defaultMaxSamplesPerSend + } + if cfg.BatchSendDeadline == 0 { + cfg.BatchSendDeadline = defaultBatchSendDeadline } shards := make([]chan *model.Sample, cfg.Shards) @@ -142,11 +155,10 @@ func NewStorageQueueManager(tsdb StorageClient, cfg *StorageQueueManagerConfig) } t := &StorageQueueManager{ - cfg: *cfg, - tsdb: tsdb, + cfg: cfg, shards: shards, done: make(chan struct{}), - queueName: tsdb.Name(), + queueName: cfg.Client.Name(), } queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity)) @@ -158,11 +170,28 @@ func NewStorageQueueManager(tsdb StorageClient, cfg *StorageQueueManagerConfig) // sample on the floor if the queue is full. // Always returns nil. func (t *StorageQueueManager) Append(s *model.Sample) error { - fp := s.Metric.FastFingerprint() + var snew model.Sample + snew = *s + snew.Metric = s.Metric.Clone() + + for ln, lv := range t.cfg.ExternalLabels { + if _, ok := s.Metric[ln]; !ok { + snew.Metric[ln] = lv + } + } + + snew.Metric = model.Metric( + relabel.Process(model.LabelSet(snew.Metric), t.cfg.RelabelConfigs...)) + + if snew.Metric == nil { + return nil + } + + fp := snew.Metric.FastFingerprint() shard := uint64(fp) % uint64(t.cfg.Shards) select { - case t.shards[shard] <- s: + case t.shards[shard] <- &snew: queueLength.WithLabelValues(t.queueName).Inc() default: droppedSamplesTotal.WithLabelValues(t.queueName).Inc() @@ -239,7 +268,7 @@ func (t *StorageQueueManager) sendSamples(s model.Samples) { // sample isn't sent correctly the first time, it's simply dropped on the // floor. begin := time.Now() - err := t.tsdb.Store(s) + err := t.cfg.Client.Store(s) duration := time.Since(begin).Seconds() if err != nil { diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index c4d1dbd8b0..3908a5a02f 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -81,9 +81,7 @@ func (c *TestStorageClient) Name() string { func TestSampleDelivery(t *testing.T) { // Let's create an even number of send batches so we don't run into the // batch timeout case. - cfg := defaultConfig - n := cfg.QueueCapacity * 2 - cfg.Shards = 1 + n := defaultQueueCapacity * 2 samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { @@ -98,7 +96,11 @@ func TestSampleDelivery(t *testing.T) { c := NewTestStorageClient() c.expectSamples(samples[:len(samples)/2]) - m := NewStorageQueueManager(c, &cfg) + + m := NewStorageQueueManager(StorageQueueManagerConfig{ + Client: c, + Shards: 1, + }) // These should be received by the client. for _, s := range samples[:len(samples)/2] { @@ -115,11 +117,8 @@ func TestSampleDelivery(t *testing.T) { } func TestSampleDeliveryOrder(t *testing.T) { - cfg := defaultConfig ts := 10 - n := cfg.MaxSamplesPerSend * ts - // Ensure we don't drop samples in this test. - cfg.QueueCapacity = n + n := defaultMaxSamplesPerSend * ts samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { @@ -135,7 +134,11 @@ func TestSampleDeliveryOrder(t *testing.T) { c := NewTestStorageClient() c.expectSamples(samples) - m := NewStorageQueueManager(c, &cfg) + m := NewStorageQueueManager(StorageQueueManagerConfig{ + Client: c, + // Ensure we don't drop samples in this test. + QueueCapacity: n, + }) // These should be received by the client. for _, s := range samples { @@ -194,9 +197,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { // `MaxSamplesPerSend*Shards` samples should be consumed by the // per-shard goroutines, and then another `MaxSamplesPerSend` // should be left on the queue. - cfg := defaultConfig - n := cfg.MaxSamplesPerSend*cfg.Shards + cfg.MaxSamplesPerSend - cfg.QueueCapacity = n + n := defaultMaxSamplesPerSend*defaultShards + defaultMaxSamplesPerSend samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { @@ -210,7 +211,10 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { } c := NewTestBlockedStorageClient() - m := NewStorageQueueManager(c, &cfg) + m := NewStorageQueueManager(StorageQueueManagerConfig{ + Client: c, + QueueCapacity: n, + }) m.Start() @@ -239,14 +243,14 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { time.Sleep(10 * time.Millisecond) } - if m.queueLen() != cfg.MaxSamplesPerSend { + if m.queueLen() != defaultMaxSamplesPerSend { t.Fatalf("Failed to drain StorageQueueManager queue, %d elements left", m.queueLen(), ) } numCalls := c.NumCalls() - if numCalls != uint64(cfg.Shards) { - t.Errorf("Saw %d concurrent sends, expected %d", numCalls, cfg.Shards) + if numCalls != uint64(defaultShards) { + t.Errorf("Saw %d concurrent sends, expected %d", numCalls, defaultShards) } } diff --git a/storage/remote/remote.go b/storage/remote/remote.go index 1fcb0cded2..2f9f58efaf 100644 --- a/storage/remote/remote.go +++ b/storage/remote/remote.go @@ -19,16 +19,12 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/relabel" ) // Storage allows queueing samples for remote writes. type Storage struct { - mtx sync.RWMutex - externalLabels model.LabelSet - conf config.RemoteWriteConfig - - queue *StorageQueueManager + mtx sync.RWMutex + queues []*StorageQueueManager } // ApplyConfig updates the state as the new config requires. @@ -36,34 +32,36 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { s.mtx.Lock() defer s.mtx.Unlock() + newQueues := []*StorageQueueManager{} // TODO: we should only stop & recreate queues which have changes, // as this can be quite disruptive. - var newQueue *StorageQueueManager - - if conf.RemoteWriteConfig.URL != nil { - c, err := NewClient(conf.RemoteWriteConfig) + for i, rwConf := range conf.RemoteWriteConfigs { + c, err := NewClient(i, rwConf) if err != nil { return err } - newQueue = NewStorageQueueManager(c, nil) + newQueues = append(newQueues, NewStorageQueueManager(StorageQueueManagerConfig{ + Client: c, + ExternalLabels: conf.GlobalConfig.ExternalLabels, + RelabelConfigs: rwConf.WriteRelabelConfigs, + })) } - if s.queue != nil { - s.queue.Stop() + for _, q := range s.queues { + q.Stop() } - s.queue = newQueue - s.conf = conf.RemoteWriteConfig - s.externalLabels = conf.GlobalConfig.ExternalLabels - if s.queue != nil { - s.queue.Start() + + s.queues = newQueues + for _, q := range s.queues { + q.Start() } return nil } // Stop the background processing of the storage queues. func (s *Storage) Stop() { - if s.queue != nil { - s.queue.Stop() + for _, q := range s.queues { + q.Stop() } } @@ -72,26 +70,9 @@ func (s *Storage) Append(smpl *model.Sample) error { s.mtx.RLock() defer s.mtx.RUnlock() - if s.queue == nil { - return nil + for _, q := range s.queues { + q.Append(smpl) } - - var snew model.Sample - snew = *smpl - snew.Metric = smpl.Metric.Clone() - - for ln, lv := range s.externalLabels { - if _, ok := smpl.Metric[ln]; !ok { - snew.Metric[ln] = lv - } - } - snew.Metric = model.Metric( - relabel.Process(model.LabelSet(snew.Metric), s.conf.WriteRelabelConfigs...)) - - if snew.Metric == nil { - return nil - } - s.queue.Append(&snew) return nil }