Merge pull request #2423 from prometheus/multiple-remote-writers

Re-add multiple remote writers
Julius Volz, 2017-02-23 09:48:40 +01:00 (committed by GitHub)
commit 16bd5c8ebe
9 changed files with 139 additions and 109 deletions


@@ -204,7 +204,7 @@ type Config struct {
RuleFiles []string `yaml:"rule_files,omitempty"`
ScrapeConfigs []*ScrapeConfig `yaml:"scrape_configs,omitempty"`
RemoteWriteConfig RemoteWriteConfig `yaml:"remote_write,omitempty"`
RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
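
With the field now a slice, a remote_write section that lists several targets unmarshals into one RemoteWriteConfig per list entry. A minimal sketch of that behaviour, using simplified stand-in types rather than the real ones from the config package:

package main

import (
    "fmt"

    yaml "gopkg.in/yaml.v2"
)

// Simplified stand-ins for the real config types; illustration only.
type remoteWriteConfig struct {
    URL string `yaml:"url"`
}

type conf struct {
    RemoteWriteConfigs []*remoteWriteConfig `yaml:"remote_write,omitempty"`
}

func main() {
    raw := `
remote_write:
- url: http://remote1/push
- url: http://remote2/push
`
    var c conf
    if err := yaml.Unmarshal([]byte(raw), &c); err != nil {
        panic(err)
    }
    // Each list entry becomes its own element in the slice.
    fmt.Println(len(c.RemoteWriteConfigs)) // 2
}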


@@ -26,6 +26,14 @@ import (
"gopkg.in/yaml.v2"
)
func mustParseURL(u string) *URL {
parsed, err := url.Parse(u)
if err != nil {
panic(err)
}
return &URL{URL: parsed}
}
var expectedConf = &Config{
GlobalConfig: GlobalConfig{
ScrapeInterval: model.Duration(15 * time.Second),
@@ -44,17 +52,24 @@ var expectedConf = &Config{
"testdata/my/*.rules",
},
RemoteWriteConfig: RemoteWriteConfig{
RemoteTimeout: model.Duration(30 * time.Second),
WriteRelabelConfigs: []*RelabelConfig{
{
SourceLabels: model.LabelNames{"__name__"},
Separator: ";",
Regex: MustNewRegexp("expensive.*"),
Replacement: "$1",
Action: RelabelDrop,
RemoteWriteConfigs: []*RemoteWriteConfig{
{
URL: mustParseURL("http://remote1/push"),
RemoteTimeout: model.Duration(30 * time.Second),
WriteRelabelConfigs: []*RelabelConfig{
{
SourceLabels: model.LabelNames{"__name__"},
Separator: ";",
Regex: MustNewRegexp("expensive.*"),
Replacement: "$1",
Action: RelabelDrop,
},
},
},
{
URL: mustParseURL("http://remote2/push"),
RemoteTimeout: model.Duration(30 * time.Second),
},
},
ScrapeConfigs: []*ScrapeConfig{


@@ -14,10 +14,12 @@ rule_files:
- "my/*.rules"
remote_write:
write_relabel_configs:
- source_labels: [__name__]
regex: expensive.*
action: drop
- url: http://remote1/push
write_relabel_configs:
- source_labels: [__name__]
regex: expensive.*
action: drop
- url: http://remote2/push
scrape_configs:
- job_name: prometheus
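
Loading a file like the one above yields one RemoteWriteConfig per entry, which the remote-storage layer then turns into one client and one queue each. A rough usage sketch, assuming the existing config.LoadFile helper and the test file path used here; error handling is illustrative:

package main

import (
    "fmt"
    "log"

    "github.com/prometheus/prometheus/config"
)

func main() {
    // Load the example configuration shown above.
    conf, err := config.LoadFile("config/testdata/conf.good.yml")
    if err != nil {
        log.Fatal(err)
    }
    // Two targets: http://remote1/push and http://remote2/push.
    for i, rw := range conf.RemoteWriteConfigs {
        fmt.Printf("remote write queue %d -> %s\n", i, rw.URL)
    }
}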


@@ -31,13 +31,14 @@ import (
// Client allows sending batches of Prometheus samples to an HTTP endpoint.
type Client struct {
index int // Used to differentiate metrics.
url config.URL
client *http.Client
timeout time.Duration
}
// NewClient creates a new Client.
func NewClient(conf config.RemoteWriteConfig) (*Client, error) {
func NewClient(index int, conf *config.RemoteWriteConfig) (*Client, error) {
tlsConfig, err := httputil.NewTLSConfig(conf.TLSConfig)
if err != nil {
return nil, err
@@ -55,6 +56,7 @@ func NewClient(conf config.RemoteWriteConfig) (*Client, error) {
}
return &Client{
index: index,
url: *conf.URL,
client: httputil.NewClient(rt),
timeout: time.Duration(conf.RemoteTimeout),
@@ -114,7 +116,7 @@ func (c *Client) Store(samples model.Samples) error {
return nil
}
// Name identifies the client as a generic client.
// Name identifies the client.
func (c Client) Name() string {
return "generic"
return fmt.Sprintf("%d:%s", c.index, c.url)
}
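
With more than one client in play, the fixed name "generic" would make the queue metrics collide, so Name() now returns an index:URL pair and the queue manager uses that string as the label value for its per-queue metrics. A small sketch of the labelling pattern; the metric name below is a placeholder, not the one used by the package:

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

// queueName mirrors the format produced by Client.Name(), e.g. "0:http://remote1/push".
func queueName(index int, url string) string {
    return fmt.Sprintf("%d:%s", index, url)
}

var sentSamples = prometheus.NewCounterVec(
    prometheus.CounterOpts{
        Name: "example_remote_sent_samples_total", // placeholder metric name
        Help: "Samples sent, partitioned by remote write queue.",
    },
    []string{"queue"},
)

func main() {
    prometheus.MustRegister(sentSamples)
    // Each remote endpoint gets its own labelled series.
    sentSamples.WithLabelValues(queueName(0, "http://remote1/push")).Add(100)
    sentSamples.WithLabelValues(queueName(1, "http://remote2/push")).Add(42)
}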


@@ -20,6 +20,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/relabel"
)
// String constants for instrumentation.
@@ -27,6 +29,12 @@ const (
namespace = "prometheus"
subsystem = "remote_storage"
queue = "queue"
defaultShards = 10
defaultMaxSamplesPerSend = 100
// The queue capacity is per shard.
defaultQueueCapacity = 100 * 1024 / defaultShards
defaultBatchSendDeadline = 5 * time.Second
)
var (
@@ -105,35 +113,40 @@ type StorageClient interface {
Name() string
}
type StorageQueueManagerConfig struct {
// QueueManagerConfig configures a storage queue.
type QueueManagerConfig struct {
QueueCapacity int // Number of samples to buffer per shard before we start dropping them.
Shards int // Number of shards, i.e. amount of concurrency.
MaxSamplesPerSend int // Maximum number of samples per send.
BatchSendDeadline time.Duration // Maximum time sample will wait in buffer.
ExternalLabels model.LabelSet
RelabelConfigs []*config.RelabelConfig
Client StorageClient
}
var defaultConfig = StorageQueueManagerConfig{
QueueCapacity: 100 * 1024 / 10,
Shards: 10,
MaxSamplesPerSend: 100,
BatchSendDeadline: 5 * time.Second,
}
// StorageQueueManager manages a queue of samples to be sent to the Storage
// QueueManager manages a queue of samples to be sent to the Storage
// indicated by the provided StorageClient.
type StorageQueueManager struct {
cfg StorageQueueManagerConfig
tsdb StorageClient
type QueueManager struct {
cfg QueueManagerConfig
shards []chan *model.Sample
wg sync.WaitGroup
done chan struct{}
queueName string
}
// NewStorageQueueManager builds a new StorageQueueManager.
func NewStorageQueueManager(tsdb StorageClient, cfg *StorageQueueManagerConfig) *StorageQueueManager {
if cfg == nil {
cfg = &defaultConfig
// NewQueueManager builds a new QueueManager.
func NewQueueManager(cfg QueueManagerConfig) *QueueManager {
if cfg.QueueCapacity == 0 {
cfg.QueueCapacity = defaultQueueCapacity
}
if cfg.Shards == 0 {
cfg.Shards = defaultShards
}
if cfg.MaxSamplesPerSend == 0 {
cfg.MaxSamplesPerSend = defaultMaxSamplesPerSend
}
if cfg.BatchSendDeadline == 0 {
cfg.BatchSendDeadline = defaultBatchSendDeadline
}
shards := make([]chan *model.Sample, cfg.Shards)
@@ -141,12 +154,11 @@ func NewStorageQueueManager(tsdb StorageClient, cfg *StorageQueueManagerConfig)
shards[i] = make(chan *model.Sample, cfg.QueueCapacity)
}
t := &StorageQueueManager{
cfg: *cfg,
tsdb: tsdb,
t := &QueueManager{
cfg: cfg,
shards: shards,
done: make(chan struct{}),
queueName: tsdb.Name(),
queueName: cfg.Client.Name(),
}
queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity))
@@ -157,12 +169,29 @@ func NewStorageQueueManager(tsdb StorageClient, cfg *StorageQueueManagerConfig)
// Append queues a sample to be sent to the remote storage. It drops the
// sample on the floor if the queue is full.
// Always returns nil.
func (t *StorageQueueManager) Append(s *model.Sample) error {
fp := s.Metric.FastFingerprint()
func (t *QueueManager) Append(s *model.Sample) error {
var snew model.Sample
snew = *s
snew.Metric = s.Metric.Clone()
for ln, lv := range t.cfg.ExternalLabels {
if _, ok := s.Metric[ln]; !ok {
snew.Metric[ln] = lv
}
}
snew.Metric = model.Metric(
relabel.Process(model.LabelSet(snew.Metric), t.cfg.RelabelConfigs...))
if snew.Metric == nil {
return nil
}
fp := snew.Metric.FastFingerprint()
shard := uint64(fp) % uint64(t.cfg.Shards)
select {
case t.shards[shard] <- s:
case t.shards[shard] <- &snew:
queueLength.WithLabelValues(t.queueName).Inc()
default:
droppedSamplesTotal.WithLabelValues(t.queueName).Inc()
@@ -174,13 +203,13 @@ func (t *StorageQueueManager) Append(s *model.Sample) error {
// NeedsThrottling implements storage.SampleAppender. It will always return
// false as a remote storage drops samples on the floor if backlogging instead
// of asking for throttling.
func (*StorageQueueManager) NeedsThrottling() bool {
func (*QueueManager) NeedsThrottling() bool {
return false
}
// Start the queue manager sending samples to the remote storage.
// Does not block.
func (t *StorageQueueManager) Start() {
func (t *QueueManager) Start() {
for i := 0; i < t.cfg.Shards; i++ {
go t.runShard(i)
}
@@ -188,7 +217,7 @@ func (t *StorageQueueManager) Start() {
// Stop stops sending samples to the remote storage and waits for pending
// sends to complete.
func (t *StorageQueueManager) Stop() {
func (t *QueueManager) Stop() {
log.Infof("Stopping remote storage...")
for _, shard := range t.shards {
close(shard)
@@ -197,7 +226,7 @@ func (t *StorageQueueManager) Stop() {
log.Info("Remote storage stopped.")
}
func (t *StorageQueueManager) runShard(i int) {
func (t *QueueManager) runShard(i int) {
defer t.wg.Done()
shard := t.shards[i]
@@ -234,12 +263,12 @@ func (t *StorageQueueManager) runShard(i int) {
}
}
func (t *StorageQueueManager) sendSamples(s model.Samples) {
func (t *QueueManager) sendSamples(s model.Samples) {
// Samples are sent to the remote storage on a best-effort basis. If a
// sample isn't sent correctly the first time, it's simply dropped on the
// floor.
begin := time.Now()
err := t.tsdb.Store(s)
err := t.cfg.Client.Store(s)
duration := time.Since(begin).Seconds()
if err != nil {

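The package-level defaultConfig value is gone: NewQueueManager now fills in any zero field, so the defaults are 10 shards, 100 samples per send, a 5s batch deadline, and 100 * 1024 / 10 = 10240 buffered samples per shard, the same per-shard capacity as before. A rough usage sketch against the API introduced in this diff; the URL and timeout are illustrative:

package main

import (
    "log"
    "net/url"
    "time"

    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/config"
    "github.com/prometheus/prometheus/storage/remote"
)

func main() {
    u, err := url.Parse("http://remote1/push")
    if err != nil {
        log.Fatal(err)
    }
    // rwConf would normally come from the loaded configuration.
    rwConf := &config.RemoteWriteConfig{
        URL:           &config.URL{URL: u},
        RemoteTimeout: model.Duration(30 * time.Second),
    }
    c, err := remote.NewClient(0, rwConf)
    if err != nil {
        log.Fatal(err)
    }
    // Only Client is set; QueueCapacity, Shards, MaxSamplesPerSend and
    // BatchSendDeadline all fall back to the package defaults.
    q := remote.NewQueueManager(remote.QueueManagerConfig{Client: c})
    q.Start()
    defer q.Stop()
}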

@@ -81,9 +81,7 @@ func (c *TestStorageClient) Name() string {
func TestSampleDelivery(t *testing.T) {
// Let's create an even number of send batches so we don't run into the
// batch timeout case.
cfg := defaultConfig
n := cfg.QueueCapacity * 2
cfg.Shards = 1
n := defaultQueueCapacity * 2
samples := make(model.Samples, 0, n)
for i := 0; i < n; i++ {
@@ -98,7 +96,11 @@ func TestSampleDelivery(t *testing.T) {
c := NewTestStorageClient()
c.expectSamples(samples[:len(samples)/2])
m := NewStorageQueueManager(c, &cfg)
m := NewQueueManager(QueueManagerConfig{
Client: c,
Shards: 1,
})
// These should be received by the client.
for _, s := range samples[:len(samples)/2] {
@@ -115,11 +117,8 @@ func TestSampleDelivery(t *testing.T) {
}
func TestSampleDeliveryOrder(t *testing.T) {
cfg := defaultConfig
ts := 10
n := cfg.MaxSamplesPerSend * ts
// Ensure we don't drop samples in this test.
cfg.QueueCapacity = n
n := defaultMaxSamplesPerSend * ts
samples := make(model.Samples, 0, n)
for i := 0; i < n; i++ {
@@ -135,7 +134,11 @@ func TestSampleDeliveryOrder(t *testing.T) {
c := NewTestStorageClient()
c.expectSamples(samples)
m := NewStorageQueueManager(c, &cfg)
m := NewQueueManager(QueueManagerConfig{
Client: c,
// Ensure we don't drop samples in this test.
QueueCapacity: n,
})
// These should be received by the client.
for _, s := range samples {
@@ -181,7 +184,7 @@ func (c *TestBlockingStorageClient) Name() string {
return "testblockingstorageclient"
}
func (t *StorageQueueManager) queueLen() int {
func (t *QueueManager) queueLen() int {
queueLength := 0
for _, shard := range t.shards {
queueLength += len(shard)
@@ -194,9 +197,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
// `MaxSamplesPerSend*Shards` samples should be consumed by the
// per-shard goroutines, and then another `MaxSamplesPerSend`
// should be left on the queue.
cfg := defaultConfig
n := cfg.MaxSamplesPerSend*cfg.Shards + cfg.MaxSamplesPerSend
cfg.QueueCapacity = n
n := defaultMaxSamplesPerSend*defaultShards + defaultMaxSamplesPerSend
samples := make(model.Samples, 0, n)
for i := 0; i < n; i++ {
@@ -210,7 +211,10 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
}
c := NewTestBlockedStorageClient()
m := NewStorageQueueManager(c, &cfg)
m := NewQueueManager(QueueManagerConfig{
Client: c,
QueueCapacity: n,
})
m.Start()
@@ -239,14 +243,14 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
time.Sleep(10 * time.Millisecond)
}
if m.queueLen() != cfg.MaxSamplesPerSend {
t.Fatalf("Failed to drain StorageQueueManager queue, %d elements left",
if m.queueLen() != defaultMaxSamplesPerSend {
t.Fatalf("Failed to drain QueueManager queue, %d elements left",
m.queueLen(),
)
}
numCalls := c.NumCalls()
if numCalls != uint64(cfg.Shards) {
t.Errorf("Saw %d concurrent sends, expected %d", numCalls, cfg.Shards)
if numCalls != uint64(defaultShards) {
t.Errorf("Saw %d concurrent sends, expected %d", numCalls, defaultShards)
}
}


@@ -19,16 +19,12 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/relabel"
)
// Storage allows queueing samples for remote writes.
type Storage struct {
mtx sync.RWMutex
externalLabels model.LabelSet
conf config.RemoteWriteConfig
queue *StorageQueueManager
mtx sync.RWMutex
queues []*QueueManager
}
// ApplyConfig updates the state as the new config requires.
@@ -36,34 +32,36 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
s.mtx.Lock()
defer s.mtx.Unlock()
newQueues := []*QueueManager{}
// TODO: we should only stop & recreate queues which have changes,
// as this can be quite disruptive.
var newQueue *StorageQueueManager
if conf.RemoteWriteConfig.URL != nil {
c, err := NewClient(conf.RemoteWriteConfig)
for i, rwConf := range conf.RemoteWriteConfigs {
c, err := NewClient(i, rwConf)
if err != nil {
return err
}
newQueue = NewStorageQueueManager(c, nil)
newQueues = append(newQueues, NewQueueManager(QueueManagerConfig{
Client: c,
ExternalLabels: conf.GlobalConfig.ExternalLabels,
RelabelConfigs: rwConf.WriteRelabelConfigs,
}))
}
if s.queue != nil {
s.queue.Stop()
for _, q := range s.queues {
q.Stop()
}
s.queue = newQueue
s.conf = conf.RemoteWriteConfig
s.externalLabels = conf.GlobalConfig.ExternalLabels
if s.queue != nil {
s.queue.Start()
s.queues = newQueues
for _, q := range s.queues {
q.Start()
}
return nil
}
// Stop the background processing of the storage queues.
func (s *Storage) Stop() {
if s.queue != nil {
s.queue.Stop()
for _, q := range s.queues {
q.Stop()
}
}
@@ -72,26 +70,9 @@ func (s *Storage) Append(smpl *model.Sample) error {
s.mtx.RLock()
defer s.mtx.RUnlock()
if s.queue == nil {
return nil
for _, q := range s.queues {
q.Append(smpl)
}
var snew model.Sample
snew = *smpl
snew.Metric = smpl.Metric.Clone()
for ln, lv := range s.externalLabels {
if _, ok := smpl.Metric[ln]; !ok {
snew.Metric[ln] = lv
}
}
snew.Metric = model.Metric(
relabel.Process(model.LabelSet(snew.Metric), s.conf.WriteRelabelConfigs...))
if snew.Metric == nil {
return nil
}
s.queue.Append(&snew)
return nil
}
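
Storage.Append now simply fans each sample out to every queue, while the external-label merging and write_relabel_configs handling have moved into QueueManager.Append, so each remote applies its own rules. Under the example configuration above, a sample named expensive_metric_count would be dropped for remote1 but still forwarded to remote2. A sketch of that relabelling decision in isolation; the metric name is made up:

package main

import (
    "fmt"

    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/config"
    "github.com/prometheus/prometheus/relabel"
)

func main() {
    metric := model.LabelSet{model.MetricNameLabel: "expensive_metric_count"}

    // remote1's rule from the example config: drop everything matching expensive.*.
    drop := &config.RelabelConfig{
        SourceLabels: model.LabelNames{model.MetricNameLabel},
        Separator:    ";",
        Regex:        config.MustNewRegexp("expensive.*"),
        Replacement:  "$1",
        Action:       config.RelabelDrop,
    }

    if relabel.Process(metric, drop) == nil {
        fmt.Println("remote1: sample dropped by write_relabel_configs")
    }
    if out := relabel.Process(metric); out != nil {
        fmt.Println("remote2: sample queued unchanged:", out)
    }
}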


@@ -129,11 +129,8 @@ func (s *Sample) Equal(o *Sample) bool {
if !s.Timestamp.Equal(o.Timestamp) {
return false
}
if s.Value.Equal(o.Value) {
return false
}
return true
return s.Value.Equal(o.Value)
}
func (s Sample) String() string {

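The vendored update to github.com/prometheus/common/model also picks up a fix to Sample.Equal: the old code returned false precisely when the values were equal, so two identical samples never compared as equal. After the fix the result simply follows the value comparison. A minimal illustration with made-up values:

package main

import (
    "fmt"

    "github.com/prometheus/common/model"
)

func main() {
    a := &model.Sample{
        Metric:    model.Metric{model.MetricNameLabel: "up"},
        Value:     1,
        Timestamp: model.Time(1000),
    }
    b := &model.Sample{
        Metric:    model.Metric{model.MetricNameLabel: "up"},
        Value:     1,
        Timestamp: model.Time(1000),
    }
    // With the inverted comparison this printed false; with the fix it prints true.
    fmt.Println(a.Equal(b))
}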
vendor/vendor.json (vendored)

@@ -560,8 +560,8 @@
{
"checksumSHA1": "vopCLXHzYm+3l5fPKOf4/fQwrCM=",
"path": "github.com/prometheus/common/model",
"revision": "dd2f054febf4a6c00f2343686efb775948a8bff4",
"revisionTime": "2017-01-08T23:12:12Z"
"revision": "3007b6072c17c8d985734e6e19b1dea9174e13d3",
"revisionTime": "2017-02-19T00:35:58+01:00"
},
{
"checksumSHA1": "ZbbESWBHHcPUJ/A5yrzKhTHuPc8=",