diff --git a/config/config.go b/config/config.go index e162655be..794ccc0ee 100644 --- a/config/config.go +++ b/config/config.go @@ -265,15 +265,27 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { } jobNames[scfg.JobName] = struct{}{} } + rwNames := map[string]struct{}{} for _, rwcfg := range c.RemoteWriteConfigs { if rwcfg == nil { return errors.New("empty or null remote write config section") } + // Skip empty names, we fill their name with their config hash in remote write code. + if _, ok := rwNames[rwcfg.Name]; ok && rwcfg.Name != "" { + return errors.Errorf("found multiple remote write configs with job name %q", rwcfg.Name) + } + rwNames[rwcfg.Name] = struct{}{} } + rrNames := map[string]struct{}{} for _, rrcfg := range c.RemoteReadConfigs { if rrcfg == nil { return errors.New("empty or null remote read config section") } + // Skip empty names, we fill their name with their config hash in remote read code. + if _, ok := rrNames[rrcfg.Name]; ok && rrcfg.Name != "" { + return errors.Errorf("found multiple remote read configs with job name %q", rrcfg.Name) + } + rrNames[rrcfg.Name] = struct{}{} } return nil } @@ -596,6 +608,7 @@ type RemoteWriteConfig struct { URL *config_util.URL `yaml:"url"` RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"` WriteRelabelConfigs []*relabel.Config `yaml:"write_relabel_configs,omitempty"` + Name string `yaml:"name,omitempty"` // We cannot do proper Go type embedding below as the parser will then parse // values arbitrarily into the overflow maps of further-down types. @@ -654,6 +667,8 @@ type RemoteReadConfig struct { URL *config_util.URL `yaml:"url"` RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"` ReadRecent bool `yaml:"read_recent,omitempty"` + Name string `yaml:"name,omitempty"` + // We cannot do proper Go type embedding below as the parser will then parse // values arbitrarily into the overflow maps of further-down types. HTTPClientConfig config_util.HTTPClientConfig `yaml:",inline"` diff --git a/config/config_test.go b/config/config_test.go index e18b92012..a15b527ac 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -73,6 +73,7 @@ var expectedConf = &Config{ { URL: mustParseURL("http://remote1/push"), RemoteTimeout: model.Duration(30 * time.Second), + Name: "drop_expensive", WriteRelabelConfigs: []*relabel.Config{ { SourceLabels: model.LabelNames{"__name__"}, @@ -88,6 +89,7 @@ var expectedConf = &Config{ URL: mustParseURL("http://remote2/push"), RemoteTimeout: model.Duration(30 * time.Second), QueueConfig: DefaultQueueConfig, + Name: "rw_tls", HTTPClientConfig: config_util.HTTPClientConfig{ TLSConfig: config_util.TLSConfig{ CertFile: filepath.FromSlash("testdata/valid_cert_file"), @@ -102,11 +104,13 @@ var expectedConf = &Config{ URL: mustParseURL("http://remote1/read"), RemoteTimeout: model.Duration(1 * time.Minute), ReadRecent: true, + Name: "default", }, { URL: mustParseURL("http://remote3/read"), RemoteTimeout: model.Duration(1 * time.Minute), ReadRecent: false, + Name: "read_special", RequiredMatchers: model.LabelSet{"job": "special"}, HTTPClientConfig: config_util.HTTPClientConfig{ TLSConfig: config_util.TLSConfig{ @@ -825,6 +829,12 @@ var expectedErrors = []struct { }, { filename: "remote_write_url_missing.bad.yml", errMsg: `url for remote_write is empty`, + }, { + filename: "remote_write_dup.bad.yml", + errMsg: `found multiple remote write configs with job name "queue1"`, + }, { + filename: "remote_read_dup.bad.yml", + errMsg: `found multiple remote read configs with job name "queue1"`, }, { filename: "ec2_filters_empty_values.bad.yml", diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml index 699321780..40968fc74 100644 --- a/config/testdata/conf.good.yml +++ b/config/testdata/conf.good.yml @@ -14,11 +14,13 @@ rule_files: remote_write: - url: http://remote1/push + name: drop_expensive write_relabel_configs: - source_labels: [__name__] regex: expensive.* action: drop - url: http://remote2/push + name: rw_tls tls_config: cert_file: valid_cert_file key_file: valid_key_file @@ -26,8 +28,10 @@ remote_write: remote_read: - url: http://remote1/read read_recent: true + name: default - url: http://remote3/read read_recent: false + name: read_special required_matchers: job: special tls_config: diff --git a/config/testdata/remote_read_dup.bad.yml b/config/testdata/remote_read_dup.bad.yml new file mode 100644 index 000000000..80c4f0f01 --- /dev/null +++ b/config/testdata/remote_read_dup.bad.yml @@ -0,0 +1,5 @@ +remote_read: + - url: http://localhost:9090 + name: queue1 + - url: localhost:9091 + name: queue1 diff --git a/config/testdata/remote_write_dup.bad.yml b/config/testdata/remote_write_dup.bad.yml new file mode 100644 index 000000000..1fdc093ac --- /dev/null +++ b/config/testdata/remote_write_dup.bad.yml @@ -0,0 +1,6 @@ +remote_write: + - url: localhost:9090 + name: queue1 + - url: localhost:9091 + name: queue1 + diff --git a/storage/remote/client.go b/storage/remote/client.go index 7183c793e..764d4b082 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -40,10 +40,10 @@ var userAgent = fmt.Sprintf("Prometheus/%s", version.Version) // Client allows reading and writing from/to a remote HTTP endpoint. type Client struct { - index int // Used to differentiate clients in metrics. - url *config_util.URL - client *http.Client - timeout time.Duration + remoteName string // Used to differentiate clients in metrics. + url *config_util.URL + client *http.Client + timeout time.Duration } // ClientConfig configures a Client. @@ -54,17 +54,17 @@ type ClientConfig struct { } // NewClient creates a new Client. -func NewClient(index int, conf *ClientConfig) (*Client, error) { +func NewClient(remoteName string, conf *ClientConfig) (*Client, error) { httpClient, err := config_util.NewClientFromConfig(conf.HTTPClientConfig, "remote_storage", false) if err != nil { return nil, err } return &Client{ - index: index, - url: conf.URL, - client: httpClient, - timeout: time.Duration(conf.Timeout), + remoteName: remoteName, + url: conf.URL, + client: httpClient, + timeout: time.Duration(conf.Timeout), }, nil } @@ -115,9 +115,14 @@ func (c *Client) Store(ctx context.Context, req []byte) error { return err } -// Name identifies the client. +// Name uniquely identifies the client. func (c Client) Name() string { - return fmt.Sprintf("%d:%s", c.index, c.url) + return c.remoteName +} + +// Endpoint is the remote read or write endpoint. +func (c Client) Endpoint() string { + return c.url.String() } // Read reads from a remote endpoint. diff --git a/storage/remote/client_test.go b/storage/remote/client_test.go index 2170d9073..f0ebee6b3 100644 --- a/storage/remote/client_test.go +++ b/storage/remote/client_test.go @@ -62,18 +62,18 @@ func TestStoreHTTPErrorHandling(t *testing.T) { ) serverURL, err := url.Parse(server.URL) - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) - c, err := NewClient(0, &ClientConfig{ + conf := &ClientConfig{ URL: &config_util.URL{URL: serverURL}, Timeout: model.Duration(time.Second), - }) - if err != nil { - t.Fatal(err) } + hash, err := toHash(conf) + testutil.Ok(t, err) + c, err := NewClient(hash, conf) + testutil.Ok(t, err) + err = c.Store(context.Background(), []byte{}) if !testutil.ErrorEqual(err, test.err) { t.Errorf("%d. Unexpected error; want %v, got %v", i, test.err, err) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 923cb14e9..71d34e0d3 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -36,12 +36,7 @@ import ( "github.com/prometheus/prometheus/tsdb/wal" ) -// String constants for instrumentation. const ( - namespace = "prometheus" - subsystem = "remote_storage" - queue = "queue" - // We track samples in/out and how long pushes take using an Exponentially // Weighted Moving Average. ewmaWeight = 0.2 @@ -59,7 +54,7 @@ var ( Name: "succeeded_samples_total", Help: "Total number of samples successfully sent to remote storage.", }, - []string{queue}, + []string{remoteName, endpoint}, ) failedSamplesTotal = promauto.NewCounterVec( prometheus.CounterOpts{ @@ -68,7 +63,7 @@ var ( Name: "failed_samples_total", Help: "Total number of samples which failed on send to remote storage, non-recoverable errors.", }, - []string{queue}, + []string{remoteName, endpoint}, ) retriedSamplesTotal = promauto.NewCounterVec( prometheus.CounterOpts{ @@ -77,7 +72,7 @@ var ( Name: "retried_samples_total", Help: "Total number of samples which failed on send to remote storage but were retried because the send error was recoverable.", }, - []string{queue}, + []string{remoteName, endpoint}, ) droppedSamplesTotal = promauto.NewCounterVec( prometheus.CounterOpts{ @@ -86,7 +81,7 @@ var ( Name: "dropped_samples_total", Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write.", }, - []string{queue}, + []string{remoteName, endpoint}, ) enqueueRetriesTotal = promauto.NewCounterVec( prometheus.CounterOpts{ @@ -95,7 +90,7 @@ var ( Name: "enqueue_retries_total", Help: "Total number of times enqueue has failed because a shards queue was full.", }, - []string{queue}, + []string{remoteName, endpoint}, ) sentBatchDuration = promauto.NewHistogramVec( prometheus.HistogramOpts{ @@ -105,7 +100,7 @@ var ( Help: "Duration of sample batch send calls to the remote storage.", Buckets: prometheus.DefBuckets, }, - []string{queue}, + []string{remoteName, endpoint}, ) queueHighestSentTimestamp = promauto.NewGaugeVec( prometheus.GaugeOpts{ @@ -114,7 +109,7 @@ var ( Name: "queue_highest_sent_timestamp_seconds", Help: "Timestamp from a WAL sample, the highest timestamp successfully sent by this queue, in seconds since epoch.", }, - []string{queue}, + []string{remoteName, endpoint}, ) queuePendingSamples = promauto.NewGaugeVec( prometheus.GaugeOpts{ @@ -123,7 +118,7 @@ var ( Name: "pending_samples", Help: "The number of samples pending in the queues shards to be sent to the remote storage.", }, - []string{queue}, + []string{remoteName, endpoint}, ) shardCapacity = promauto.NewGaugeVec( prometheus.GaugeOpts{ @@ -132,7 +127,7 @@ var ( Name: "shard_capacity", Help: "The capacity of each shard of the queue used for parallel sending to the remote storage.", }, - []string{queue}, + []string{remoteName, endpoint}, ) numShards = promauto.NewGaugeVec( prometheus.GaugeOpts{ @@ -141,7 +136,7 @@ var ( Name: "shards", Help: "The number of shards used for parallel sending to the remote storage.", }, - []string{queue}, + []string{remoteName, endpoint}, ) maxNumShards = promauto.NewGaugeVec( prometheus.GaugeOpts{ @@ -150,7 +145,7 @@ var ( Name: "shards_max", Help: "The maximum number of shards that the queue is allowed to run.", }, - []string{queue}, + []string{remoteName, endpoint}, ) minNumShards = promauto.NewGaugeVec( prometheus.GaugeOpts{ @@ -159,7 +154,7 @@ var ( Name: "shards_min", Help: "The minimum number of shards that the queue is allowed to run.", }, - []string{queue}, + []string{remoteName, endpoint}, ) desiredNumShards = promauto.NewGaugeVec( prometheus.GaugeOpts{ @@ -168,7 +163,7 @@ var ( Name: "shards_desired", Help: "The number of shards that the queues shard calculation wants to run based on the rate of samples in vs. samples out.", }, - []string{queue}, + []string{remoteName, endpoint}, ) bytesSent = promauto.NewCounterVec( prometheus.CounterOpts{ @@ -177,7 +172,7 @@ var ( Name: "sent_bytes_total", Help: "The total number of bytes sent by the queue.", }, - []string{queue}, + []string{remoteName, endpoint}, ) ) @@ -186,8 +181,10 @@ var ( type StorageClient interface { // Store stores the given samples in the remote storage. Store(context.Context, []byte) error - // Name identifies the remote storage implementation. + // Name uniquely identifies the remote storage. Name() string + // Endpoint is the remote read or write endpoint for the storage client. + Endpoint() string } // QueueManager manages a queue of samples to be sent to the Storage @@ -242,8 +239,7 @@ func NewQueueManager(reg prometheus.Registerer, logger log.Logger, walDir string logger = log.NewNopLogger() } - name := client.Name() - logger = log.With(logger, "queue", name) + logger = log.With(logger, remoteName, client.Name(), endpoint, client.Endpoint()) t := &QueueManager{ logger: logger, flushDeadline: flushDeadline, @@ -266,7 +262,7 @@ func NewQueueManager(reg prometheus.Registerer, logger log.Logger, walDir string samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration), } - t.watcher = wal.NewWatcher(reg, wal.NewWatcherMetrics(reg), logger, name, t, walDir) + t.watcher = wal.NewWatcher(reg, wal.NewWatcherMetrics(reg), logger, client.Name(), t, walDir) t.shards = t.newShards() return t @@ -326,22 +322,23 @@ func (t *QueueManager) Start() { // constructor because of the ordering of creating Queue Managers's, stopping them, // and then starting new ones in storage/remote/storage.go ApplyConfig. name := t.client.Name() + ep := t.client.Endpoint() t.highestSentTimestampMetric = &maxGauge{ - Gauge: queueHighestSentTimestamp.WithLabelValues(name), + Gauge: queueHighestSentTimestamp.WithLabelValues(name, ep), } - t.pendingSamplesMetric = queuePendingSamples.WithLabelValues(name) - t.enqueueRetriesMetric = enqueueRetriesTotal.WithLabelValues(name) - t.droppedSamplesTotal = droppedSamplesTotal.WithLabelValues(name) - t.numShardsMetric = numShards.WithLabelValues(name) - t.failedSamplesTotal = failedSamplesTotal.WithLabelValues(name) - t.sentBatchDuration = sentBatchDuration.WithLabelValues(name) - t.succeededSamplesTotal = succeededSamplesTotal.WithLabelValues(name) - t.retriedSamplesTotal = retriedSamplesTotal.WithLabelValues(name) - t.shardCapacity = shardCapacity.WithLabelValues(name) - t.maxNumShards = maxNumShards.WithLabelValues(name) - t.minNumShards = minNumShards.WithLabelValues(name) - t.desiredNumShards = desiredNumShards.WithLabelValues(name) - t.bytesSent = bytesSent.WithLabelValues(name) + t.pendingSamplesMetric = queuePendingSamples.WithLabelValues(name, ep) + t.enqueueRetriesMetric = enqueueRetriesTotal.WithLabelValues(name, ep) + t.droppedSamplesTotal = droppedSamplesTotal.WithLabelValues(name, ep) + t.numShardsMetric = numShards.WithLabelValues(name, ep) + t.failedSamplesTotal = failedSamplesTotal.WithLabelValues(name, ep) + t.sentBatchDuration = sentBatchDuration.WithLabelValues(name, ep) + t.succeededSamplesTotal = succeededSamplesTotal.WithLabelValues(name, ep) + t.retriedSamplesTotal = retriedSamplesTotal.WithLabelValues(name, ep) + t.shardCapacity = shardCapacity.WithLabelValues(name, ep) + t.maxNumShards = maxNumShards.WithLabelValues(name, ep) + t.minNumShards = minNumShards.WithLabelValues(name, ep) + t.desiredNumShards = desiredNumShards.WithLabelValues(name, ep) + t.bytesSent = bytesSent.WithLabelValues(name, ep) // Initialise some metrics. t.shardCapacity.Set(float64(t.cfg.Capacity)) @@ -380,19 +377,20 @@ func (t *QueueManager) Stop() { t.seriesMtx.Unlock() // Delete metrics so we don't have alerts for queues that are gone. name := t.client.Name() - queueHighestSentTimestamp.DeleteLabelValues(name) - queuePendingSamples.DeleteLabelValues(name) - enqueueRetriesTotal.DeleteLabelValues(name) - droppedSamplesTotal.DeleteLabelValues(name) - numShards.DeleteLabelValues(name) - failedSamplesTotal.DeleteLabelValues(name) - sentBatchDuration.DeleteLabelValues(name) - succeededSamplesTotal.DeleteLabelValues(name) - retriedSamplesTotal.DeleteLabelValues(name) - shardCapacity.DeleteLabelValues(name) - maxNumShards.DeleteLabelValues(name) - minNumShards.DeleteLabelValues(name) - desiredNumShards.DeleteLabelValues(name) + ep := t.client.Endpoint() + queueHighestSentTimestamp.DeleteLabelValues(name, ep) + queuePendingSamples.DeleteLabelValues(name, ep) + enqueueRetriesTotal.DeleteLabelValues(name, ep) + droppedSamplesTotal.DeleteLabelValues(name, ep) + numShards.DeleteLabelValues(name, ep) + failedSamplesTotal.DeleteLabelValues(name, ep) + sentBatchDuration.DeleteLabelValues(name, ep) + succeededSamplesTotal.DeleteLabelValues(name, ep) + retriedSamplesTotal.DeleteLabelValues(name, ep) + shardCapacity.DeleteLabelValues(name, ep) + maxNumShards.DeleteLabelValues(name, ep) + minNumShards.DeleteLabelValues(name, ep) + desiredNumShards.DeleteLabelValues(name, ep) } // StoreSeries keeps track of which series we know about for lookups when sending samples to remote. diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 4e4e18975..e012ee398 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -446,6 +446,10 @@ func (c *TestStorageClient) Name() string { return "teststorageclient" } +func (c *TestStorageClient) Endpoint() string { + return "http://test-remote.com/1234" +} + // TestBlockingStorageClient is a queue_manager StorageClient which will block // on any calls to Store(), until the request's Context is cancelled, at which // point the `numCalls` property will contain a count of how many times Store() @@ -472,6 +476,10 @@ func (c *TestBlockingStorageClient) Name() string { return "testblockingstorageclient" } +func (c *TestBlockingStorageClient) Endpoint() string { + return "http://test-remote-blocking.com/1234" +} + func BenchmarkSampleDelivery(b *testing.B) { // Let's create an even number of send batches so we don't run into the // batch timeout case. diff --git a/storage/remote/read.go b/storage/remote/read.go index 9e6135913..2cffdabfb 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -29,7 +29,7 @@ var remoteReadQueries = prometheus.NewGaugeVec( Name: "remote_read_queries", Help: "The number of in-flight remote read queries.", }, - []string{"client"}, + []string{remoteName, endpoint}, ) func init() { @@ -39,7 +39,7 @@ func init() { // QueryableClient returns a storage.Queryable which queries the given // Client to select series sets. func QueryableClient(c *Client) storage.Queryable { - remoteReadQueries.WithLabelValues(c.Name()) + remoteReadQueries.WithLabelValues(c.remoteName, c.url.String()) return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { return &querier{ ctx: ctx, @@ -65,7 +65,7 @@ func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) ( return nil, nil, err } - remoteReadGauge := remoteReadQueries.WithLabelValues(q.client.Name()) + remoteReadGauge := remoteReadQueries.WithLabelValues(q.client.remoteName, q.client.url.String()) remoteReadGauge.Inc() defer remoteReadGauge.Dec() diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 1571a8562..afcab2ede 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -15,15 +15,98 @@ package remote import ( "context" + "io/ioutil" + "net/url" + "os" "reflect" "sort" "testing" + "github.com/prometheus/client_golang/prometheus" + config_util "github.com/prometheus/common/config" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/testutil" ) +func TestNoDuplicateReadConfigs(t *testing.T) { + dir, err := ioutil.TempDir("", "TestNoDuplicateReadConfigs") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + cfg1 := config.RemoteReadConfig{ + Name: "write-1", + URL: &config_util.URL{ + URL: &url.URL{ + Scheme: "http", + Host: "localhost", + }, + }, + } + cfg2 := config.RemoteReadConfig{ + Name: "write-2", + URL: &config_util.URL{ + URL: &url.URL{ + Scheme: "http", + Host: "localhost", + }, + }, + } + cfg3 := config.RemoteReadConfig{ + URL: &config_util.URL{ + URL: &url.URL{ + Scheme: "http", + Host: "localhost", + }, + }, + } + + type testcase struct { + cfgs []*config.RemoteReadConfig + err bool + } + + cases := []testcase{ + { // Duplicates but with different names, we should not get an error. + cfgs: []*config.RemoteReadConfig{ + &cfg1, + &cfg2, + }, + err: false, + }, + { // Duplicates but one with no name, we should not get an error. + cfgs: []*config.RemoteReadConfig{ + &cfg1, + &cfg3, + }, + err: false, + }, + { // Duplicates both with no name, we should get an error. + cfgs: []*config.RemoteReadConfig{ + &cfg3, + &cfg3, + }, + err: true, + }, + } + + for _, tc := range cases { + s := NewStorage(nil, prometheus.DefaultRegisterer, nil, dir, defaultFlushDeadline) + conf := &config.Config{ + GlobalConfig: config.DefaultGlobalConfig, + RemoteReadConfigs: tc.cfgs, + } + err := s.ApplyConfig(conf) + gotError := err != nil + testutil.Equals(t, tc.err, gotError) + + err = s.Close() + testutil.Ok(t, err) + } +} + func TestExternalLabelsQuerierSelect(t *testing.T) { matchers := []*labels.Matcher{ labels.MustNewMatcher(labels.MatchEqual, "job", "api-server"), diff --git a/storage/remote/storage.go b/storage/remote/storage.go index 5463735d5..4c136f0e5 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -15,6 +15,10 @@ package remote import ( "context" + "crypto/md5" + "encoding/hex" + "encoding/json" + "fmt" "sync" "time" @@ -28,6 +32,14 @@ import ( "github.com/prometheus/prometheus/storage" ) +// String constants for instrumentation. +const ( + namespace = "prometheus" + subsystem = "remote_storage" + remoteName = "remote_name" + endpoint = "url" +) + // startTimeCallback is a callback func that return the oldest timestamp stored in a storage. type startTimeCallback func() (int64, error) @@ -67,9 +79,29 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { } // Update read clients + readHashes := make(map[string]struct{}) queryables := make([]storage.Queryable, 0, len(conf.RemoteReadConfigs)) - for i, rrConf := range conf.RemoteReadConfigs { - c, err := NewClient(i, &ClientConfig{ + for _, rrConf := range conf.RemoteReadConfigs { + hash, err := toHash(rrConf) + if err != nil { + return nil + } + + // Don't allow duplicate remote read configs. + if _, ok := readHashes[hash]; ok { + return fmt.Errorf("duplicate remote read configs are not allowed, found duplicate for URL: %s", rrConf.URL) + } + readHashes[hash] = struct{}{} + + // Set the queue name to the config hash if the user has not set + // a name in their remote write config so we can still differentiate + // between queues that have the same remote write endpoint. + name := string(hash[:6]) + if rrConf.Name != "" { + name = rrConf.Name + } + + c, err := NewClient(name, &ClientConfig{ URL: rrConf.URL, Timeout: rrConf.RemoteTimeout, HTTPClientConfig: rrConf.HTTPClientConfig, @@ -139,3 +171,13 @@ func labelsToEqualityMatchers(ls model.LabelSet) []*labels.Matcher { } return ms } + +// Used for hashing configs and diff'ing hashes in ApplyConfig. +func toHash(data interface{}) (string, error) { + bytes, err := json.Marshal(data) + if err != nil { + return "", err + } + hash := md5.Sum(bytes) + return hex.EncodeToString(hash[:]), nil +} diff --git a/storage/remote/storage_test.go b/storage/remote/storage_test.go index b03639253..713960000 100644 --- a/storage/remote/storage_test.go +++ b/storage/remote/storage_test.go @@ -15,10 +15,12 @@ package remote import ( "io/ioutil" + "net/url" "os" "testing" "github.com/prometheus/client_golang/prometheus" + common_config "github.com/prometheus/common/config" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/util/testutil" ) @@ -38,6 +40,18 @@ func TestStorageLifecycle(t *testing.T) { &config.DefaultRemoteReadConfig, }, } + // We need to set URL's so that metric creation doesn't panic. + conf.RemoteWriteConfigs[0].URL = &common_config.URL{ + URL: &url.URL{ + Host: "http://test-storage.com", + }, + } + conf.RemoteReadConfigs[0].URL = &common_config.URL{ + URL: &url.URL{ + Host: "http://test-storage.com", + }, + } + s.ApplyConfig(conf) // make sure remote write has a queue. diff --git a/storage/remote/write.go b/storage/remote/write.go index f695b4a38..d58dfe45f 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -14,8 +14,7 @@ package remote import ( - "crypto/md5" - "encoding/json" + "fmt" "sync" "time" @@ -50,11 +49,10 @@ type WriteStorage struct { logger log.Logger mtx sync.Mutex - configHash [16]byte - externalLabelHash [16]byte + configHash string + externalLabelHash string walDir string - queues []*QueueManager - hashes [][16]byte + queues map[string]*QueueManager samplesIn *ewmaRate flushDeadline time.Duration } @@ -65,6 +63,7 @@ func NewWriteStorage(logger log.Logger, walDir string, flushDeadline time.Durati logger = log.NewNopLogger() } rws := &WriteStorage{ + queues: make(map[string]*QueueManager), logger: logger, flushDeadline: flushDeadline, samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), @@ -88,20 +87,17 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { rws.mtx.Lock() defer rws.mtx.Unlock() - // Remote write queues only need to change if the remote write config or - // external labels change. Hash these together and only reload if the hash - // changes. - cfgBytes, err := json.Marshal(conf.RemoteWriteConfigs) + configHash, err := toHash(conf.RemoteWriteConfigs) if err != nil { return err } - externalLabelBytes, err := json.Marshal(conf.GlobalConfig.ExternalLabels) + externalLabelHash, err := toHash(conf.GlobalConfig.ExternalLabels) if err != nil { return err } - configHash := md5.Sum(cfgBytes) - externalLabelHash := md5.Sum(externalLabelBytes) + // Remote write queues only need to change if the remote write config or + // external labels change. externalLabelUnchanged := externalLabelHash == rws.externalLabelHash if configHash == rws.configHash && externalLabelUnchanged { level.Debug(rws.logger).Log("msg", "remote write config has not changed, no need to restart QueueManagers") @@ -111,28 +107,39 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { rws.configHash = configHash rws.externalLabelHash = externalLabelHash - // Update write queues - newQueues := []*QueueManager{} - newHashes := [][16]byte{} - newClientIndexes := []int{} - for i, rwConf := range conf.RemoteWriteConfigs { - b, err := json.Marshal(rwConf) + newQueues := make(map[string]*QueueManager) + newHashes := []string{} + for _, rwConf := range conf.RemoteWriteConfigs { + hash, err := toHash(rwConf) if err != nil { return err } - // Use RemoteWriteConfigs and its index to get hash. So if its index changed, - // the corresponding queue should also be restarted. - hash := md5.Sum(b) - if i < len(rws.queues) && rws.hashes[i] == hash && externalLabelUnchanged { - // The RemoteWriteConfig and index both not changed, keep the queue. - newQueues = append(newQueues, rws.queues[i]) - newHashes = append(newHashes, hash) - rws.queues[i] = nil + // Set the queue name to the config hash if the user has not set + // a name in their remote write config so we can still differentiate + // between queues that have the same remote write endpoint. + name := string(hash[:6]) + if rwConf.Name != "" { + name = rwConf.Name + } + + // Don't allow duplicate remote write configs. + if _, ok := newQueues[hash]; ok { + return fmt.Errorf("duplicate remote write configs are not allowed, found duplicate for URL: %s", rwConf.URL) + } + + var nameUnchanged bool + queue, ok := rws.queues[hash] + if ok { + nameUnchanged = queue.client.Name() == name + } + if externalLabelUnchanged && nameUnchanged { + newQueues[hash] = queue + delete(rws.queues, hash) continue } - // Otherwise create a new queue. - c, err := NewClient(i, &ClientConfig{ + + c, err := NewClient(name, &ClientConfig{ URL: rwConf.URL, Timeout: rwConf.RemoteTimeout, HTTPClientConfig: rwConf.HTTPClientConfig, @@ -140,7 +147,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { if err != nil { return err } - newQueues = append(newQueues, NewQueueManager( + newQueues[hash] = NewQueueManager( prometheus.DefaultRegisterer, rws.logger, rws.walDir, @@ -150,24 +157,22 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { rwConf.WriteRelabelConfigs, c, rws.flushDeadline, - )) + ) + // Keep track of which queues are new so we know which to start. newHashes = append(newHashes, hash) - newClientIndexes = append(newClientIndexes, i) } + // Anything remaining in rws.queues is a queue who's config has + // changed or was removed from the overall remote write config. for _, q := range rws.queues { - // A nil queue means that queue has been reused. - if q != nil { - q.Stop() - } + q.Stop() } - for _, index := range newClientIndexes { - newQueues[index].Start() + for _, hash := range newHashes { + newQueues[hash].Start() } rws.queues = newQueues - rws.hashes = newHashes return nil } diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go index 0d602a70f..28743cca0 100644 --- a/storage/remote/write_test.go +++ b/storage/remote/write_test.go @@ -15,16 +15,145 @@ package remote import ( "io/ioutil" + "net/url" "os" "testing" "time" + common_config "github.com/prometheus/common/config" + config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/util/testutil" ) +var cfg = config.RemoteWriteConfig{ + Name: "dev", + URL: &config_util.URL{ + URL: &url.URL{ + Scheme: "http", + Host: "localhost", + }, + }, + QueueConfig: config.DefaultQueueConfig, +} + +func TestNoDuplicateWriteConfigs(t *testing.T) { + dir, err := ioutil.TempDir("", "TestNoDuplicateWriteConfigs") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + cfg1 := config.RemoteWriteConfig{ + Name: "write-1", + URL: &config_util.URL{ + URL: &url.URL{ + Scheme: "http", + Host: "localhost", + }, + }, + QueueConfig: config.DefaultQueueConfig, + } + cfg2 := config.RemoteWriteConfig{ + Name: "write-2", + URL: &config_util.URL{ + URL: &url.URL{ + Scheme: "http", + Host: "localhost", + }, + }, + QueueConfig: config.DefaultQueueConfig, + } + cfg3 := config.RemoteWriteConfig{ + URL: &config_util.URL{ + URL: &url.URL{ + Scheme: "http", + Host: "localhost", + }, + }, + QueueConfig: config.DefaultQueueConfig, + } + + type testcase struct { + cfgs []*config.RemoteWriteConfig + err bool + } + + cases := []testcase{ + { // Two duplicates, we should get an error. + cfgs: []*config.RemoteWriteConfig{ + &cfg1, + &cfg1, + }, + err: true, + }, + { // Duplicates but with different names, we should not get an error. + cfgs: []*config.RemoteWriteConfig{ + &cfg1, + &cfg2, + }, + err: false, + }, + { // Duplicates but one with no name, we should not get an error. + cfgs: []*config.RemoteWriteConfig{ + &cfg1, + &cfg3, + }, + err: false, + }, + { // Duplicates both with no name, we should get an error. + cfgs: []*config.RemoteWriteConfig{ + &cfg3, + &cfg3, + }, + err: true, + }, + } + + for _, tc := range cases { + s := NewWriteStorage(nil, dir, time.Millisecond) + conf := &config.Config{ + GlobalConfig: config.DefaultGlobalConfig, + RemoteWriteConfigs: tc.cfgs, + } + err := s.ApplyConfig(conf) + gotError := err != nil + testutil.Equals(t, tc.err, gotError) + + err = s.Close() + testutil.Ok(t, err) + } +} + +func TestRestartOnNameChange(t *testing.T) { + dir, err := ioutil.TempDir("", "TestRestartOnNameChange") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + hash, err := toHash(cfg) + testutil.Ok(t, err) + + s := NewWriteStorage(nil, dir, time.Millisecond) + conf := &config.Config{ + GlobalConfig: config.DefaultGlobalConfig, + RemoteWriteConfigs: []*config.RemoteWriteConfig{ + &cfg, + }, + } + testutil.Ok(t, s.ApplyConfig(conf)) + testutil.Equals(t, s.queues[hash].client.Name(), cfg.Name) + + // Change the queues name, ensure the queue has been restarted. + conf.RemoteWriteConfigs[0].Name = "dev-2" + testutil.Ok(t, s.ApplyConfig(conf)) + hash, err = toHash(cfg) + testutil.Ok(t, err) + testutil.Equals(t, s.queues[hash].client.Name(), conf.RemoteWriteConfigs[0].Name) + + err = s.Close() + testutil.Ok(t, err) +} + func TestWriteStorageLifecycle(t *testing.T) { dir, err := ioutil.TempDir("", "TestWriteStorageLifecycle") testutil.Ok(t, err) @@ -49,23 +178,27 @@ func TestUpdateExternalLabels(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - s := NewWriteStorage(nil, dir, defaultFlushDeadline) + s := NewWriteStorage(nil, dir, time.Second) externalLabels := labels.FromStrings("external", "true") conf := &config.Config{ GlobalConfig: config.GlobalConfig{}, RemoteWriteConfigs: []*config.RemoteWriteConfig{ - &config.DefaultRemoteWriteConfig, + &cfg, }, } + hash, err := toHash(conf.RemoteWriteConfigs[0]) + testutil.Ok(t, err) s.ApplyConfig(conf) testutil.Equals(t, 1, len(s.queues)) - testutil.Equals(t, labels.Labels(nil), s.queues[0].externalLabels) + testutil.Equals(t, labels.Labels(nil), s.queues[hash].externalLabels) conf.GlobalConfig.ExternalLabels = externalLabels + hash, err = toHash(conf.RemoteWriteConfigs[0]) + testutil.Ok(t, err) s.ApplyConfig(conf) testutil.Equals(t, 1, len(s.queues)) - testutil.Equals(t, externalLabels, s.queues[0].externalLabels) + testutil.Equals(t, externalLabels, s.queues[hash].externalLabels) err = s.Close() testutil.Ok(t, err) @@ -84,13 +217,22 @@ func TestWriteStorageApplyConfigsIdempotent(t *testing.T) { &config.DefaultRemoteWriteConfig, }, } - s.ApplyConfig(conf) - testutil.Equals(t, 1, len(s.queues)) - queue := s.queues[0] + // We need to set URL's so that metric creation doesn't panic. + conf.RemoteWriteConfigs[0].URL = &common_config.URL{ + URL: &url.URL{ + Host: "http://test-storage.com", + }, + } + hash, err := toHash(conf.RemoteWriteConfigs[0]) + testutil.Ok(t, err) s.ApplyConfig(conf) testutil.Equals(t, 1, len(s.queues)) - testutil.Assert(t, queue == s.queues[0], "Queue pointer should have remained the same") + + s.ApplyConfig(conf) + testutil.Equals(t, 1, len(s.queues)) + _, hashExists := s.queues[hash] + testutil.Assert(t, hashExists, "Queue pointer should have remained the same") err = s.Close() testutil.Ok(t, err) @@ -120,9 +262,30 @@ func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) { GlobalConfig: config.GlobalConfig{}, RemoteWriteConfigs: []*config.RemoteWriteConfig{c0, c1, c2}, } + // We need to set URL's so that metric creation doesn't panic. + conf.RemoteWriteConfigs[0].URL = &common_config.URL{ + URL: &url.URL{ + Host: "http://test-storage.com", + }, + } + conf.RemoteWriteConfigs[1].URL = &common_config.URL{ + URL: &url.URL{ + Host: "http://test-storage.com", + }, + } + conf.RemoteWriteConfigs[2].URL = &common_config.URL{ + URL: &url.URL{ + Host: "http://test-storage.com", + }, + } s.ApplyConfig(conf) testutil.Equals(t, 3, len(s.queues)) - q := s.queues[1] + + c0Hash, err := toHash(c0) + testutil.Ok(t, err) + c1Hash, err := toHash(c1) + testutil.Ok(t, err) + // q := s.queues[1] // Update c0 and c2. c0.RemoteTimeout = model.Duration(40 * time.Second) @@ -134,7 +297,8 @@ func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) { s.ApplyConfig(conf) testutil.Equals(t, 3, len(s.queues)) - testutil.Assert(t, q == s.queues[1], "Pointer of unchanged queue should have remained the same") + _, hashExists := s.queues[c1Hash] + testutil.Assert(t, hashExists, "Pointer of unchanged queue should have remained the same") // Delete c0. conf = &config.Config{ @@ -144,7 +308,8 @@ func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) { s.ApplyConfig(conf) testutil.Equals(t, 2, len(s.queues)) - testutil.Assert(t, q != s.queues[1], "If the index changed, the queue should be stopped and recreated.") + _, hashExists = s.queues[c0Hash] + testutil.Assert(t, !hashExists, "If the index changed, the queue should be stopped and recreated.") err = s.Close() testutil.Ok(t, err)